Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ 28b71a76

History | View | Annotate | Download (15.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the unix socket protocol
23

24
This module implements the local unix socket protocol. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also used by the master daemon.
29

30
"""
31

    
32
import socket
33
import collections
34
import time
35
import errno
36
import logging
37

    
38
from ganeti import serializer
39
from ganeti import constants
40
from ganeti import errors
41
from ganeti import utils
42
from ganeti import objects
43

    
44

    
45
KEY_METHOD = "method"
46
KEY_ARGS = "args"
47
KEY_SUCCESS = "success"
48
KEY_RESULT = "result"
49
KEY_VERSION = "version"
50

    
51
REQ_SUBMIT_JOB = "SubmitJob"
52
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
53
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
54
REQ_CANCEL_JOB = "CancelJob"
55
REQ_ARCHIVE_JOB = "ArchiveJob"
56
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
57
REQ_QUERY = "Query"
58
REQ_QUERY_FIELDS = "QueryFields"
59
REQ_QUERY_JOBS = "QueryJobs"
60
REQ_QUERY_INSTANCES = "QueryInstances"
61
REQ_QUERY_NODES = "QueryNodes"
62
REQ_QUERY_GROUPS = "QueryGroups"
63
REQ_QUERY_EXPORTS = "QueryExports"
64
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
65
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
66
REQ_QUERY_TAGS = "QueryTags"
67
REQ_QUERY_LOCKS = "QueryLocks"
68
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
69
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
70

    
71
DEF_CTMO = 10
72
DEF_RWTO = 60
73

    
74
# WaitForJobChange timeout
75
WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
76

    
77

    
78
class ProtocolError(errors.LuxiError):
79
  """Denotes an error in the LUXI protocol."""
80

    
81

    
82
class ConnectionClosedError(ProtocolError):
83
  """Connection closed error."""
84

    
85

    
86
class TimeoutError(ProtocolError):
87
  """Operation timeout error."""
88

    
89

    
90
class RequestError(ProtocolError):
91
  """Error on request.
92

93
  This signifies an error in the request format or request handling,
94
  but not (e.g.) an error in starting up an instance.
95

96
  Some common conditions that can trigger this exception:
97
    - job submission failed because the job data was wrong
98
    - query failed because required fields were missing
99

100
  """
101

    
102

    
103
class NoMasterError(ProtocolError):
104
  """The master cannot be reached.
105

106
  This means that the master daemon is not running or the socket has
107
  been removed.
108

109
  """
110

    
111

    
112
class PermissionError(ProtocolError):
113
  """Permission denied while connecting to the master socket.
114

115
  This means the user doesn't have the proper rights.
116

117
  """
118

    
119

    
120
class Transport:
121
  """Low-level transport class.
122

123
  This is used on the client side.
124

125
  This could be replace by any other class that provides the same
126
  semantics to the Client. This means:
127
    - can send messages and receive messages
128
    - safe for multithreading
129

130
  """
131

    
132
  def __init__(self, address, timeouts=None):
133
    """Constructor for the Client class.
134

135
    Arguments:
136
      - address: a valid address the the used transport class
137
      - timeout: a list of timeouts, to be used on connect and read/write
138

139
    There are two timeouts used since we might want to wait for a long
140
    time for a response, but the connect timeout should be lower.
141

142
    If not passed, we use a default of 10 and respectively 60 seconds.
143

144
    Note that on reading data, since the timeout applies to an
145
    invidual receive, it might be that the total duration is longer
146
    than timeout value passed (we make a hard limit at twice the read
147
    timeout).
148

149
    """
150
    self.address = address
151
    if timeouts is None:
152
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
153
    else:
154
      self._ctimeout, self._rwtimeout = timeouts
155

    
156
    self.socket = None
157
    self._buffer = ""
158
    self._msgs = collections.deque()
159

    
160
    try:
161
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
162

    
163
      # Try to connect
164
      try:
165
        utils.Retry(self._Connect, 1.0, self._ctimeout,
166
                    args=(self.socket, address, self._ctimeout))
167
      except utils.RetryTimeout:
168
        raise TimeoutError("Connect timed out")
169

    
170
      self.socket.settimeout(self._rwtimeout)
171
    except (socket.error, NoMasterError):
172
      if self.socket is not None:
173
        self.socket.close()
174
      self.socket = None
175
      raise
176

    
177
  @staticmethod
178
  def _Connect(sock, address, timeout):
179
    sock.settimeout(timeout)
180
    try:
181
      sock.connect(address)
182
    except socket.timeout, err:
183
      raise TimeoutError("Connect timed out: %s" % str(err))
184
    except socket.error, err:
185
      error_code = err.args[0]
186
      if error_code in (errno.ENOENT, errno.ECONNREFUSED):
187
        raise NoMasterError(address)
188
      elif error_code in (errno.EPERM, errno.EACCES):
189
        raise PermissionError(address)
190
      elif error_code == errno.EAGAIN:
191
        # Server's socket backlog is full at the moment
192
        raise utils.RetryAgain()
193
      raise
194

    
195
  def _CheckSocket(self):
196
    """Make sure we are connected.
197

198
    """
199
    if self.socket is None:
200
      raise ProtocolError("Connection is closed")
201

    
202
  def Send(self, msg):
203
    """Send a message.
204

205
    This just sends a message and doesn't wait for the response.
206

207
    """
208
    if constants.LUXI_EOM in msg:
209
      raise ProtocolError("Message terminator found in payload")
210

    
211
    self._CheckSocket()
212
    try:
213
      # TODO: sendall is not guaranteed to send everything
214
      self.socket.sendall(msg + constants.LUXI_EOM)
215
    except socket.timeout, err:
216
      raise TimeoutError("Sending timeout: %s" % str(err))
217

    
218
  def Recv(self):
219
    """Try to receive a message from the socket.
220

221
    In case we already have messages queued, we just return from the
222
    queue. Otherwise, we try to read data with a _rwtimeout network
223
    timeout, and making sure we don't go over 2x_rwtimeout as a global
224
    limit.
225

226
    """
227
    self._CheckSocket()
228
    etime = time.time() + self._rwtimeout
229
    while not self._msgs:
230
      if time.time() > etime:
231
        raise TimeoutError("Extended receive timeout")
232
      while True:
233
        try:
234
          data = self.socket.recv(4096)
235
        except socket.error, err:
236
          if err.args and err.args[0] == errno.EAGAIN:
237
            continue
238
          raise
239
        except socket.timeout, err:
240
          raise TimeoutError("Receive timeout: %s" % str(err))
241
        break
242
      if not data:
243
        raise ConnectionClosedError("Connection closed while reading")
244
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
245
      self._buffer = new_msgs.pop()
246
      self._msgs.extend(new_msgs)
247
    return self._msgs.popleft()
248

    
249
  def Call(self, msg):
250
    """Send a message and wait for the response.
251

252
    This is just a wrapper over Send and Recv.
253

254
    """
255
    self.Send(msg)
256
    return self.Recv()
257

    
258
  def Close(self):
259
    """Close the socket"""
260
    if self.socket is not None:
261
      self.socket.close()
262
      self.socket = None
263

    
264

    
265
def ParseRequest(msg):
266
  """Parses a LUXI request message.
267

268
  """
269
  try:
270
    request = serializer.LoadJson(msg)
271
  except ValueError, err:
272
    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
273

    
274
  logging.debug("LUXI request: %s", request)
275

    
276
  if not isinstance(request, dict):
277
    logging.error("LUXI request not a dict: %r", msg)
278
    raise ProtocolError("Invalid LUXI request (not a dict)")
279

    
280
  method = request.get(KEY_METHOD, None) # pylint: disable-msg=E1103
281
  args = request.get(KEY_ARGS, None) # pylint: disable-msg=E1103
282
  version = request.get(KEY_VERSION, None) # pylint: disable-msg=E1103
283

    
284
  if method is None or args is None:
285
    logging.error("LUXI request missing method or arguments: %r", msg)
286
    raise ProtocolError(("Invalid LUXI request (no method or arguments"
287
                         " in request): %r") % msg)
288

    
289
  return (method, args, version)
290

    
291

    
292
def ParseResponse(msg):
293
  """Parses a LUXI response message.
294

295
  """
296
  # Parse the result
297
  try:
298
    data = serializer.LoadJson(msg)
299
  except Exception, err:
300
    raise ProtocolError("Error while deserializing response: %s" % str(err))
301

    
302
  # Validate response
303
  if not (isinstance(data, dict) and
304
          KEY_SUCCESS in data and
305
          KEY_RESULT in data):
306
    raise ProtocolError("Invalid response from server: %r" % data)
307

    
308
  return (data[KEY_SUCCESS], data[KEY_RESULT],
309
          data.get(KEY_VERSION, None)) # pylint: disable-msg=E1103
310

    
311

    
312
def FormatResponse(success, result, version=None):
313
  """Formats a LUXI response message.
314

315
  """
316
  response = {
317
    KEY_SUCCESS: success,
318
    KEY_RESULT: result,
319
    }
320

    
321
  if version is not None:
322
    response[KEY_VERSION] = version
323

    
324
  logging.debug("LUXI response: %s", response)
325

    
326
  return serializer.DumpJson(response)
327

    
328

    
329
def FormatRequest(method, args, version=None):
330
  """Formats a LUXI request message.
331

332
  """
333
  # Build request
334
  request = {
335
    KEY_METHOD: method,
336
    KEY_ARGS: args,
337
    }
338

    
339
  if version is not None:
340
    request[KEY_VERSION] = version
341

    
342
  # Serialize the request
343
  return serializer.DumpJson(request, indent=False)
344

    
345

    
346
def CallLuxiMethod(transport_cb, method, args, version=None):
347
  """Send a LUXI request via a transport and return the response.
348

349
  """
350
  assert callable(transport_cb)
351

    
352
  request_msg = FormatRequest(method, args, version=version)
353

    
354
  # Send request and wait for response
355
  response_msg = transport_cb(request_msg)
356

    
357
  (success, result, resp_version) = ParseResponse(response_msg)
358

    
359
  # Verify version if there was one in the response
360
  if resp_version is not None and resp_version != version:
361
    raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
362
                           (version, resp_version))
363

    
364
  if success:
365
    return result
366

    
367
  errors.MaybeRaise(result)
368
  raise RequestError(result)
369

    
370

    
371
class Client(object):
372
  """High-level client implementation.
373

374
  This uses a backing Transport-like class on top of which it
375
  implements data serialization/deserialization.
376

377
  """
378
  def __init__(self, address=None, timeouts=None, transport=Transport):
379
    """Constructor for the Client class.
380

381
    Arguments:
382
      - address: a valid address the the used transport class
383
      - timeout: a list of timeouts, to be used on connect and read/write
384
      - transport: a Transport-like class
385

386

387
    If timeout is not passed, the default timeouts of the transport
388
    class are used.
389

390
    """
391
    if address is None:
392
      address = constants.MASTER_SOCKET
393
    self.address = address
394
    self.timeouts = timeouts
395
    self.transport_class = transport
396
    self.transport = None
397
    self._InitTransport()
398

    
399
  def _InitTransport(self):
400
    """(Re)initialize the transport if needed.
401

402
    """
403
    if self.transport is None:
404
      self.transport = self.transport_class(self.address,
405
                                            timeouts=self.timeouts)
406

    
407
  def _CloseTransport(self):
408
    """Close the transport, ignoring errors.
409

410
    """
411
    if self.transport is None:
412
      return
413
    try:
414
      old_transp = self.transport
415
      self.transport = None
416
      old_transp.Close()
417
    except Exception: # pylint: disable-msg=W0703
418
      pass
419

    
420
  def _SendMethodCall(self, data):
421
    # Send request and wait for response
422
    try:
423
      self._InitTransport()
424
      return self.transport.Call(data)
425
    except Exception:
426
      self._CloseTransport()
427
      raise
428

    
429
  def CallMethod(self, method, args):
430
    """Send a generic request and return the response.
431

432
    """
433
    return CallLuxiMethod(self._SendMethodCall, method, args,
434
                          version=constants.LUXI_VERSION)
435

    
436
  def SetQueueDrainFlag(self, drain_flag):
437
    return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
438

    
439
  def SetWatcherPause(self, until):
440
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])
441

    
442
  def SubmitJob(self, ops):
443
    ops_state = map(lambda op: op.__getstate__(), ops)
444
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
445

    
446
  def SubmitManyJobs(self, jobs):
447
    jobs_state = []
448
    for ops in jobs:
449
      jobs_state.append([op.__getstate__() for op in ops])
450
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
451

    
452
  def CancelJob(self, job_id):
453
    return self.CallMethod(REQ_CANCEL_JOB, job_id)
454

    
455
  def ArchiveJob(self, job_id):
456
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
457

    
458
  def AutoArchiveJobs(self, age):
459
    timeout = (DEF_RWTO - 1) / 2
460
    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
461

    
462
  def WaitForJobChangeOnce(self, job_id, fields,
463
                           prev_job_info, prev_log_serial,
464
                           timeout=WFJC_TIMEOUT):
465
    """Waits for changes on a job.
466

467
    @param job_id: Job ID
468
    @type fields: list
469
    @param fields: List of field names to be observed
470
    @type prev_job_info: None or list
471
    @param prev_job_info: Previously received job information
472
    @type prev_log_serial: None or int/long
473
    @param prev_log_serial: Highest log serial number previously received
474
    @type timeout: int/float
475
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
476
                    be capped to that value)
