Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ ff1012ef

History | View | Annotate | Download (12.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the LUXI protocol
23

24
This module implements the local unix socket protocol. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also used by the master daemon.
29

30
"""
31

    
32
import logging
33

    
34
from ganeti import serializer
35
from ganeti import constants
36
from ganeti import errors
37
from ganeti import objects
38
from ganeti import pathutils
39
from ganeti.rpc import transport as t
40
from ganeti.rpc.errors import (ProtocolError, ConnectionClosedError,
41
                               TimeoutError, RequestError, NoMasterError,
42
                               PermissionError)
43

    
44
__all__ = [
45
  # functions:
46
  "ProtocolError",
47
  "ConnectionClosedError",
48
  "TimeoutError",
49
  "RequestError",
50
  "NoMasterError",
51
  "PermissionError",
52
  "ParseRequest",
53
  "ParseResponse",
54
  "FormatRequest",
55
  "FormatResponse",
56
  "CallLuxiMethod",
57
  # classes:
58
  "Client"
59
  ]
60

    
61

    
62
KEY_METHOD = constants.LUXI_KEY_METHOD
63
KEY_ARGS = constants.LUXI_KEY_ARGS
64
KEY_SUCCESS = constants.LUXI_KEY_SUCCESS
65
KEY_RESULT = constants.LUXI_KEY_RESULT
66
KEY_VERSION = constants.LUXI_KEY_VERSION
67

    
68
REQ_SUBMIT_JOB = constants.LUXI_REQ_SUBMIT_JOB
69
REQ_SUBMIT_JOB_TO_DRAINED_QUEUE = constants.LUXI_REQ_SUBMIT_JOB_TO_DRAINED_QUEUE
70
REQ_SUBMIT_MANY_JOBS = constants.LUXI_REQ_SUBMIT_MANY_JOBS
71
REQ_PICKUP_JOB = constants.LUXI_REQ_PICKUP_JOB
72
REQ_WAIT_FOR_JOB_CHANGE = constants.LUXI_REQ_WAIT_FOR_JOB_CHANGE
73
REQ_CANCEL_JOB = constants.LUXI_REQ_CANCEL_JOB
74
REQ_ARCHIVE_JOB = constants.LUXI_REQ_ARCHIVE_JOB
75
REQ_CHANGE_JOB_PRIORITY = constants.LUXI_REQ_CHANGE_JOB_PRIORITY
76
REQ_AUTO_ARCHIVE_JOBS = constants.LUXI_REQ_AUTO_ARCHIVE_JOBS
77
REQ_QUERY = constants.LUXI_REQ_QUERY
78
REQ_QUERY_FIELDS = constants.LUXI_REQ_QUERY_FIELDS
79
REQ_QUERY_JOBS = constants.LUXI_REQ_QUERY_JOBS
80
REQ_QUERY_INSTANCES = constants.LUXI_REQ_QUERY_INSTANCES
81
REQ_QUERY_NODES = constants.LUXI_REQ_QUERY_NODES
82
REQ_QUERY_GROUPS = constants.LUXI_REQ_QUERY_GROUPS
83
REQ_QUERY_NETWORKS = constants.LUXI_REQ_QUERY_NETWORKS
84
REQ_QUERY_EXPORTS = constants.LUXI_REQ_QUERY_EXPORTS
85
REQ_QUERY_CONFIG_VALUES = constants.LUXI_REQ_QUERY_CONFIG_VALUES
86
REQ_QUERY_CLUSTER_INFO = constants.LUXI_REQ_QUERY_CLUSTER_INFO
87
REQ_QUERY_TAGS = constants.LUXI_REQ_QUERY_TAGS
88
REQ_SET_DRAIN_FLAG = constants.LUXI_REQ_SET_DRAIN_FLAG
89
REQ_SET_WATCHER_PAUSE = constants.LUXI_REQ_SET_WATCHER_PAUSE
90
REQ_ALL = constants.LUXI_REQ_ALL
91

    
92
DEF_RWTO = constants.LUXI_DEF_RWTO
93
WFJC_TIMEOUT = constants.LUXI_WFJC_TIMEOUT
94

    
95

    
96
def ParseRequest(msg):
97
  """Parses a LUXI request message.
98

99
  """
100
  try:
101
    request = serializer.LoadJson(msg)
102
  except ValueError, err:
103
    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
104

    
105
  logging.debug("LUXI request: %s", request)
106

    
107
  if not isinstance(request, dict):
108
    logging.error("LUXI request not a dict: %r", msg)
109
    raise ProtocolError("Invalid LUXI request (not a dict)")
110

    
111
  method = request.get(KEY_METHOD, None) # pylint: disable=E1103
112
  args = request.get(KEY_ARGS, None) # pylint: disable=E1103
113
  version = request.get(KEY_VERSION, None) # pylint: disable=E1103
114

    
115
  if method is None or args is None:
116
    logging.error("LUXI request missing method or arguments: %r", msg)
117
    raise ProtocolError(("Invalid LUXI request (no method or arguments"
118
                         " in request): %r") % msg)
119

    
120
  return (method, args, version)
121

    
122

    
123
def ParseResponse(msg):
124
  """Parses a LUXI response message.
125

126
  """
127
  # Parse the result
128
  try:
129
    data = serializer.LoadJson(msg)
130
  except KeyboardInterrupt:
131
    raise
132
  except Exception, err:
133
    raise ProtocolError("Error while deserializing response: %s" % str(err))
134

    
135
  # Validate response
136
  if not (isinstance(data, dict) and
137
          KEY_SUCCESS in data and
138
          KEY_RESULT in data):
139
    raise ProtocolError("Invalid response from server: %r" % data)
140

    
141
  return (data[KEY_SUCCESS], data[KEY_RESULT],
142
          data.get(KEY_VERSION, None)) # pylint: disable=E1103
143

    
144

    
145
def FormatResponse(success, result, version=None):
146
  """Formats a LUXI response message.
147

148
  """
149
  response = {
150
    KEY_SUCCESS: success,
151
    KEY_RESULT: result,
152
    }
153

    
154
  if version is not None:
155
    response[KEY_VERSION] = version
156

    
157
  logging.debug("LUXI response: %s", response)
158

    
159
  return serializer.DumpJson(response)
160

    
161

    
162
def FormatRequest(method, args, version=None):
163
  """Formats a LUXI request message.
164

165
  """
166
  # Build request
167
  request = {
168
    KEY_METHOD: method,
169
    KEY_ARGS: args,
170
    }
171

    
172
  if version is not None:
173
    request[KEY_VERSION] = version
174

    
175
  # Serialize the request
176
  return serializer.DumpJson(request)
177

    
178

    
179
def CallLuxiMethod(transport_cb, method, args, version=None):
180
  """Send a LUXI request via a transport and return the response.
181

182
  """
183
  assert callable(transport_cb)
184

    
185
  request_msg = FormatRequest(method, args, version=version)
186

    
187
  # Send request and wait for response
188
  response_msg = transport_cb(request_msg)
189

    
190
  (success, result, resp_version) = ParseResponse(response_msg)
191

    
192
  # Verify version if there was one in the response
193
  if resp_version is not None and resp_version != version:
194
    raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
195
                           (version, resp_version))
196

    
197
  if success:
198
    return result
199

    
200
  errors.MaybeRaise(result)
201
  raise RequestError(result)
202

    
203

    
204
class Client(object):
205
  """High-level client implementation.
206

207
  This uses a backing Transport-like class on top of which it
208
  implements data serialization/deserialization.
209

210
  """
211
  def __init__(self, address=None, timeouts=None, transport=t.Transport):
212
    """Constructor for the Client class.
213

214
    Arguments:
215
      - address: a valid address the the used transport class
216
      - timeout: a list of timeouts, to be used on connect and read/write
217
      - transport: a Transport-like class
218

219

220
    If timeout is not passed, the default timeouts of the transport
221
    class are used.
222

223
    """
224
    if address is None:
225
      address = pathutils.MASTER_SOCKET
226
    self.address = address
227
    self.timeouts = timeouts
228
    self.transport_class = transport
229
    self.transport = None
230
    self._InitTransport()
231

    
232
  def _InitTransport(self):
233
    """(Re)initialize the transport if needed.
234

235
    """
236
    if self.transport is None:
237
      self.transport = self.transport_class(self.address,
238
                                            timeouts=self.timeouts)
239

    
240
  def _CloseTransport(self):
241
    """Close the transport, ignoring errors.
242

243
    """
244
    if self.transport is None:
245
      return
246
    try:
247
      old_transp = self.transport
248
      self.transport = None
249
      old_transp.Close()
250
    except Exception: # pylint: disable=W0703
251
      pass
252

    
253
  def _SendMethodCall(self, data):
254
    # Send request and wait for response
255
    try:
256
      self._InitTransport()
257
      return self.transport.Call(data)
258
    except Exception:
259
      self._CloseTransport()
260
      raise
261

    
262
  def Close(self):
263
    """Close the underlying connection.
264

265
    """
266
    self._CloseTransport()
267

    
268
  def CallMethod(self, method, args):
269
    """Send a generic request and return the response.
270

271
    """
272
    if not isinstance(args, (list, tuple)):
273
      raise errors.ProgrammerError("Invalid parameter passed to CallMethod:"
274
                                   " expected list, got %s" % type(args))
275
    return CallLuxiMethod(self._SendMethodCall, method, args,
276
                          version=constants.LUXI_VERSION)
277

    
278
  def SetQueueDrainFlag(self, drain_flag):
279
    return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, ))
280

    
281
  def SetWatcherPause(self, until):
282
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))
283

    
284
  def PickupJob(self, job):
285
    return self.CallMethod(REQ_PICKUP_JOB, (job,))
286

    
287
  def SubmitJob(self, ops):
288
    ops_state = map(lambda op: op.__getstate__(), ops)
289
    return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, ))
290

    
291
  def SubmitJobToDrainedQueue(self, ops):
292
    ops_state = map(lambda op: op.__getstate__(), ops)
293
    return self.CallMethod(REQ_SUBMIT_JOB_TO_DRAINED_QUEUE, (ops_state, ))
294

    
295
  def SubmitManyJobs(self, jobs):
296
    jobs_state = []
297
    for ops in jobs:
298
      jobs_state.append([op.__getstate__() for op in ops])
299
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, ))
300

    
301
  def CancelJob(self, job_id):
302
    return self.CallMethod(REQ_CANCEL_JOB, (job_id, ))
303

    
304
  def ArchiveJob(self, job_id):
305
    return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))
306

    
307
  def ChangeJobPriority(self, job_id, priority):
308
    return self.CallMethod(REQ_CHANGE_JOB_PRIORITY, (job_id, priority))
309

    
310
  def AutoArchiveJobs(self, age):
311
    timeout = (DEF_RWTO - 1) / 2
312
    return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS, (age, timeout))
313

    
314
  def WaitForJobChangeOnce(self, job_id, fields,
315
                           prev_job_info, prev_log_serial,
316
                           timeout=WFJC_TIMEOUT):
317
    """Waits for changes on a job.
318

319
    @param job_id: Job ID
320
    @type fields: list
321
    @param fields: List of field names to be observed
322
    @type prev_job_info: None or list
323
    @param prev_job_info: Previously received job information
324
    @type prev_log_serial: None or int/long
325
    @param prev_log_serial: Highest log serial number previously received
326
    @type timeout: int/float
327
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
328
                    be capped to that value)
