Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ 24d16f76

History | View | Annotate | Download (15.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the unix socket protocol
23

24
This module implements the local unix socket protocol. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also used by the master daemon.
29

30
"""
31

    
32
import socket
33
import collections
34
import time
35
import errno
36
import logging
37
import warnings
38

    
39
from ganeti import serializer
40
from ganeti import constants
41
from ganeti import errors
42
from ganeti import utils
43
from ganeti import objects
44

    
45

    
46
# Keys of the dictionaries exchanged as LUXI requests and responses
KEY_METHOD = "method"
KEY_ARGS = "args"
KEY_SUCCESS = "success"
KEY_RESULT = "result"
KEY_VERSION = "version"

# Method names sent in the KEY_METHOD field of a request
REQ_SUBMIT_JOB = "SubmitJob"
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
REQ_QUERY = "Query"
REQ_QUERY_FIELDS = "QueryFields"
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_GROUPS = "QueryGroups"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
REQ_QUERY_TAGS = "QueryTags"
REQ_QUERY_LOCKS = "QueryLocks"
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"

# Default connect and read/write timeouts, in seconds
DEF_CTMO = 10
DEF_RWTO = 60

# WaitForJobChange timeout; explicit floor division keeps this an integer
# regardless of whether "/" means integer or true division
WFJC_TIMEOUT = (DEF_RWTO - 1) // 2
77

    
78

    
79
class ProtocolError(errors.LuxiError):
  """Base class for all failures of the LUXI protocol layer."""
81

    
82

    
83
class ConnectionClosedError(ProtocolError):
  """The connection was closed by the other side."""
85

    
86

    
87
class TimeoutError(ProtocolError):
  """A connect, send or receive operation exceeded its timeout."""
89

    
90

    
91
class RequestError(ProtocolError):
  """A request was rejected.

  The problem lies in the format or the handling of the request itself,
  as opposed to (e.g.) a failure while starting up an instance.

  Typical causes:
    - a job could not be submitted because its data was wrong
    - a query was missing required fields

  """
102

    
103

    
104
class NoMasterError(ProtocolError):
  """No master daemon could be reached.

  Either the master daemon is not running or its socket has been
  removed.

  """
111

    
112

    
113
class PermissionError(ProtocolError):
  """Connecting to the master socket was denied.

  The calling user lacks the rights needed to use the socket.

  """
119

    
120

    
121
class Transport:
  """Low-level transport class.

  This is used on the client side.

  This could be replaced by any other class that provides the same
  semantics to the Client. This means:
    - can send messages and receive messages
    - safe for multithreading

  """

  def __init__(self, address, timeouts=None):
    """Constructor for the Transport class.

    Creates a unix stream socket and connects it to C{address}.

    Arguments:
      - address: a valid address for the used transport class
      - timeouts: a pair of timeouts, to be used on connect and read/write

    There are two timeouts used since we might want to wait for a long
    time for a response, but the connect timeout should be lower.

    If not passed, the defaults L{DEF_CTMO} and L{DEF_RWTO} are used.

    Note that on reading data, since the timeout applies to an
    individual receive, it might be that the total duration is longer
    than the timeout value passed (we make a hard limit at twice the read
    timeout).

    @raise TimeoutError: if the connection could not be established in time
    @raise NoMasterError: see L{_Connect}
    @raise PermissionError: see L{_Connect}

    """
    self.address = address
    if timeouts is None:
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
    else:
      self._ctimeout, self._rwtimeout = timeouts

    self.socket = None
    # Data received after the last complete message
    self._buffer = ""
    # Complete messages not yet handed out by Recv
    self._msgs = collections.deque()

    try:
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

      # Try to connect
      try:
        utils.Retry(self._Connect, 1.0, self._ctimeout,
                    args=(self.socket, address, self._ctimeout))
      except utils.RetryTimeout:
        raise TimeoutError("Connect timed out")

      self.socket.settimeout(self._rwtimeout)
    except Exception:
      # Whatever went wrong (including timeouts and permission problems),
      # don't leave a half-initialized socket open behind us
      if self.socket is not None:
        self.socket.close()
      self.socket = None
      raise

  @staticmethod
  def _Connect(sock, address, timeout):
    """Make a single connection attempt.

    @param sock: the socket to connect
    @param address: the address to connect to
    @param timeout: timeout set on the socket for this attempt
    @raise TimeoutError: if the attempt timed out
    @raise NoMasterError: on ENOENT or ECONNREFUSED
    @raise PermissionError: on EPERM or EACCES
    @raise utils.RetryAgain: on EAGAIN, to have the attempt repeated

    """
    sock.settimeout(timeout)
    try:
      sock.connect(address)
    except socket.timeout as err:
      raise TimeoutError("Connect timed out: %s" % str(err))
    except socket.error as err:
      # Guard against errors without arguments, as done in Recv
      if err.args:
        error_code = err.args[0]
      else:
        error_code = None
      if error_code in (errno.ENOENT, errno.ECONNREFUSED):
        raise NoMasterError(address)
      elif error_code in (errno.EPERM, errno.EACCES):
        raise PermissionError(address)
      elif error_code == errno.EAGAIN:
        # Server's socket backlog is full at the moment
        raise utils.RetryAgain()
      raise

  def _CheckSocket(self):
    """Make sure we are connected.

    @raise ProtocolError: if the socket has been closed

    """
    if self.socket is None:
      raise ProtocolError("Connection is closed")

  def Send(self, msg):
    """Send a message.

    This just sends a message and doesn't wait for the response.

    @param msg: the message, which must not contain the message terminator
    @raise ProtocolError: if the terminator is found in the message or the
        connection is closed
    @raise TimeoutError: if sending timed out

    """
    if constants.LUXI_EOM in msg:
      raise ProtocolError("Message terminator found in payload")

    self._CheckSocket()
    try:
      # NOTE: sendall either sends everything or raises; after an error
      # there is no way to tell how much of the message went out
      self.socket.sendall(msg + constants.LUXI_EOM)
    except socket.timeout as err:
      raise TimeoutError("Sending timeout: %s" % str(err))

  def Recv(self):
    """Try to receive a message from the socket.

    In case we already have messages queued, we just return from the
    queue. Otherwise, we try to read data with a _rwtimeout network
    timeout, and making sure we don't go over 2x_rwtimeout as a global
    limit: the deadline is checked before every read, and a single read
    can itself block for up to _rwtimeout.

    @return: the oldest complete message, without its terminator
    @raise ProtocolError: if the connection is closed
    @raise TimeoutError: if no complete message arrived in time
    @raise ConnectionClosedError: if the other side closed the connection

    """
    self._CheckSocket()
    etime = time.time() + self._rwtimeout
    while not self._msgs:
      if time.time() > etime:
        raise TimeoutError("Extended receive timeout")
      try:
        data = self.socket.recv(4096)
      except socket.timeout as err:
        raise TimeoutError("Receive timeout: %s" % str(err))
      except socket.error as err:
        if err.args and err.args[0] == errno.EAGAIN:
          # Retry via the outer loop so that the deadline is re-checked
          # and a persistent EAGAIN can't make us spin forever
          continue
        raise
      if not data:
        raise ConnectionClosedError("Connection closed while reading")
      # The last element is the (possibly empty) start of an incomplete
      # message; keep it for the next read
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def Call(self, msg):
    """Send a message and wait for the response.

    This is just a wrapper over Send and Recv.

    """
    self.Send(msg)
    return self.Recv()

  def Close(self):
    """Close the socket; does nothing if it is already closed."""
    if self.socket is not None:
      self.socket.close()
      self.socket = None
264

    
265

    
266
def ParseRequest(msg):
  """Parses a LUXI request message.

  @param msg: the serialized request
  @return: tuple of (method, args, version); the version is None if the
      request didn't contain one
  @raise ProtocolError: if the message can't be parsed, is not a
      dictionary or lacks the method or the arguments

  """
  try:
    request = serializer.LoadJson(msg)
  except ValueError as err:
    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)

  logging.debug("LUXI request: %s", request)

  if not isinstance(request, dict):
    logging.error("LUXI request not a dict: %r", msg)
    raise ProtocolError("Invalid LUXI request (not a dict)")

  method = request.get(KEY_METHOD, None) # pylint: disable-msg=E1103
  args = request.get(KEY_ARGS, None) # pylint: disable-msg=E1103
  version = request.get(KEY_VERSION, None) # pylint: disable-msg=E1103

  # The version is optional, method and arguments are not
  if method is None or args is None:
    logging.error("LUXI request missing method or arguments: %r", msg)
    raise ProtocolError(("Invalid LUXI request (no method or arguments"
                         " in request): %r") % msg)

  return (method, args, version)
291

    
292

    
293
def ParseResponse(msg):
  """Parses a LUXI response message.

  @param msg: the serialized response
  @return: tuple of (success, result, version); the version is None if
      the response didn't contain one
  @raise ProtocolError: if the message can't be deserialized or is not a
      dictionary holding both the success and the result keys

  """
  # Parse the result
  try:
    data = serializer.LoadJson(msg)
  except Exception as err:
    # Any deserialization failure is reported as a protocol error
    raise ProtocolError("Error while deserializing response: %s" % str(err))

  # Validate response
  if not (isinstance(data, dict) and
          KEY_SUCCESS in data and
          KEY_RESULT in data):
    raise ProtocolError("Invalid response from server: %r" % data)

  return (data[KEY_SUCCESS], data[KEY_RESULT],
          data.get(KEY_VERSION, None)) # pylint: disable-msg=E1103
311

    
312

    
313
def FormatResponse(success, result, version=None):
  """Builds and serializes a LUXI response message.

  @param success: stored under the success key
  @param result: stored under the result key
  @param version: stored under the version key, unless it is None
  @return: the response as serialized by L{serializer.DumpJson}

  """
  payload = {KEY_SUCCESS: success, KEY_RESULT: result}
  if version is not None:
    payload[KEY_VERSION] = version

  logging.debug("LUXI response: %s", payload)
  return serializer.DumpJson(payload)
328

    
329

    
330
def FormatRequest(method, args, version=None):
  """Builds and serializes a LUXI request message.

  @param method: stored under the method key
  @param args: stored under the arguments key
  @param version: stored under the version key, unless it is None
  @return: the request as serialized by L{serializer.DumpJson}

  """
  payload = {KEY_METHOD: method, KEY_ARGS: args}
  if version is not None:
    payload[KEY_VERSION] = version

  return serializer.DumpJson(payload, indent=False)
345

    
346

    
347
def CallLuxiMethod(transport_cb, method, args, version=None):
  """Performs one LUXI call through a transport callback.

  @param transport_cb: callable taking the serialized request and
      returning the serialized response
  @param method: name of the method to call
  @param args: arguments of the method
  @param version: version put into the request and compared against the
      one found in the response, if any
  @return: the result part of a successful response
  @raise errors.LuxiError: if the response carries a different version
  @raise RequestError: if the call failed and L{errors.MaybeRaise} did
      not raise an exception of its own

  """
  assert callable(transport_cb)

  # Round trip: serialize, hand over to the transport, parse the reply
  reply = transport_cb(FormatRequest(method, args, version=version))
  (ok, payload, reply_version) = ParseResponse(reply)

  # A response without version is accepted as it is
  if reply_version is not None and reply_version != version:
    raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
                           (version, reply_version))

  if not ok:
    errors.MaybeRaise(payload)
    raise RequestError(payload)

  return payload
370

    
371

    
372
class Client(object):
  """High-level client implementation.

  This uses a backing Transport-like class on top of which it
  implements data serialization/deserialization.

  """
  def __init__(self, address=None, timeouts=None, transport=Transport):
    """Constructor for the Client class.

    Arguments:
      - address: a valid address for the used transport class; defaults
        to L{constants.MASTER_SOCKET}
      - timeouts: a list of timeouts, to be used on connect and read/write
      - transport: a Transport-like class

    If timeouts is not passed, the default timeouts of the transport
    class are used.

    The transport is created right away, hence any error raised by the
    transport class constructor is raised from here.

    """
    if address is None:
      address = constants.MASTER_SOCKET
    self.address = address
    self.timeouts = timeouts
    self.transport_class = transport
    self.transport = None
    self._InitTransport()

  def _InitTransport(self):
    """(Re)initialize the transport if needed.

    """
    if self.transport is None:
      self.transport = self.transport_class(self.address,
                                            timeouts=self.timeouts)

  def _CloseTransport(self):
    """Close the transport, ignoring errors.

    """
    if self.transport is None:
      return
    try:
      # Forget the transport before closing it, so that it is dropped
      # even if closing fails
      old_transp = self.transport
      self.transport = None
      old_transp.Close()
    except Exception: # pylint: disable-msg=W0703
      # Best effort only; the transport is being thrown away anyway
      pass

  def _SendMethodCall(self, data):
    """Send serialized data over the transport and return the response.

    On any error the transport is closed, so that the next call starts
    with a new one, and the error is re-raised.

    """
    # Send request and wait for response
    try:
      self._InitTransport()
      return self.transport.Call(data)
    except Exception:
      self._CloseTransport()
      raise

  def CallMethod(self, method, args):
    """Send a generic request and return the response.

    The request is tagged with L{constants.LUXI_VERSION}.

    """
    return CallLuxiMethod(self._SendMethodCall, method, args,
                          version=constants.LUXI_VERSION)

  def SetQueueDrainFlag(self, drain_flag):
    """Send a L{REQ_QUEUE_SET_DRAIN_FLAG} request with the given flag."""
    return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)

  def SetWatcherPause(self, until):
    """Send a L{REQ_SET_WATCHER_PAUSE} request with the given value."""
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])

  def SubmitJob(self, ops):
    """Submit one job, given as an iterable of opcodes.

    Each opcode is sent as the result of its C{__getstate__} method.

    """
    # Always build a real list, as done in SubmitManyJobs
    ops_state = [op.__getstate__() for op in ops]
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)

  def SubmitManyJobs(self, jobs):
    """Submit several jobs, each given as an iterable of opcodes.

    Each opcode is sent as the result of its C{__getstate__} method.

    """
    jobs_state = []
    for ops in jobs:
      jobs_state.append([op.__getstate__() for op in ops])
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)

  def CancelJob(self, job_id):
    """Send a L{REQ_CANCEL_JOB} request for the given job ID."""
    return self.CallMethod(REQ_CANCEL_JOB, job_id)

  def ArchiveJob(self, job_id):
    """Send a L{REQ_ARCHIVE_JOB} request for the given job ID."""
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)

  def AutoArchiveJobs(self, age):
    """Send a L{REQ_AUTOARCHIVE_JOBS} request for the given age.

    A timeout of a bit less than half the default read/write timeout is
    sent along with the age.

    """
    # Explicit floor division keeps the timeout an integer regardless of
    # whether "/" means integer or true division
    timeout = (DEF_RWTO - 1) // 2
    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial,
                           timeout=WFJC_TIMEOUT):
    """Waits for changes on a job.

    @param job_id: Job ID
    @type fields: list
    @param fields: List of field names to be observed
    @type prev_job_info: None or list
    @param prev_job_info: Previously received job information
    @type prev_log_serial: None or int/long
    @param prev_log_serial: Highest log serial number previously received
    @type timeout: int/float
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
                    be capped to that value)

    """
    assert timeout >= 0, "Timeout can not be negative"
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
                           (job_id, fields, prev_job_info,
                            prev_log_serial,
                            min(WFJC_TIMEOUT, timeout)))

  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
    """Waits until a job has changed.

    Calls L{WaitForJobChangeOnce} with its default timeout over and over
    until the result is something else than L{constants.JOB_NOTCHANGED},
    and returns that result. The parameters are those of
    L{WaitForJobChangeOnce}.

    """
    while True:
      result = self.WaitForJobChangeOnce(job_id, fields,
                                         prev_job_info, prev_log_serial)
      if result != constants.JOB_NOTCHANGED:
        break
    return result

  def Query(self, what, fields, filter_):
    """Query for resources/items.

    @param what: One of L{constants.QR_OP_LUXI}
    @type fields: List of strings
    @param fields: List of requested fields
    @type filter_: None or list
    @param filter_: Query filter
    @rtype: L{objects.QueryResponse}

    """
    req = objects.QueryRequest(what=what, fields=fields, filter=filter_)
    result = self.CallMethod(REQ_QUERY, req.ToDict())
    return objects.QueryResponse.FromDict(result)

  def QueryFields(self, what, fields):
    """Query for available fields.

    @param what: One of L{constants.QR_OP_LUXI}
    @type fields: None or list of strings
    @param fields: List of requested fields
    @rtype: L{objects.QueryFieldsResponse}

    """
    req = objects.QueryFieldsRequest(what=what, fields=fields)
    result = self.CallMethod(REQ_QUERY_FIELDS, req.ToDict())
    return objects.QueryFieldsResponse.FromDict(result)

  def QueryJobs(self, job_ids, fields):
    """Send a L{REQ_QUERY_JOBS} request with (job_ids, fields)."""
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))

  def QueryInstances(self, names, fields, use_locking):
    """Send a L{REQ_QUERY_INSTANCES} request with (names, fields, use_locking).

    """
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))

  def QueryNodes(self, names, fields, use_locking):
    """Send a L{REQ_QUERY_NODES} request with (names, fields, use_locking)."""
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))

  def QueryGroups(self, names, fields, use_locking):
    """Send a L{REQ_QUERY_GROUPS} request with (names, fields, use_locking)."""
    return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))

  def QueryExports(self, nodes, use_locking):
    """Send a L{REQ_QUERY_EXPORTS} request with (nodes, use_locking)."""
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))

  def QueryClusterInfo(self):
    """Send a L{REQ_QUERY_CLUSTER_INFO} request, which takes no arguments."""
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())

  def QueryConfigValues(self, fields):
    """Send a L{REQ_QUERY_CONFIG_VALUES} request with the given fields."""
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)

  def QueryTags(self, kind, name):
    """Send a L{REQ_QUERY_TAGS} request with (kind, name)."""
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))

  def QueryLocks(self, fields, sync):
    """Send a L{REQ_QUERY_LOCKS} request with (fields, sync).

    Deprecated: a warning pointing to L{Query} is issued on every call.

    """
    warnings.warn("This LUXI call is deprecated and will be removed, use"
                  " Query(\"%s\", ...) instead" % constants.QR_LOCK)
    return self.CallMethod(REQ_QUERY_LOCKS, (fields, sync))