477

478
    """
479
    assert timeout >= 0, "Timeout can not be negative"
480
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
481
                           (job_id, fields, prev_job_info,
482
                            prev_log_serial,
483
                            min(WFJC_TIMEOUT, timeout)))
484

    
485
  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
486
    while True:
487
      result = self.WaitForJobChangeOnce(job_id, fields,
488
                                         prev_job_info, prev_log_serial)
489
      if result != constants.JOB_NOTCHANGED:
490
        break
491
    return result
492

    
493
  def Query(self, what, fields, filter_):
494
    """Query for resources/items.
495

496
    @param what: One of L{constants.QR_OP_LUXI}
497
    @type fields: List of strings
498
    @param fields: List of requested fields
499
    @type filter_: None or list
500
    @param filter_: Query filter
501
    @rtype: L{objects.QueryResponse}
502

503
    """
504
    req = objects.QueryRequest(what=what, fields=fields, filter=filter_)
505
    result = self.CallMethod(REQ_QUERY, req.ToDict())
506
    return objects.QueryResponse.FromDict(result)
507

    
508
  def QueryFields(self, what, fields):
509
    """Query for available fields.
510

511
    @param what: One of L{constants.QR_OP_LUXI}
512
    @type fields: None or list of strings
513
    @param fields: List of requested fields
514
    @rtype: L{objects.QueryFieldsResponse}
515

516
    """
517
    req = objects.QueryFieldsRequest(what=what, fields=fields)
518
    result = self.CallMethod(REQ_QUERY_FIELDS, req.ToDict())
519
    return objects.QueryFieldsResponse.FromDict(result)
520

    
521
  def QueryJobs(self, job_ids, fields):
522
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
523

    
524
  def QueryInstances(self, names, fields, use_locking):
525
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
526

    
527
  def QueryNodes(self, names, fields, use_locking):
528
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
529

    
530
  def QueryGroups(self, names, fields, use_locking):
531
    return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))
532

    
533
  def QueryExports(self, nodes, use_locking):
534
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
535

    
536
  def QueryClusterInfo(self):
537
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
538

    
539
  def QueryConfigValues(self, fields):
540
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
541

    
542
  def QueryTags(self, kind, name):
543
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))
544

    
545
  def QueryLocks(self, fields, sync):
546
    return self.CallMethod(REQ_QUERY_LOCKS, (fields, sync))