Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ 7eb01378

History | View | Annotate | Download (16.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the unix socket protocol
23

24
This module implements the local unix socket protocol. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also used by the master daemon.
29

30
"""
31

    
32
import socket
33
import collections
34
import time
35
import errno
36
import logging
37

    
38
from ganeti import serializer
39
from ganeti import constants
40
from ganeti import errors
41
from ganeti import utils
42
from ganeti import objects
43

    
44

    
45
KEY_METHOD = "method"
46
KEY_ARGS = "args"
47
KEY_SUCCESS = "success"
48
KEY_RESULT = "result"
49
KEY_VERSION = "version"
50

    
51
REQ_SUBMIT_JOB = "SubmitJob"
52
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
53
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
54
REQ_CANCEL_JOB = "CancelJob"
55
REQ_ARCHIVE_JOB = "ArchiveJob"
56
REQ_AUTO_ARCHIVE_JOBS = "AutoArchiveJobs"
57
REQ_QUERY = "Query"
58
REQ_QUERY_FIELDS = "QueryFields"
59
REQ_QUERY_JOBS = "QueryJobs"
60
REQ_QUERY_INSTANCES = "QueryInstances"
61
REQ_QUERY_NODES = "QueryNodes"
62
REQ_QUERY_GROUPS = "QueryGroups"
63
REQ_QUERY_NETWORKS = "QueryNetworks"
64
REQ_QUERY_EXPORTS = "QueryExports"
65
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
66
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
67
REQ_QUERY_TAGS = "QueryTags"
68
REQ_SET_DRAIN_FLAG = "SetDrainFlag"
69
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
70

    
71
#: List of all LUXI requests
72
REQ_ALL = frozenset([
73
  REQ_ARCHIVE_JOB,
74
  REQ_AUTO_ARCHIVE_JOBS,
75
  REQ_CANCEL_JOB,
76
  REQ_QUERY,
77
  REQ_QUERY_CLUSTER_INFO,
78
  REQ_QUERY_CONFIG_VALUES,
79
  REQ_QUERY_EXPORTS,
80
  REQ_QUERY_FIELDS,
81
  REQ_QUERY_GROUPS,
82
  REQ_QUERY_INSTANCES,
83
  REQ_QUERY_JOBS,
84
  REQ_QUERY_NODES,
85
  REQ_QUERY_TAGS,
86
  REQ_SET_DRAIN_FLAG,
87
  REQ_SET_WATCHER_PAUSE,
88
  REQ_SUBMIT_JOB,
89
  REQ_SUBMIT_MANY_JOBS,
90
  REQ_WAIT_FOR_JOB_CHANGE,
91
  ])
92

    
93
DEF_CTMO = 10
94
DEF_RWTO = 60
95

    
96
# WaitForJobChange timeout
97
WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
98

    
99

    
100
class ProtocolError(errors.LuxiError):
101
  """Denotes an error in the LUXI protocol."""
102

    
103

    
104
class ConnectionClosedError(ProtocolError):
105
  """Connection closed error."""
106

    
107

    
108
class TimeoutError(ProtocolError):
109
  """Operation timeout error."""
110

    
111

    
112
class RequestError(ProtocolError):
113
  """Error on request.
114

115
  This signifies an error in the request format or request handling,
116
  but not (e.g.) an error in starting up an instance.
117

118
  Some common conditions that can trigger this exception:
119
    - job submission failed because the job data was wrong
120
    - query failed because required fields were missing
121

122
  """
123

    
124

    
125
class NoMasterError(ProtocolError):
126
  """The master cannot be reached.
127

128
  This means that the master daemon is not running or the socket has
129
  been removed.
130

131
  """
132

    
133

    
134
class PermissionError(ProtocolError):
135
  """Permission denied while connecting to the master socket.
136

137
  This means the user doesn't have the proper rights.
138

139
  """
140

    
141

    
142
class Transport:
143
  """Low-level transport class.
144

145
  This is used on the client side.
146

147
  This could be replace by any other class that provides the same
148
  semantics to the Client. This means:
149
    - can send messages and receive messages
150
    - safe for multithreading
151

152
  """
153

    
154
  def __init__(self, address, timeouts=None):
155
    """Constructor for the Client class.
156

157
    Arguments:
158
      - address: a valid address the the used transport class
159
      - timeout: a list of timeouts, to be used on connect and read/write
160

161
    There are two timeouts used since we might want to wait for a long
162
    time for a response, but the connect timeout should be lower.
163

164
    If not passed, we use a default of 10 and respectively 60 seconds.
165

166
    Note that on reading data, since the timeout applies to an
167
    invidual receive, it might be that the total duration is longer
168
    than timeout value passed (we make a hard limit at twice the read
169
    timeout).
170

171
    """
172
    self.address = address
173
    if timeouts is None:
174
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
175
    else:
176
      self._ctimeout, self._rwtimeout = timeouts
177

    
178
    self.socket = None
179
    self._buffer = ""
180
    self._msgs = collections.deque()
181

    
182
    try:
183
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
184

    
185
      # Try to connect
186
      try:
187
        utils.Retry(self._Connect, 1.0, self._ctimeout,
188
                    args=(self.socket, address, self._ctimeout))
189
      except utils.RetryTimeout:
190
        raise TimeoutError("Connect timed out")
191

    
192
      self.socket.settimeout(self._rwtimeout)
193
    except (socket.error, NoMasterError):
194
      if self.socket is not None:
195
        self.socket.close()
196
      self.socket = None
197
      raise
198

    
199
  @staticmethod
200
  def _Connect(sock, address, timeout):
201
    sock.settimeout(timeout)
202
    try:
203
      sock.connect(address)
204
    except socket.timeout, err:
205
      raise TimeoutError("Connect timed out: %s" % str(err))
206
    except socket.error, err:
207
      error_code = err.args[0]
208
      if error_code in (errno.ENOENT, errno.ECONNREFUSED):
209
        raise NoMasterError(address)
210
      elif error_code in (errno.EPERM, errno.EACCES):
211
        raise PermissionError(address)
212
      elif error_code == errno.EAGAIN:
213
        # Server's socket backlog is full at the moment
214
        raise utils.RetryAgain()
215
      raise
216

    
217
  def _CheckSocket(self):
218
    """Make sure we are connected.
219

220
    """
221
    if self.socket is None:
222
      raise ProtocolError("Connection is closed")
223

    
224
  def Send(self, msg):
225
    """Send a message.
226

227
    This just sends a message and doesn't wait for the response.
228

229
    """
230
    if constants.LUXI_EOM in msg:
231
      raise ProtocolError("Message terminator found in payload")
232

    
233
    self._CheckSocket()
234
    try:
235
      # TODO: sendall is not guaranteed to send everything
236
      self.socket.sendall(msg + constants.LUXI_EOM)
237
    except socket.timeout, err:
238
      raise TimeoutError("Sending timeout: %s" % str(err))
239

    
240
  def Recv(self):
241
    """Try to receive a message from the socket.
242

243
    In case we already have messages queued, we just return from the
244
    queue. Otherwise, we try to read data with a _rwtimeout network
245
    timeout, and making sure we don't go over 2x_rwtimeout as a global
246
    limit.
247

248
    """
249
    self._CheckSocket()
250
    etime = time.time() + self._rwtimeout
251
    while not self._msgs:
252
      if time.time() > etime:
253
        raise TimeoutError("Extended receive timeout")
254
      while True:
255
        try:
256
          data = self.socket.recv(4096)
257
        except socket.timeout, err:
258
          raise TimeoutError("Receive timeout: %s" % str(err))
259
        except socket.error, err:
260
          if err.args and err.args[0] == errno.EAGAIN:
261
            continue
262
          raise
263
        break
264
      if not data:
265
        raise ConnectionClosedError("Connection closed while reading")
266
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
267
      self._buffer = new_msgs.pop()
268
      self._msgs.extend(new_msgs)
269
    return self._msgs.popleft()
270

    
271
  def Call(self, msg):
272
    """Send a message and wait for the response.
273

274
    This is just a wrapper over Send and Recv.
275

276
    """
277
    self.Send(msg)
278
    return self.Recv()
279

    
280
  def Close(self):
281
    """Close the socket"""
282
    if self.socket is not None:
283
      self.socket.close()
284
      self.socket = None
285

    
286

    
287
def ParseRequest(msg):
288
  """Parses a LUXI request message.
289

290
  """
291
  try:
292
    request = serializer.LoadJson(msg)
293
  except ValueError, err:
294
    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
295

    
296
  logging.debug("LUXI request: %s", request)
297

    
298
  if not isinstance(request, dict):
299
    logging.error("LUXI request not a dict: %r", msg)
300
    raise ProtocolError("Invalid LUXI request (not a dict)")
301

    
302
  method = request.get(KEY_METHOD, None) # pylint: disable=E1103
303
  args = request.get(KEY_ARGS, None) # pylint: disable=E1103
304
  version = request.get(KEY_VERSION, None) # pylint: disable=E1103
305

    
306
  if method is None or args is None:
307
    logging.error("LUXI request missing method or arguments: %r", msg)
308
    raise ProtocolError(("Invalid LUXI request (no method or arguments"
309
                         " in request): %r") % msg)
310

    
311
  return (method, args, version)
312

    
313

    
314
def ParseResponse(msg):
315
  """Parses a LUXI response message.
316

317
  """
318
  # Parse the result
319
  try:
320
    data = serializer.LoadJson(msg)
321
  except KeyboardInterrupt:
322
    raise
323
  except Exception, err:
324
    raise ProtocolError("Error while deserializing response: %s" % str(err))
325

    
326
  # Validate response
327
  if not (isinstance(data, dict) and
328
          KEY_SUCCESS in data and
329
          KEY_RESULT in data):
330
    raise ProtocolError("Invalid response from server: %r" % data)
331

    
332
  return (data[KEY_SUCCESS], data[KEY_RESULT],
333
          data.get(KEY_VERSION, None)) # pylint: disable=E1103
334

    
335

    
336
def FormatResponse(success, result, version=None):
337
  """Formats a LUXI response message.
338

339
  """
340
  response = {
341
    KEY_SUCCESS: success,
342
    KEY_RESULT: result,
343
    }
344

    
345
  if version is not None:
346
    response[KEY_VERSION] = version
347

    
348
  logging.debug("LUXI response: %s", response)
349

    
350
  return serializer.DumpJson(response)
351

    
352

    
353
def FormatRequest(method, args, version=None):
354
  """Formats a LUXI request message.
355

356
  """
357
  # Build request
358
  request = {
359
    KEY_METHOD: method,
360
    KEY_ARGS: args,
361
    }
362

    
363
  if version is not None:
364
    request[KEY_VERSION] = version
365

    
366
  # Serialize the request
367
  return serializer.DumpJson(request)
368

    
369

    
370
def CallLuxiMethod(transport_cb, method, args, version=None):
371
  """Send a LUXI request via a transport and return the response.
372

373
  """
374
  assert callable(transport_cb)
375

    
376
  request_msg = FormatRequest(method, args, version=version)
377

    
378
  # Send request and wait for response
379
  response_msg = transport_cb(request_msg)
380

    
381
  (success, result, resp_version) = ParseResponse(response_msg)
382

    
383
  # Verify version if there was one in the response
384
  if resp_version is not None and resp_version != version:
385
    raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
386
                           (version, resp_version))
387

    
388
  if success:
389
    return result
390

    
391
  errors.MaybeRaise(result)
392
  raise RequestError(result)
393

    
394

    
395
class Client(object):
396
  """High-level client implementation.
397

398
  This uses a backing Transport-like class on top of which it
399
  implements data serialization/deserialization.
400

401
  """
402
  def __init__(self, address=None, timeouts=None, transport=Transport):
403
    """Constructor for the Client class.
404

405
    Arguments:
406
      - address: a valid address the the used transport class
407
      - timeout: a list of timeouts, to be used on connect and read/write
408
      - transport: a Transport-like class
409

410

411
    If timeout is not passed, the default timeouts of the transport
412
    class are used.
413

414
    """
415
    if address is None:
416
      address = constants.MASTER_SOCKET
417
    self.address = address
418
    self.timeouts = timeouts
419
    self.transport_class = transport
420
    self.transport = None
421
    self._InitTransport()
422

    
423
  def _InitTransport(self):
424
    """(Re)initialize the transport if needed.
425

426
    """
427
    if self.transport is None:
428
      self.transport = self.transport_class(self.address,
429
                                            timeouts=self.timeouts)
430

    
431
  def _CloseTransport(self):
432
    """Close the transport, ignoring errors.
433

434
    """
435
    if self.transport is None:
436
      return
437
    try:
438
      old_transp = self.transport
439
      self.transport = None
440
      old_transp.Close()
441
    except Exception: # pylint: disable=W0703
442
      pass
443

    
444
  def _SendMethodCall(self, data):
445
    # Send request and wait for response
446
    try:
447
      self._InitTransport()
448
      return self.transport.Call(data)
449
    except Exception:
450
      self._CloseTransport()
451
      raise
452

    
453
  def Close(self):
454
    """Close the underlying connection.
455

456
    """
457
    self._CloseTransport()
458

    
459
  def CallMethod(self, method, args):
460
    """Send a generic request and return the response.
461

462
    """
463
    if not isinstance(args, (list, tuple)):
464
      raise errors.ProgrammerError("Invalid parameter passed to CallMethod:"
465
                                   " expected list, got %s" % type(args))
466
    return CallLuxiMethod(self._SendMethodCall, method, args,
467
                          version=constants.LUXI_VERSION)
468

    
469
  def SetQueueDrainFlag(self, drain_flag):
470
    return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, ))
471

    
472
  def SetWatcherPause(self, until):
473
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))
474

    
475
  def SubmitJob(self, ops):
476
    ops_state = map(lambda op: op.__getstate__(), ops)
477
    return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, ))
478

    
479
  def SubmitManyJobs(self, jobs):
480
    jobs_state = []
481
    for ops in jobs:
482
      jobs_state.append([op.__getstate__() for op in ops])
483
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, ))
484

    
485
  def CancelJob(self, job_id):
486
    return self.CallMethod(REQ_CANCEL_JOB, (job_id, ))
487

    
488
  def ArchiveJob(self, job_id):
489
    return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))
490

    
491
  def AutoArchiveJobs(self, age):
492
    timeout = (DEF_RWTO - 1) / 2
493
    return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS, (age, timeout))
494

    
495
  def WaitForJobChangeOnce(self, job_id, fields,
496
                           prev_job_info, prev_log_serial,
497
                           timeout=WFJC_TIMEOUT):
498
    """Waits for changes on a job.
499

500
    @param job_id: Job ID
501
    @type fields: list
502
    @param fields: List of field names to be observed
503
    @type prev_job_info: None or list
504
    @param prev_job_info: Previously received job information
505
    @type prev_log_serial: None or int/long
506
    @param prev_log_serial: Highest log serial number previously received
507
    @type timeout: int/float
508
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
509
                    be capped to that value)
