Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ 912b2278

History | View | Annotate | Download (7.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the LUXI protocol
23

24
This module implements the local unix socket protocol. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also used by the master daemon.
29

30
"""
31

    
32
from ganeti import constants
33
from ganeti import objects
34
import ganeti.rpc.client as cl
35
from ganeti.rpc.transport import Transport
36
from ganeti.rpc.errors import (ProtocolError, ConnectionClosedError,
37
                               TimeoutError, RequestError, NoMasterError,
38
                               PermissionError)
39

    
40
__all__ = [
41
  # functions:
42
  "ProtocolError",
43
  "ConnectionClosedError",
44
  "TimeoutError",
45
  "RequestError",
46
  "NoMasterError",
47
  "PermissionError",
48
  # classes:
49
  "Client"
50
  ]
51

    
52
REQ_SUBMIT_JOB = constants.LUXI_REQ_SUBMIT_JOB
53
REQ_SUBMIT_JOB_TO_DRAINED_QUEUE = constants.LUXI_REQ_SUBMIT_JOB_TO_DRAINED_QUEUE
54
REQ_SUBMIT_MANY_JOBS = constants.LUXI_REQ_SUBMIT_MANY_JOBS
55
REQ_PICKUP_JOB = constants.LUXI_REQ_PICKUP_JOB
56
REQ_WAIT_FOR_JOB_CHANGE = constants.LUXI_REQ_WAIT_FOR_JOB_CHANGE
57
REQ_CANCEL_JOB = constants.LUXI_REQ_CANCEL_JOB
58
REQ_ARCHIVE_JOB = constants.LUXI_REQ_ARCHIVE_JOB
59
REQ_CHANGE_JOB_PRIORITY = constants.LUXI_REQ_CHANGE_JOB_PRIORITY
60
REQ_AUTO_ARCHIVE_JOBS = constants.LUXI_REQ_AUTO_ARCHIVE_JOBS
61
REQ_QUERY = constants.LUXI_REQ_QUERY
62
REQ_QUERY_FIELDS = constants.LUXI_REQ_QUERY_FIELDS
63
REQ_QUERY_JOBS = constants.LUXI_REQ_QUERY_JOBS
64
REQ_QUERY_INSTANCES = constants.LUXI_REQ_QUERY_INSTANCES
65
REQ_QUERY_NODES = constants.LUXI_REQ_QUERY_NODES
66
REQ_QUERY_GROUPS = constants.LUXI_REQ_QUERY_GROUPS
67
REQ_QUERY_NETWORKS = constants.LUXI_REQ_QUERY_NETWORKS
68
REQ_QUERY_EXPORTS = constants.LUXI_REQ_QUERY_EXPORTS
69
REQ_QUERY_CONFIG_VALUES = constants.LUXI_REQ_QUERY_CONFIG_VALUES
70
REQ_QUERY_CLUSTER_INFO = constants.LUXI_REQ_QUERY_CLUSTER_INFO
71
REQ_QUERY_TAGS = constants.LUXI_REQ_QUERY_TAGS
72
REQ_SET_DRAIN_FLAG = constants.LUXI_REQ_SET_DRAIN_FLAG
73
REQ_SET_WATCHER_PAUSE = constants.LUXI_REQ_SET_WATCHER_PAUSE
74
REQ_ALL = constants.LUXI_REQ_ALL
75

    
76
DEF_RWTO = constants.LUXI_DEF_RWTO
77
WFJC_TIMEOUT = constants.LUXI_WFJC_TIMEOUT
78

    
79

    
80
class Client(cl.AbstractClient):
81
  """High-level client implementation.
82

83
  This uses a backing Transport-like class on top of which it
84
  implements data serialization/deserialization.
85

86
  """
87
  def __init__(self, address=None, timeouts=None, transport=Transport):
88
    """Constructor for the Client class.
89

90
    Arguments are the same as for L{AbstractClient}.
91

92
    """
93
    super(Client, self).__init__(address, timeouts, transport)
94

    
95
  def SetQueueDrainFlag(self, drain_flag):
96
    return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, ))
97

    
98
  def SetWatcherPause(self, until):
99
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))
100

    
101
  def PickupJob(self, job):
102
    return self.CallMethod(REQ_PICKUP_JOB, (job,))
103

    
104
  def SubmitJob(self, ops):
105
    ops_state = map(lambda op: op.__getstate__(), ops)
106
    return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, ))
107

    
108
  def SubmitJobToDrainedQueue(self, ops):
109
    ops_state = map(lambda op: op.__getstate__(), ops)
110
    return self.CallMethod(REQ_SUBMIT_JOB_TO_DRAINED_QUEUE, (ops_state, ))
111

    
112
  def SubmitManyJobs(self, jobs):
113
    jobs_state = []
114
    for ops in jobs:
115
      jobs_state.append([op.__getstate__() for op in ops])
116
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, ))
117

    
118
  def CancelJob(self, job_id):
119
    return self.CallMethod(REQ_CANCEL_JOB, (job_id, ))
120

    
121
  def ArchiveJob(self, job_id):
122
    return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))
123

    
124
  def ChangeJobPriority(self, job_id, priority):
125
    return self.CallMethod(REQ_CHANGE_JOB_PRIORITY, (job_id, priority))
126

    
127
  def AutoArchiveJobs(self, age):
128
    timeout = (DEF_RWTO - 1) / 2
129
    return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS, (age, timeout))
130

    
131
  def WaitForJobChangeOnce(self, job_id, fields,
132
                           prev_job_info, prev_log_serial,
133
                           timeout=WFJC_TIMEOUT):
134
    """Waits for changes on a job.
135

136
    @param job_id: Job ID
137
    @type fields: list
138
    @param fields: List of field names to be observed
139
    @type prev_job_info: None or list
140
    @param prev_job_info: Previously received job information
141
    @type prev_log_serial: None or int/long
142
    @param prev_log_serial: Highest log serial number previously received
143
    @type timeout: int/float
144
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
145
                    be capped to that value)
