Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ 899c4d2c

History | View | Annotate | Download (14.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the unix socket protocol

This module implements the local unix socket protocol. You only need
this module and the opcodes module in the client program in order to
communicate with the master.

The module is also used by the master daemon.

"""
31

    
32
import socket
33
import collections
34
import time
35
import errno
36
import logging
37

    
38
from ganeti import serializer
39
from ganeti import constants
40
from ganeti import errors
41
from ganeti import utils
42

    
43

    
44
# Keys of the dictionaries exchanged as serialized requests and responses
KEY_METHOD = "method"
KEY_ARGS = "args"
KEY_SUCCESS = "success"
KEY_RESULT = "result"
KEY_VERSION = "version"

# Method names, sent as the KEY_METHOD value of a request
REQ_SUBMIT_JOB = "SubmitJob"
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
REQ_QUERY_TAGS = "QueryTags"
REQ_QUERY_LOCKS = "QueryLocks"
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"

# Default connect and read/write timeouts, in seconds
DEF_CTMO = 10
DEF_RWTO = 60

# WaitForJobChange timeout; explicit floor division keeps the value an
# integer (29) even when "/" means true division
WFJC_TIMEOUT = (DEF_RWTO - 1) // 2
72

    
73

    
74
class ProtocolError(errors.LuxiError):
  """Base class for errors in the LUXI protocol handling.

  The other exception classes defined in this module derive from it.

  """
76

    
77

    
78
class ConnectionClosedError(ProtocolError):
  """The connection was closed.

  Raised by L{Transport.Recv} when the socket returns no data while a
  message is still being awaited.

  """
80

    
81

    
82
class TimeoutError(ProtocolError):
  """An operation timed out.

  Raised by L{Transport} when connecting, sending or receiving exceeds
  the configured timeout.

  Note: on Python 3 this class shadows the builtin of the same name
  inside this module.

  """
84

    
85

    
86
class RequestError(ProtocolError):
  """The request itself failed.

  This covers problems with the format of a request or with its
  handling; it does not cover failures of the requested operation
  (e.g. an instance that could not be started).

  Typical causes:
    - a job was submitted with wrong job data
    - a query did not specify the required fields

  """
97

    
98

    
99
class NoMasterError(ProtocolError):
  """The master daemon could not be reached.

  Either the master daemon is not running or its socket has been
  removed.

  """
106

    
107

    
108
class PermissionError(ProtocolError):
  """Connecting to the master socket was not permitted.

  The user running the client lacks the rights needed to access the
  socket.

  Note: on Python 3 this class shadows the builtin of the same name
  inside this module.

  """
114

    
115

    
116
class Transport:
  """Low-level transport class.

  This is used on the client side.

  This could be replaced by any other class that provides the same
  semantics to the Client. This means:
    - can send messages and receive messages
    - safe for multithreading

  """

  def __init__(self, address, timeouts=None):
    """Constructor for the Transport class.

    Arguments:
      - address: a valid address for the used transport class
      - timeouts: a list of timeouts, to be used on connect and read/write

    There are two timeouts used since we might want to wait for a long
    time for a response, but the connect timeout should be lower.

    If not passed, we use a default of 10 and respectively 60 seconds.

    Note that on reading data, since the timeout applies to an
    individual receive, it might be that the total duration is longer
    than timeout value passed (we make a hard limit at twice the read
    timeout).

    """
    self.address = address
    # Set before anything can fail, so that the cleanup below and Close()
    # always find the attribute
    self.socket = None
    self._buffer = ""
    self._msgs = collections.deque()

    try:
      if timeouts is None:
        self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
      else:
        self._ctimeout, self._rwtimeout = timeouts

      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

      # Try to connect
      try:
        utils.Retry(self._Connect, 1.0, self._ctimeout,
                    args=(self.socket, address, self._ctimeout))
      except utils.RetryTimeout:
        raise TimeoutError("Connect timed out")

      self.socket.settimeout(self._rwtimeout)
    except Exception: # pylint: disable-msg=W0703
      # Whatever went wrong (including TimeoutError and PermissionError),
      # don't leave a half-initialized socket open behind us
      if self.socket is not None:
        self.socket.close()
      self.socket = None
      raise

  @staticmethod
  def _Connect(sock, address, timeout):
    """Tries once to connect the socket to the given address.

    The socket errors are translated into the module's own exceptions;
    EAGAIN raises utils.RetryAgain, any unrecognized socket error is
    re-raised unchanged.

    """
    sock.settimeout(timeout)
    try:
      sock.connect(address)
    except socket.timeout as err:
      raise TimeoutError("Connect timed out: %s" % str(err))
    except socket.error as err:
      error_code = err.args[0]
      if error_code in (errno.ENOENT, errno.ECONNREFUSED):
        raise NoMasterError(address)
      elif error_code in (errno.EPERM, errno.EACCES):
        raise PermissionError(address)
      elif error_code == errno.EAGAIN:
        # Server's socket backlog is full at the moment
        raise utils.RetryAgain()
      raise

  def _CheckSocket(self):
    """Make sure we are connected.

    """
    if self.socket is None:
      raise ProtocolError("Connection is closed")

  def Send(self, msg):
    """Send a message.

    This just sends a message and doesn't wait for the response. The
    message must not contain the message terminator.

    """
    if constants.LUXI_EOM in msg:
      raise ProtocolError("Message terminator found in payload")

    self._CheckSocket()
    try:
      # TODO: if sendall fails part-way there is no way to tell how much
      # of the message has already been sent
      self.socket.sendall(msg + constants.LUXI_EOM)
    except socket.timeout as err:
      raise TimeoutError("Sending timeout: %s" % str(err))

  def Recv(self):
    """Try to receive a message from the socket.

    In case we already have messages queued, we just return from the
    queue. Otherwise, we try to read data with a _rwtimeout network
    timeout, and making sure we don't go over 2x_rwtimeout as a global
    limit.

    """
    self._CheckSocket()
    etime = time.time() + self._rwtimeout
    while not self._msgs:
      if time.time() > etime:
        raise TimeoutError("Extended receive timeout")
      try:
        data = self.socket.recv(4096)
      except socket.timeout as err:
        # Must be handled before socket.error, of which socket.timeout
        # is a subclass
        raise TimeoutError("Receive timeout: %s" % str(err))
      except socket.error as err:
        if err.args and err.args[0] == errno.EAGAIN:
          # Retry, but go through the deadline check above first
          continue
        raise
      if not data:
        raise ConnectionClosedError("Connection closed while reading")
      # Everything after the last terminator is an incomplete message and
      # is kept for the next round
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def Call(self, msg):
    """Send a message and wait for the response.

    This is just a wrapper over Send and Recv.

    """
    self.Send(msg)
    return self.Recv()

  def Close(self):
    """Close the socket"""
    if self.socket is not None:
      self.socket.close()
      self.socket = None
259

    
260

    
261
def ParseRequest(msg):
  """Parses a LUXI request message.

  @param msg: the serialized request
  @return: tuple of (method, args, version); version is None if the
      request does not contain one
  @raise ProtocolError: if the message can't be deserialized, is not a
      dictionary or lacks the method or the arguments

  """
  try:
    request = serializer.LoadJson(msg)
  except ValueError as err:
    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)

  logging.debug("LUXI request: %s", request)

  if not isinstance(request, dict):
    logging.error("LUXI request not a dict: %r", msg)
    raise ProtocolError("Invalid LUXI request (not a dict)")

  method = request.get(KEY_METHOD, None) # pylint: disable-msg=E1103
  args = request.get(KEY_ARGS, None) # pylint: disable-msg=E1103
  version = request.get(KEY_VERSION, None)

  # The version is optional, the method and its arguments are not
  if method is None or args is None:
    logging.error("LUXI request missing method or arguments: %r", msg)
    raise ProtocolError(("Invalid LUXI request (no method or arguments"
                         " in request): %r") % msg)

  return (method, args, version)
286

    
287

    
288
def ParseResponse(msg):
  """Parses a LUXI response message.

  @param msg: the serialized response
  @return: tuple of (success, result, version); version is None if the
      response does not contain one
  @raise ProtocolError: if the message can't be deserialized or is not
      a dictionary holding both the success and the result keys

  """
  # Parse the result
  try:
    data = serializer.LoadJson(msg)
  except Exception as err:
    raise ProtocolError("Error while deserializing response: %s" % str(err))

  # Validate response
  if not (isinstance(data, dict) and
          KEY_SUCCESS in data and
          KEY_RESULT in data):
    raise ProtocolError("Invalid response from server: %r" % data)

  return (data[KEY_SUCCESS], data[KEY_RESULT], data.get(KEY_VERSION, None))
305

    
306

    
307
def FormatResponse(success, result, version=None):
  """Builds and serializes a LUXI response message.

  @param success: value stored under the success key
  @param result: value stored under the result key
  @param version: if not None, stored under the version key
  @return: the response as serialized by serializer.DumpJson

  """
  payload = {KEY_SUCCESS: success, KEY_RESULT: result}

  # The version key only appears when a version was given
  if version is not None:
    payload[KEY_VERSION] = version

  logging.debug("LUXI response: %s", payload)

  return serializer.DumpJson(payload)
322

    
323

    
324
def FormatRequest(method, args, version=None):
  """Builds and serializes a LUXI request message.

  @param method: value stored under the method key
  @param args: value stored under the arguments key
  @param version: if not None, stored under the version key
  @return: the request as serialized by serializer.DumpJson (called
      with indent=False)

  """
  payload = {KEY_METHOD: method, KEY_ARGS: args}

  # The version key only appears when a version was given
  if version is not None:
    payload[KEY_VERSION] = version

  return serializer.DumpJson(payload, indent=False)
339

    
340

    
341
def CallLuxiMethod(transport_cb, method, args, version=None):
  """Sends a LUXI request through a transport callback.

  @param transport_cb: callable receiving the serialized request and
      returning the serialized response
  @param method: method name
  @param args: method arguments
  @param version: version put into the request and compared against the
      one found in the response, if any
  @return: the result part of a successful response
  @raise errors.LuxiError: if the response carries a different version
  @raise RequestError: if the response signals a failure and
      errors.MaybeRaise did not raise for its result

  """
  assert callable(transport_cb)

  # Round trip: serialize, hand over to the transport, decode the answer
  reply = transport_cb(FormatRequest(method, args, version=version))
  (ok, payload, reply_version) = ParseResponse(reply)

  # A response without version is accepted as is
  if reply_version is not None and reply_version != version:
    raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
                           (version, reply_version))

  if not ok:
    errors.MaybeRaise(payload)
    raise RequestError(payload)

  return payload
364

    
365

    
366
class Client(object):
  """High-level client implementation.

  This uses a backing Transport-like class on top of which it
  implements data serialization/deserialization.

  """
  def __init__(self, address=None, timeouts=None, transport=Transport):
    """Constructor for the Client class.

    Arguments:
      - address: a valid address for the used transport class; if None,
        constants.MASTER_SOCKET is used
      - timeouts: a list of timeouts, to be used on connect and read/write
      - transport: a Transport-like class

    If timeouts is not passed, the default timeouts of the transport
    class are used.

    """
    if address is None:
      address = constants.MASTER_SOCKET
    self.address = address
    self.timeouts = timeouts
    self.transport_class = transport
    self.transport = None
    self._InitTransport()

  def _InitTransport(self):
    """(Re)initialize the transport if needed.

    """
    if self.transport is None:
      self.transport = self.transport_class(self.address,
                                            timeouts=self.timeouts)

  def _CloseTransport(self):
    """Close the transport, ignoring errors.

    """
    if self.transport is None:
      return
    try:
      # Forget the transport before closing it, so that a failing Close()
      # still leaves us without a transport
      old_transp = self.transport
      self.transport = None
      old_transp.Close()
    except Exception: # pylint: disable-msg=W0703
      pass

  def _SendMethodCall(self, data):
    """Sends a serialized request and returns the serialized response.

    On any error the transport is closed (it is re-created by the next
    call) and the exception is re-raised.

    """
    # Send request and wait for response
    try:
      self._InitTransport()
      return self.transport.Call(data)
    except Exception:
      self._CloseTransport()
      raise

  def CallMethod(self, method, args):
    """Send a generic request and return the response.

    """
    return CallLuxiMethod(self._SendMethodCall, method, args,
                          version=constants.LUXI_VERSION)

  def SetQueueDrainFlag(self, drain_flag):
    """Calls REQ_QUEUE_SET_DRAIN_FLAG with drain_flag as its arguments."""
    return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)

  def SetWatcherPause(self, until):
    """Calls REQ_SET_WATCHER_PAUSE with [until] as its arguments."""
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])

  def SubmitJob(self, ops):
    """Submits one job made of the given opcodes.

    The state of each opcode, as returned by its __getstate__ method, is
    what is sent.

    """
    # A real list (not a lazy map object), consistent with SubmitManyJobs
    ops_state = [op.__getstate__() for op in ops]
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)

  def SubmitManyJobs(self, jobs):
    """Submits several jobs, each given as a sequence of opcodes."""
    jobs_state = [[op.__getstate__() for op in ops] for ops in jobs]
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)

  def CancelJob(self, job_id):
    """Calls REQ_CANCEL_JOB with job_id as its arguments."""
    return self.CallMethod(REQ_CANCEL_JOB, job_id)

  def ArchiveJob(self, job_id):
    """Calls REQ_ARCHIVE_JOB with job_id as its arguments."""
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)

  def AutoArchiveJobs(self, age):
    """Calls REQ_AUTOARCHIVE_JOBS with (age, timeout) as its arguments.

    The timeout is derived from the default read/write timeout.

    """
    # Floor division keeps the value an integer under true division too
    timeout = (DEF_RWTO - 1) // 2
    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial,
                           timeout=WFJC_TIMEOUT):
    """Waits for changes on a job.

    @param job_id: Job ID
    @type fields: list
    @param fields: List of field names to be observed
    @type prev_job_info: None or list
    @param prev_job_info: Previously received job information
    @type prev_log_serial: None or int/long
    @param prev_log_serial: Highest log serial number previously received
    @type timeout: int/float
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
                    be capped to that value)

    """
    assert timeout >= 0, "Timeout can not be negative"
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
                           (job_id, fields, prev_job_info,
                            prev_log_serial,
                            min(WFJC_TIMEOUT, timeout)))

  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
    """Waits until a job has changed.

    Calls L{WaitForJobChangeOnce} with its default timeout for as long as
    the result equals constants.JOB_NOTCHANGED, then returns the first
    different result.

    """
    while True:
      result = self.WaitForJobChangeOnce(job_id, fields,
                                         prev_job_info, prev_log_serial)
      if result != constants.JOB_NOTCHANGED:
        break
    return result

  def QueryJobs(self, job_ids, fields):
    """Calls REQ_QUERY_JOBS with (job_ids, fields) as its arguments."""
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))

  def QueryInstances(self, names, fields, use_locking):
    """Calls REQ_QUERY_INSTANCES with (names, fields, use_locking)."""
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))

  def QueryNodes(self, names, fields, use_locking):
    """Calls REQ_QUERY_NODES with (names, fields, use_locking)."""
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))

  def QueryExports(self, nodes, use_locking):
    """Calls REQ_QUERY_EXPORTS with (nodes, use_locking)."""
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))

  def QueryClusterInfo(self):
    """Calls REQ_QUERY_CLUSTER_INFO with an empty argument tuple."""
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())

  def QueryConfigValues(self, fields):
    """Calls REQ_QUERY_CONFIG_VALUES with fields as its arguments."""
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)

  def QueryTags(self, kind, name):
    """Calls REQ_QUERY_TAGS with (kind, name) as its arguments."""
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))

  def QueryLocks(self, fields, sync):
    """Calls REQ_QUERY_LOCKS with (fields, sync) as its arguments."""
    return self.CallMethod(REQ_QUERY_LOCKS, (fields, sync))