lib/luxi.py @ 28e3e216


#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module for the unix socket protocol

This module implements the local unix socket protocol. You only need
this module and the opcodes module in the client program in order to
communicate with the master.

The module is also used by the master daemon.

"""

import socket
import collections
import time
import errno
import logging

from ganeti import serializer
from ganeti import constants
from ganeti import errors
from ganeti import utils


KEY_METHOD = "method"
KEY_ARGS = "args"
KEY_SUCCESS = "success"
KEY_RESULT = "result"
KEY_VERSION = "version"

REQ_SUBMIT_JOB = "SubmitJob"
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
REQ_QUERY_TAGS = "QueryTags"
REQ_QUERY_LOCKS = "QueryLocks"
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"

DEF_CTMO = 10
DEF_RWTO = 60

# WaitForJobChange timeout
WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
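
# Editor's note (sketch, not in the original file): with the defaults
# above this is (60 - 1) / 2 == 29 seconds under Python 2 integer
# division, presumably chosen so that a WaitForJobChange call returns
# well before the transport's DEF_RWTO read timeout can fire.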


class ProtocolError(errors.LuxiError):
  """Denotes an error in the LUXI protocol."""


class ConnectionClosedError(ProtocolError):
  """Connection closed error."""


class TimeoutError(ProtocolError):
  """Operation timeout error."""


class RequestError(ProtocolError):
  """Error on request.

  This signifies an error in the request format or request handling,
  but not (e.g.) an error in starting up an instance.

  Some common conditions that can trigger this exception:
    - job submission failed because the job data was wrong
    - query failed because required fields were missing

  """


class NoMasterError(ProtocolError):
  """The master cannot be reached.

  This means that the master daemon is not running or the socket has
  been removed.

  """


class PermissionError(ProtocolError):
  """Permission denied while connecting to the master socket.

  This means the user doesn't have the proper rights.

  """

class Transport:
  """Low-level transport class.

  This is used on the client side.

  This could be replaced by any other class that provides the same
  semantics to the Client. This means:
    - can send messages and receive messages
    - safe for multithreading

  """

  def __init__(self, address, timeouts=None):
    """Constructor for the Transport class.

    Arguments:
      - address: a valid address for the used transport class
      - timeouts: a list of timeouts, to be used on connect and read/write

    There are two timeouts used since we might want to wait for a long
    time for a response, but the connect timeout should be lower.

    If not passed, we use defaults of 10 and 60 seconds respectively.

    Note that on reading data, since the timeout applies to an
    individual receive, the total duration might be longer than the
    timeout value passed (we make a hard limit at twice the read
    timeout).

    """
    self.address = address
    if timeouts is None:
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
    else:
      self._ctimeout, self._rwtimeout = timeouts

    self.socket = None
    self._buffer = ""
    self._msgs = collections.deque()

    try:
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

      # Try to connect
      try:
        utils.Retry(self._Connect, 1.0, self._ctimeout,
                    args=(self.socket, address, self._ctimeout))
      except utils.RetryTimeout:
        raise TimeoutError("Connect timed out")

      self.socket.settimeout(self._rwtimeout)
    except (socket.error, NoMasterError):
      if self.socket is not None:
        self.socket.close()
      self.socket = None
      raise

  @staticmethod
  def _Connect(sock, address, timeout):
    sock.settimeout(timeout)
    try:
      sock.connect(address)
    except socket.timeout, err:
      raise TimeoutError("Connect timed out: %s" % str(err))
    except socket.error, err:
      error_code = err.args[0]
      if error_code in (errno.ENOENT, errno.ECONNREFUSED):
        raise NoMasterError(address)
      elif error_code in (errno.EPERM, errno.EACCES):
        raise PermissionError(address)
      elif error_code == errno.EAGAIN:
        # Server's socket backlog is full at the moment
        raise utils.RetryAgain()
      raise

  def _CheckSocket(self):
    """Make sure we are connected.

    """
    if self.socket is None:
      raise ProtocolError("Connection is closed")

  def Send(self, msg):
    """Send a message.

    This just sends a message and doesn't wait for the response.

    """
    if constants.LUXI_EOM in msg:
      raise ProtocolError("Message terminator found in payload")

    self._CheckSocket()
    try:
      # TODO: sendall is not guaranteed to send everything
      self.socket.sendall(msg + constants.LUXI_EOM)
    except socket.timeout, err:
      raise TimeoutError("Sending timeout: %s" % str(err))

  def Recv(self):
    """Try to receive a message from the socket.

    In case we already have messages queued, we just return from the
    queue. Otherwise, we try to read data with a network timeout of
    _rwtimeout per receive, making sure we don't go over twice
    _rwtimeout as a global limit.

    """
    self._CheckSocket()
    etime = time.time() + self._rwtimeout
    while not self._msgs:
      if time.time() > etime:
        raise TimeoutError("Extended receive timeout")
      while True:
        try:
          data = self.socket.recv(4096)
        except socket.timeout, err:
          raise TimeoutError("Receive timeout: %s" % str(err))
        except socket.error, err:
          if err.args and err.args[0] == errno.EAGAIN:
            continue
          raise
        break
      if not data:
        raise ConnectionClosedError("Connection closed while reading")
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def Call(self, msg):
    """Send a message and wait for the response.

    This is just a wrapper over Send and Recv.

    """
    self.Send(msg)
    return self.Recv()

  def Close(self):
    """Close the socket"""
    if self.socket is not None:
      self.socket.close()
      self.socket = None
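

# Editor's sketch, not part of the original module: a small illustration
# of the buffering done by Transport.Recv above. Bytes read from the
# socket are appended to a buffer and split on constants.LUXI_EOM; the
# last, possibly incomplete piece stays buffered until more data
# arrives. The simulated reads below are assumptions for illustration.
def _ExampleEomFraming():
  eom = constants.LUXI_EOM
  buf = ""
  msgs = collections.deque()
  for chunk in ["abc" + eom + "de", "f" + eom]:  # two simulated recv() reads
    parts = (buf + chunk).split(eom)
    buf = parts.pop()    # trailing partial message stays buffered
    msgs.extend(parts)   # complete messages become available
  return list(msgs)      # -> ["abc", "def"]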


def ParseRequest(msg):
  """Parses a LUXI request message.

  """
  try:
    request = serializer.LoadJson(msg)
  except ValueError, err:
    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)

  logging.debug("LUXI request: %s", request)

  if not isinstance(request, dict):
    logging.error("LUXI request not a dict: %r", msg)
    raise ProtocolError("Invalid LUXI request (not a dict)")

  method = request.get(KEY_METHOD, None) # pylint: disable-msg=E1103
  args = request.get(KEY_ARGS, None) # pylint: disable-msg=E1103
  version = request.get(KEY_VERSION, None) # pylint: disable-msg=E1103

  if method is None or args is None:
    logging.error("LUXI request missing method or arguments: %r", msg)
    raise ProtocolError(("Invalid LUXI request (no method or arguments"
                         " in request): %r") % msg)

  return (method, args, version)


def ParseResponse(msg):
  """Parses a LUXI response message.

  """
  # Parse the result
  try:
    data = serializer.LoadJson(msg)
  except Exception, err:
    raise ProtocolError("Error while deserializing response: %s" % str(err))

  # Validate response
  if not (isinstance(data, dict) and
          KEY_SUCCESS in data and
          KEY_RESULT in data):
    raise ProtocolError("Invalid response from server: %r" % data)

  return (data[KEY_SUCCESS], data[KEY_RESULT],
          data.get(KEY_VERSION, None)) # pylint: disable-msg=E1103


def FormatResponse(success, result, version=None):
  """Formats a LUXI response message.

  """
  response = {
    KEY_SUCCESS: success,
    KEY_RESULT: result,
    }

  if version is not None:
    response[KEY_VERSION] = version

  logging.debug("LUXI response: %s", response)

  return serializer.DumpJson(response)


def FormatRequest(method, args, version=None):
  """Formats a LUXI request message.

  """
  # Build request
  request = {
    KEY_METHOD: method,
    KEY_ARGS: args,
    }

  if version is not None:
    request[KEY_VERSION] = version

  # Serialize the request
  return serializer.DumpJson(request, indent=False)
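

# Editor's sketch, not part of the original module: the four helpers
# above are symmetric, so a message built by FormatRequest parses back
# with ParseRequest, and likewise for responses. The method name and
# arguments below are examples only.
def _ExampleRoundTrip():
  req_msg = FormatRequest(REQ_QUERY_JOBS, [[], ["id", "status"]],
                          version=constants.LUXI_VERSION)
  (method, args, _) = ParseRequest(req_msg)
  assert method == REQ_QUERY_JOBS

  resp_msg = FormatResponse(True, [[1, "success"]],
                            version=constants.LUXI_VERSION)
  (success, result, _) = ParseResponse(resp_msg)
  assert success
  return (args, result)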


def CallLuxiMethod(transport_cb, method, args, version=None):
  """Send a LUXI request via a transport and return the response.

  """
  assert callable(transport_cb)

  request_msg = FormatRequest(method, args, version=version)

  # Send request and wait for response
  response_msg = transport_cb(request_msg)

  (success, result, resp_version) = ParseResponse(response_msg)

  # Verify version if there was one in the response
  if resp_version is not None and resp_version != version:
    raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
                           (version, resp_version))

  if success:
    return result

  errors.MaybeRaise(result)
  raise RequestError(result)
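

# Editor's sketch, not part of the original module: CallLuxiMethod only
# needs a callable that maps a serialized request to a serialized
# response, so it can be exercised without a running master daemon.
# The echoing callback below is an assumption, not part of the API.
def _ExampleCallWithFakeTransport():
  def _echo_cb(request_msg):
    (method, args, version) = ParseRequest(request_msg)
    return FormatResponse(True, [method, args], version=version)

  return CallLuxiMethod(_echo_cb, REQ_QUERY_CLUSTER_INFO, [],
                        version=constants.LUXI_VERSION)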


class Client(object):
  """High-level client implementation.

  This uses a backing Transport-like class on top of which it
  implements data serialization/deserialization.

  """
  def __init__(self, address=None, timeouts=None, transport=Transport):
    """Constructor for the Client class.

    Arguments:
      - address: a valid address for the used transport class
      - timeouts: a list of timeouts, to be used on connect and read/write
      - transport: a Transport-like class

    If timeouts is not passed, the default timeouts of the transport
    class are used.

    """
    if address is None:
      address = constants.MASTER_SOCKET
    self.address = address
    self.timeouts = timeouts
    self.transport_class = transport
    self.transport = None
    self._InitTransport()

  def _InitTransport(self):
    """(Re)initialize the transport if needed.

    """
    if self.transport is None:
      self.transport = self.transport_class(self.address,
                                            timeouts=self.timeouts)

  def _CloseTransport(self):
    """Close the transport, ignoring errors.

    """
    if self.transport is None:
      return
    try:
      old_transp = self.transport
      self.transport = None
      old_transp.Close()
    except Exception: # pylint: disable-msg=W0703
      pass

  def _SendMethodCall(self, data):
    # Send request and wait for response
    try:
      self._InitTransport()
      return self.transport.Call(data)
    except Exception:
      self._CloseTransport()
      raise

  def CallMethod(self, method, args):
    """Send a generic request and return the response.

    """
    return CallLuxiMethod(self._SendMethodCall, method, args,
                          version=constants.LUXI_VERSION)

  def SetQueueDrainFlag(self, drain_flag):
    return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)

  def SetWatcherPause(self, until):
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])

  def SubmitJob(self, ops):
    ops_state = map(lambda op: op.__getstate__(), ops)
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)

  def SubmitManyJobs(self, jobs):
    jobs_state = []
    for ops in jobs:
      jobs_state.append([op.__getstate__() for op in ops])
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)

  def CancelJob(self, job_id):
    return self.CallMethod(REQ_CANCEL_JOB, job_id)

  def ArchiveJob(self, job_id):
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)

  def AutoArchiveJobs(self, age):
    timeout = (DEF_RWTO - 1) / 2
    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial,
                           timeout=WFJC_TIMEOUT):
    """Waits for changes on a job.

    @param job_id: Job ID
    @type fields: list
    @param fields: List of field names to be observed
    @type prev_job_info: None or list
    @param prev_job_info: Previously received job information
    @type prev_log_serial: None or int/long
    @param prev_log_serial: Highest log serial number previously received
    @type timeout: int/float
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
                    be capped to that value)

    """
    assert timeout >= 0, "Timeout can not be negative"
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
                           (job_id, fields, prev_job_info,
                            prev_log_serial,
                            min(WFJC_TIMEOUT, timeout)))

  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
    while True:
      result = self.WaitForJobChangeOnce(job_id, fields,
                                         prev_job_info, prev_log_serial)
      if result != constants.JOB_NOTCHANGED:
        break
    return result

  def QueryJobs(self, job_ids, fields):
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))

  def QueryInstances(self, names, fields, use_locking):
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))

  def QueryNodes(self, names, fields, use_locking):
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))

  def QueryExports(self, nodes, use_locking):
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))

  def QueryClusterInfo(self):
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())

  def QueryConfigValues(self, fields):
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)

  def QueryTags(self, kind, name):
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))

  def QueryLocks(self, fields, sync):
    return self.CallMethod(REQ_QUERY_LOCKS, (fields, sync))
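

# Editor's usage sketch, not part of the original module: as the module
# docstring notes, a client program only needs this module (plus the
# opcodes module for job submission) to talk to the master. The node
# field names below are examples, and a running master daemon listening
# on constants.MASTER_SOCKET is assumed.
def _ExampleClientUsage():
  client = Client()  # connects to constants.MASTER_SOCKET by default
  try:
    return client.QueryNodes([], ["name", "offline"], False)
  finally:
    # There is no public close method in this version, so the sketch
    # leans on the private helper to drop the connection.
    client._CloseTransport()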