510

511
    """
512
    assert timeout >= 0, "Timeout can not be negative"
513
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
514
                           (job_id, fields, prev_job_info,
515
                            prev_log_serial,
516
                            min(WFJC_TIMEOUT, timeout)))
517

    
518
  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
519
    while True:
520
      result = self.WaitForJobChangeOnce(job_id, fields,
521
                                         prev_job_info, prev_log_serial)
522
      if result != constants.JOB_NOTCHANGED:
523
        break
524
    return result
525

    
526
  def Query(self, what, fields, qfilter):
527
    """Query for resources/items.
528

529
    @param what: One of L{constants.QR_VIA_LUXI}
530
    @type fields: List of strings
531
    @param fields: List of requested fields
532
    @type qfilter: None or list
533
    @param qfilter: Query filter
534
    @rtype: L{objects.QueryResponse}
535

536
    """
537
    result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
538
    return objects.QueryResponse.FromDict(result)
539

    
540
  def QueryFields(self, what, fields):
541
    """Query for available fields.
542

543
    @param what: One of L{constants.QR_VIA_LUXI}
544
    @type fields: None or list of strings
545
    @param fields: List of requested fields
546
    @rtype: L{objects.QueryFieldsResponse}
547

548
    """
549
    result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
550
    return objects.QueryFieldsResponse.FromDict(result)
551

    
552
  def QueryJobs(self, job_ids, fields):
553
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
554

    
555
  def QueryInstances(self, names, fields, use_locking):
556
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
557

    
558
  def QueryNodes(self, names, fields, use_locking):
559
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
560

    
561
  def QueryGroups(self, names, fields, use_locking):
562
    return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))
563

    
564
  def QueryNetworks(self, names, fields, use_locking):
565
    return self.CallMethod(REQ_QUERY_NETWORKS, (names, fields, use_locking))
566

    
567
  def QueryExports(self, nodes, use_locking):
568
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
569

    
570
  def QueryClusterInfo(self):
571
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
572

    
573
  def QueryConfigValues(self, fields):
574
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))
575

    
576
  def QueryTags(self, kind, name):
577
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))