root / lib / luxi.py @ a79ef2a5

#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module for the unix socket protocol

This module implements the local unix socket protocol. You only need
this module and the opcodes module in the client program in order to
communicate with the master.

The module is also used by the master daemon.

"""

import socket
import collections
import time
import errno
import logging

from ganeti import serializer
from ganeti import constants
from ganeti import errors
from ganeti import utils


KEY_METHOD = "method"
KEY_ARGS = "args"
KEY_SUCCESS = "success"
KEY_RESULT = "result"
KEY_VERSION = "version"

REQ_SUBMIT_JOB = "SubmitJob"
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_GROUPS = "QueryGroups"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
REQ_QUERY_TAGS = "QueryTags"
REQ_QUERY_LOCKS = "QueryLocks"
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"

DEF_CTMO = 10
DEF_RWTO = 60

# WaitForJobChange timeout
WFJC_TIMEOUT = (DEF_RWTO - 1) / 2


class ProtocolError(errors.LuxiError):
  """Denotes an error in the LUXI protocol."""


class ConnectionClosedError(ProtocolError):
  """Connection closed error."""


class TimeoutError(ProtocolError):
  """Operation timeout error."""


class RequestError(ProtocolError):
  """Error on request.

  This signifies an error in the request format or request handling,
  but not (e.g.) an error in starting up an instance.

  Some common conditions that can trigger this exception:
    - job submission failed because the job data was wrong
    - query failed because required fields were missing

  """


class NoMasterError(ProtocolError):
  """The master cannot be reached.

  This means that the master daemon is not running or the socket has
  been removed.

  """


class PermissionError(ProtocolError):
  """Permission denied while connecting to the master socket.

  This means the user doesn't have the proper rights.

  """

class Transport:
  """Low-level transport class.

  This is used on the client side.

  This could be replaced by any other class that provides the same
  semantics to the Client. This means:
    - can send messages and receive messages
    - safe for multithreading

  """

  def __init__(self, address, timeouts=None):
    """Constructor for the Transport class.

    Arguments:
      - address: a valid address for the used transport class
      - timeouts: a list of timeouts, to be used on connect and read/write

    There are two timeouts used since we might want to wait for a long
    time for a response, but the connect timeout should be lower.

    If not passed, we use defaults of 10 and 60 seconds, respectively.

    Note that when reading data, since the timeout applies to an
    individual receive, the total duration might be longer than the
    timeout value passed (we enforce a hard limit of twice the read
    timeout).

    """
    self.address = address
    if timeouts is None:
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
    else:
      self._ctimeout, self._rwtimeout = timeouts

    self.socket = None
    self._buffer = ""
    self._msgs = collections.deque()

    try:
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

      # Try to connect
      try:
        utils.Retry(self._Connect, 1.0, self._ctimeout,
                    args=(self.socket, address, self._ctimeout))
      except utils.RetryTimeout:
        raise TimeoutError("Connect timed out")

      self.socket.settimeout(self._rwtimeout)
    except (socket.error, NoMasterError):
      if self.socket is not None:
        self.socket.close()
      self.socket = None
      raise

  @staticmethod
  def _Connect(sock, address, timeout):
    sock.settimeout(timeout)
    try:
      sock.connect(address)
    except socket.timeout, err:
      raise TimeoutError("Connect timed out: %s" % str(err))
    except socket.error, err:
      error_code = err.args[0]
      if error_code in (errno.ENOENT, errno.ECONNREFUSED):
        raise NoMasterError(address)
      elif error_code in (errno.EPERM, errno.EACCES):
        raise PermissionError(address)
      elif error_code == errno.EAGAIN:
        # Server's socket backlog is full at the moment
        raise utils.RetryAgain()
      raise

  def _CheckSocket(self):
    """Make sure we are connected.

    """
    if self.socket is None:
      raise ProtocolError("Connection is closed")

  def Send(self, msg):
    """Send a message.

    This just sends a message and doesn't wait for the response.

    """
    if constants.LUXI_EOM in msg:
      raise ProtocolError("Message terminator found in payload")

    self._CheckSocket()
    try:
      # TODO: sendall is not guaranteed to send everything
      self.socket.sendall(msg + constants.LUXI_EOM)
    except socket.timeout, err:
      raise TimeoutError("Sending timeout: %s" % str(err))

  def Recv(self):
    """Try to receive a message from the socket.

    If we already have messages queued, we just return from the
    queue. Otherwise, we try to read data with a _rwtimeout network
    timeout, making sure we don't go over twice _rwtimeout as a
    global limit.

    """
    self._CheckSocket()
    etime = time.time() + self._rwtimeout
    while not self._msgs:
      if time.time() > etime:
        raise TimeoutError("Extended receive timeout")
      while True:
        try:
          data = self.socket.recv(4096)
        except socket.error, err:
          if err.args and err.args[0] == errno.EAGAIN:
            continue
          raise
        except socket.timeout, err:
          raise TimeoutError("Receive timeout: %s" % str(err))
        break
      if not data:
        raise ConnectionClosedError("Connection closed while reading")
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def Call(self, msg):
    """Send a message and wait for the response.

    This is just a wrapper over Send and Recv.

    """
    self.Send(msg)
    return self.Recv()

  def Close(self):
    """Close the socket."""
    if self.socket is not None:
      self.socket.close()
      self.socket = None
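

# Editor's sketch (not part of the original module): the framing done by
# Transport is to append constants.LUXI_EOM to every outgoing message in
# Send() and to split incoming data on the same terminator in Recv(). A
# low-level round trip, assuming the master socket is reachable, would
# look roughly like:
#
#   transport = Transport(constants.MASTER_SOCKET)
#   request = FormatRequest(REQ_QUERY_CLUSTER_INFO, [],
#                           version=constants.LUXI_VERSION)
#   (success, result, version) = ParseResponse(transport.Call(request))
#   transport.Close()
#
# Regular callers should use the Client class further below, which wraps
# exactly this sequence and also performs LUXI version checking.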


def ParseRequest(msg):
  """Parses a LUXI request message.

  """
  try:
    request = serializer.LoadJson(msg)
  except ValueError, err:
    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)

  logging.debug("LUXI request: %s", request)

  if not isinstance(request, dict):
    logging.error("LUXI request not a dict: %r", msg)
    raise ProtocolError("Invalid LUXI request (not a dict)")

  method = request.get(KEY_METHOD, None) # pylint: disable-msg=E1103
  args = request.get(KEY_ARGS, None) # pylint: disable-msg=E1103
  version = request.get(KEY_VERSION, None) # pylint: disable-msg=E1103

  if method is None or args is None:
    logging.error("LUXI request missing method or arguments: %r", msg)
    raise ProtocolError(("Invalid LUXI request (no method or arguments"
                         " in request): %r") % msg)

  return (method, args, version)


def ParseResponse(msg):
  """Parses a LUXI response message.

  """
  # Parse the result
  try:
    data = serializer.LoadJson(msg)
  except Exception, err:
    raise ProtocolError("Error while deserializing response: %s" % str(err))

  # Validate response
  if not (isinstance(data, dict) and
          KEY_SUCCESS in data and
          KEY_RESULT in data):
    raise ProtocolError("Invalid response from server: %r" % data)

  return (data[KEY_SUCCESS], data[KEY_RESULT],
          data.get(KEY_VERSION, None)) # pylint: disable-msg=E1103


def FormatResponse(success, result, version=None):
  """Formats a LUXI response message.

  """
  response = {
    KEY_SUCCESS: success,
    KEY_RESULT: result,
    }

  if version is not None:
    response[KEY_VERSION] = version

  logging.debug("LUXI response: %s", response)

  return serializer.DumpJson(response)


def FormatRequest(method, args, version=None):
  """Formats a LUXI request message.

  """
  # Build request
  request = {
    KEY_METHOD: method,
    KEY_ARGS: args,
    }

  if version is not None:
    request[KEY_VERSION] = version

  # Serialize the request
  return serializer.DumpJson(request, indent=False)


def CallLuxiMethod(transport_cb, method, args, version=None):
  """Send a LUXI request via a transport and return the response.

  """
  assert callable(transport_cb)

  request_msg = FormatRequest(method, args, version=version)

  # Send request and wait for response
  response_msg = transport_cb(request_msg)

  (success, result, resp_version) = ParseResponse(response_msg)

  # Verify version if there was one in the response
  if resp_version is not None and resp_version != version:
    raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
                           (version, resp_version))

  if success:
    return result

  errors.MaybeRaise(result)
  raise RequestError(result)
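

# Editor's sketch (not part of the original module): on the wire, a request
# built by FormatRequest is a JSON document along the lines of
#
#   {"method": "QueryClusterInfo", "args": [], "version": <LUXI_VERSION>}
#
# followed by the LUXI_EOM terminator added by Transport.Send. The matching
# response accepted by ParseResponse is a JSON dict containing at least the
# KEY_SUCCESS and KEY_RESULT keys, for example
#
#   {"success": true, "result": <method-specific payload>}
#
# The exact serialization (key order, whitespace, the values of LUXI_EOM and
# LUXI_VERSION) is determined by the serializer and constants modules.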


class Client(object):
  """High-level client implementation.

  This uses a backing Transport-like class on top of which it
  implements data serialization/deserialization.

  """
  def __init__(self, address=None, timeouts=None, transport=Transport):
    """Constructor for the Client class.

    Arguments:
      - address: a valid address for the used transport class
      - timeouts: a list of timeouts, to be used on connect and read/write
      - transport: a Transport-like class

    If timeouts is not passed, the default timeouts of the transport
    class are used.

    """
    if address is None:
      address = constants.MASTER_SOCKET
    self.address = address
    self.timeouts = timeouts
    self.transport_class = transport
    self.transport = None
    self._InitTransport()

  def _InitTransport(self):
    """(Re)initialize the transport if needed.

    """
    if self.transport is None:
      self.transport = self.transport_class(self.address,
                                            timeouts=self.timeouts)

  def _CloseTransport(self):
    """Close the transport, ignoring errors.

    """
    if self.transport is None:
      return
    try:
      old_transp = self.transport
      self.transport = None
      old_transp.Close()
    except Exception: # pylint: disable-msg=W0703
      pass

  def _SendMethodCall(self, data):
    # Send request and wait for response
    try:
      self._InitTransport()
      return self.transport.Call(data)
    except Exception:
      self._CloseTransport()
      raise

  def CallMethod(self, method, args):
    """Send a generic request and return the response.

    """
    return CallLuxiMethod(self._SendMethodCall, method, args,
                          version=constants.LUXI_VERSION)

  def SetQueueDrainFlag(self, drain_flag):
    return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)

  def SetWatcherPause(self, until):
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])

  def SubmitJob(self, ops):
    ops_state = map(lambda op: op.__getstate__(), ops)
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)

  def SubmitManyJobs(self, jobs):
    jobs_state = []
    for ops in jobs:
      jobs_state.append([op.__getstate__() for op in ops])
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)

  def CancelJob(self, job_id):
    return self.CallMethod(REQ_CANCEL_JOB, job_id)

  def ArchiveJob(self, job_id):
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)

  def AutoArchiveJobs(self, age):
    timeout = (DEF_RWTO - 1) / 2
    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial,
                           timeout=WFJC_TIMEOUT):
    """Waits for changes on a job.

    @param job_id: Job ID
    @type fields: list
    @param fields: List of field names to be observed
    @type prev_job_info: None or list
    @param prev_job_info: Previously received job information
    @type prev_log_serial: None or int/long
    @param prev_log_serial: Highest log serial number previously received
    @type timeout: int/float
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
                    be capped to that value)

    """
    assert timeout >= 0, "Timeout cannot be negative"
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
                           (job_id, fields, prev_job_info,
                            prev_log_serial,
                            min(WFJC_TIMEOUT, timeout)))

  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
    while True:
      result = self.WaitForJobChangeOnce(job_id, fields,
                                         prev_job_info, prev_log_serial)
      if result != constants.JOB_NOTCHANGED:
        break
    return result

  def QueryJobs(self, job_ids, fields):
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))

  def QueryInstances(self, names, fields, use_locking):
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))

  def QueryNodes(self, names, fields, use_locking):
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))

  def QueryGroups(self, names, fields, use_locking):
    return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))

  def QueryExports(self, nodes, use_locking):
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))

  def QueryClusterInfo(self):
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())

  def QueryConfigValues(self, fields):
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)

  def QueryTags(self, kind, name):
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))

  def QueryLocks(self, fields, sync):
    return self.CallMethod(REQ_QUERY_LOCKS, (fields, sync))
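

# Editor's example (not part of the original module): a minimal client-side
# call, assuming it runs where the master socket (constants.MASTER_SOCKET)
# is reachable and the ganeti modules are importable. It only uses the
# public Client API defined above.
if __name__ == "__main__":
  logging.basicConfig(level=logging.DEBUG)
  try:
    client = Client()
    # QueryClusterInfo takes no arguments; the return value is the
    # deserialized "result" field of the LUXI response.
    print client.QueryClusterInfo()
  except NoMasterError, err:
    print "Master daemon is not reachable: %s" % err
  except errors.LuxiError, err:
    print "LUXI error: %s" % err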