146

147
    """
148
    assert timeout >= 0, "Timeout can not be negative"
149
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
150
                           (job_id, fields, prev_job_info,
151
                            prev_log_serial,
152
                            min(WFJC_TIMEOUT, timeout)))
153

    
154
  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
155
    while True:
156
      result = self.WaitForJobChangeOnce(job_id, fields,
157
                                         prev_job_info, prev_log_serial)
158
      if result != constants.JOB_NOTCHANGED:
159
        break
160
    return result
161

    
162
  def Query(self, what, fields, qfilter):
163
    """Query for resources/items.
164

165
    @param what: One of L{constants.QR_VIA_LUXI}
166
    @type fields: List of strings
167
    @param fields: List of requested fields
168
    @type qfilter: None or list
169
    @param qfilter: Query filter
170
    @rtype: L{objects.QueryResponse}
171

172
    """
173
    result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
174
    return objects.QueryResponse.FromDict(result)
175

    
176
  def QueryFields(self, what, fields):
177
    """Query for available fields.
178

179
    @param what: One of L{constants.QR_VIA_LUXI}
180
    @type fields: None or list of strings
181
    @param fields: List of requested fields
182
    @rtype: L{objects.QueryFieldsResponse}
183

184
    """
185
    result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
186
    return objects.QueryFieldsResponse.FromDict(result)
187

    
188
  def QueryJobs(self, job_ids, fields):
189
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
190

    
191
  def QueryInstances(self, names, fields, use_locking):
192
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
193

    
194
  def QueryNodes(self, names, fields, use_locking):
195
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
196

    
197
  def QueryGroups(self, names, fields, use_locking):
198
    return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))
199

    
200
  def QueryNetworks(self, names, fields, use_locking):
201
    return self.CallMethod(REQ_QUERY_NETWORKS, (names, fields, use_locking))
202

    
203
  def QueryExports(self, nodes, use_locking):
204
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
205

    
206
  def QueryClusterInfo(self):
207
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
208

    
209
  def QueryConfigValues(self, fields):
210
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))
211

    
212
  def QueryTags(self, kind, name):
213
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))