329

330
    """
331
    assert timeout >= 0, "Timeout can not be negative"
332
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
333
                           (job_id, fields, prev_job_info,
334
                            prev_log_serial,
335
                            min(WFJC_TIMEOUT, timeout)))
336

    
337
  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
338
    while True:
339
      result = self.WaitForJobChangeOnce(job_id, fields,
340
                                         prev_job_info, prev_log_serial)
341
      if result != constants.JOB_NOTCHANGED:
342
        break
343
    return result
344

    
345
  def Query(self, what, fields, qfilter):
346
    """Query for resources/items.
347

348
    @param what: One of L{constants.QR_VIA_LUXI}
349
    @type fields: List of strings
350
    @param fields: List of requested fields
351
    @type qfilter: None or list
352
    @param qfilter: Query filter
353
    @rtype: L{objects.QueryResponse}
354

355
    """
356
    result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
357
    return objects.QueryResponse.FromDict(result)
358

    
359
  def QueryFields(self, what, fields):
360
    """Query for available fields.
361

362
    @param what: One of L{constants.QR_VIA_LUXI}
363
    @type fields: None or list of strings
364
    @param fields: List of requested fields
365
    @rtype: L{objects.QueryFieldsResponse}
366

367
    """
368
    result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
369
    return objects.QueryFieldsResponse.FromDict(result)
370

    
371
  def QueryJobs(self, job_ids, fields):
372
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
373

    
374
  def QueryInstances(self, names, fields, use_locking):
375
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
376

    
377
  def QueryNodes(self, names, fields, use_locking):
378
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
379

    
380
  def QueryGroups(self, names, fields, use_locking):
381
    return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))
382

    
383
  def QueryNetworks(self, names, fields, use_locking):
384
    return self.CallMethod(REQ_QUERY_NETWORKS, (names, fields, use_locking))
385

    
386
  def QueryExports(self, nodes, use_locking):
387
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
388

    
389
  def QueryClusterInfo(self):
390
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
391

    
392
  def QueryConfigValues(self, fields):
393
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))
394

    
395
  def QueryTags(self, kind, name):
396
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))