Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ 734a2a7c

History | View | Annotate | Download (16 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the unix socket protocol
23

24
This module implements the local unix socket protocol. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also used by the master daemon.
29

30
"""
31

    
32
import socket
33
import collections
34
import time
35
import errno
36
import logging
37

    
38
from ganeti import serializer
39
from ganeti import constants
40
from ganeti import errors
41
from ganeti import utils
42
from ganeti import objects
43

    
44

    
45
KEY_METHOD = "method"
46
KEY_ARGS = "args"
47
KEY_SUCCESS = "success"
48
KEY_RESULT = "result"
49
KEY_VERSION = "version"
50

    
51
REQ_SUBMIT_JOB = "SubmitJob"
52
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
53
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
54
REQ_CANCEL_JOB = "CancelJob"
55
REQ_ARCHIVE_JOB = "ArchiveJob"
56
REQ_AUTO_ARCHIVE_JOBS = "AutoArchiveJobs"
57
REQ_QUERY = "Query"
58
REQ_QUERY_FIELDS = "QueryFields"
59
REQ_QUERY_JOBS = "QueryJobs"
60
REQ_QUERY_INSTANCES = "QueryInstances"
61
REQ_QUERY_NODES = "QueryNodes"
62
REQ_QUERY_GROUPS = "QueryGroups"
63
REQ_QUERY_EXPORTS = "QueryExports"
64
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
65
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
66
REQ_QUERY_TAGS = "QueryTags"
67
REQ_SET_DRAIN_FLAG = "SetDrainFlag"
68
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
69

    
70
#: List of all LUXI requests
71
REQ_ALL = frozenset([
72
  REQ_ARCHIVE_JOB,
73
  REQ_AUTO_ARCHIVE_JOBS,
74
  REQ_CANCEL_JOB,
75
  REQ_QUERY,
76
  REQ_QUERY_CLUSTER_INFO,
77
  REQ_QUERY_CONFIG_VALUES,
78
  REQ_QUERY_EXPORTS,
79
  REQ_QUERY_FIELDS,
80
  REQ_QUERY_GROUPS,
81
  REQ_QUERY_INSTANCES,
82
  REQ_QUERY_JOBS,
83
  REQ_QUERY_NODES,
84
  REQ_QUERY_TAGS,
85
  REQ_SET_DRAIN_FLAG,
86
  REQ_SET_WATCHER_PAUSE,
87
  REQ_SUBMIT_JOB,
88
  REQ_SUBMIT_MANY_JOBS,
89
  REQ_WAIT_FOR_JOB_CHANGE,
90
  ])
91

    
92
DEF_CTMO = 10
93
DEF_RWTO = 60
94

    
95
# WaitForJobChange timeout
96
WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
97

    
98

    
99
class ProtocolError(errors.LuxiError):
100
  """Denotes an error in the LUXI protocol."""
101

    
102

    
103
class ConnectionClosedError(ProtocolError):
104
  """Connection closed error."""
105

    
106

    
107
class TimeoutError(ProtocolError):
108
  """Operation timeout error."""
109

    
110

    
111
class RequestError(ProtocolError):
112
  """Error on request.
113

114
  This signifies an error in the request format or request handling,
115
  but not (e.g.) an error in starting up an instance.
116

117
  Some common conditions that can trigger this exception:
118
    - job submission failed because the job data was wrong
119
    - query failed because required fields were missing
120

121
  """
122

    
123

    
124
class NoMasterError(ProtocolError):
125
  """The master cannot be reached.
126

127
  This means that the master daemon is not running or the socket has
128
  been removed.
129

130
  """
131

    
132

    
133
class PermissionError(ProtocolError):
134
  """Permission denied while connecting to the master socket.
135

136
  This means the user doesn't have the proper rights.
137

138
  """
139

    
140

    
141
class Transport:
142
  """Low-level transport class.
143

144
  This is used on the client side.
145

146
  This could be replace by any other class that provides the same
147
  semantics to the Client. This means:
148
    - can send messages and receive messages
149
    - safe for multithreading
150

151
  """
152

    
153
  def __init__(self, address, timeouts=None):
154
    """Constructor for the Client class.
155

156
    Arguments:
157
      - address: a valid address the the used transport class
158
      - timeout: a list of timeouts, to be used on connect and read/write
159

160
    There are two timeouts used since we might want to wait for a long
161
    time for a response, but the connect timeout should be lower.
162

163
    If not passed, we use a default of 10 and respectively 60 seconds.
164

165
    Note that on reading data, since the timeout applies to an
166
    invidual receive, it might be that the total duration is longer
167
    than timeout value passed (we make a hard limit at twice the read
168
    timeout).
169

170
    """
171
    self.address = address
172
    if timeouts is None:
173
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
174
    else:
175
      self._ctimeout, self._rwtimeout = timeouts
176

    
177
    self.socket = None
178
    self._buffer = ""
179
    self._msgs = collections.deque()
180

    
181
    try:
182
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
183

    
184
      # Try to connect
185
      try:
186
        utils.Retry(self._Connect, 1.0, self._ctimeout,
187
                    args=(self.socket, address, self._ctimeout))
188
      except utils.RetryTimeout:
189
        raise TimeoutError("Connect timed out")
190

    
191
      self.socket.settimeout(self._rwtimeout)
192
    except (socket.error, NoMasterError):
193
      if self.socket is not None:
194
        self.socket.close()
195
      self.socket = None
196
      raise
197

    
198
  @staticmethod
199
  def _Connect(sock, address, timeout):
200
    sock.settimeout(timeout)
201
    try:
202
      sock.connect(address)
203
    except socket.timeout, err:
204
      raise TimeoutError("Connect timed out: %s" % str(err))
205
    except socket.error, err:
206
      error_code = err.args[0]
207
      if error_code in (errno.ENOENT, errno.ECONNREFUSED):
208
        raise NoMasterError(address)
209
      elif error_code in (errno.EPERM, errno.EACCES):
210
        raise PermissionError(address)
211
      elif error_code == errno.EAGAIN:
212
        # Server's socket backlog is full at the moment
213
        raise utils.RetryAgain()
214
      raise
215

    
216
  def _CheckSocket(self):
217
    """Make sure we are connected.
218

219
    """
220
    if self.socket is None:
221
      raise ProtocolError("Connection is closed")
222

    
223
  def Send(self, msg):
224
    """Send a message.
225

226
    This just sends a message and doesn't wait for the response.
227

228
    """
229
    if constants.LUXI_EOM in msg:
230
      raise ProtocolError("Message terminator found in payload")
231

    
232
    self._CheckSocket()
233
    try:
234
      # TODO: sendall is not guaranteed to send everything
235
      self.socket.sendall(msg + constants.LUXI_EOM)
236
    except socket.timeout, err:
237
      raise TimeoutError("Sending timeout: %s" % str(err))
238

    
239
  def Recv(self):
240
    """Try to receive a message from the socket.
241

242
    In case we already have messages queued, we just return from the
243
    queue. Otherwise, we try to read data with a _rwtimeout network
244
    timeout, and making sure we don't go over 2x_rwtimeout as a global
245
    limit.
246

247
    """
248
    self._CheckSocket()
249
    etime = time.time() + self._rwtimeout
250
    while not self._msgs:
251
      if time.time() > etime:
252
        raise TimeoutError("Extended receive timeout")
253
      while True:
254
        try:
255
          data = self.socket.recv(4096)
256
        except socket.timeout, err:
257
          raise TimeoutError("Receive timeout: %s" % str(err))
258
        except socket.error, err:
259
          if err.args and err.args[0] == errno.EAGAIN:
260
            continue
261
          raise
262
        break
263
      if not data:
264
        raise ConnectionClosedError("Connection closed while reading")
265
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
266
      self._buffer = new_msgs.pop()
267
      self._msgs.extend(new_msgs)
268
    return self._msgs.popleft()
269

    
270
  def Call(self, msg):
271
    """Send a message and wait for the response.
272

273
    This is just a wrapper over Send and Recv.
274

275
    """
276
    self.Send(msg)
277
    return self.Recv()
278

    
279
  def Close(self):
280
    """Close the socket"""
281
    if self.socket is not None:
282
      self.socket.close()
283
      self.socket = None
284

    
285

    
286
def ParseRequest(msg):
287
  """Parses a LUXI request message.
288

289
  """
290
  try:
291
    request = serializer.LoadJson(msg)
292
  except ValueError, err:
293
    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
294

    
295
  logging.debug("LUXI request: %s", request)
296

    
297
  if not isinstance(request, dict):
298
    logging.error("LUXI request not a dict: %r", msg)
299
    raise ProtocolError("Invalid LUXI request (not a dict)")
300

    
301
  method = request.get(KEY_METHOD, None) # pylint: disable=E1103
302
  args = request.get(KEY_ARGS, None) # pylint: disable=E1103
303
  version = request.get(KEY_VERSION, None) # pylint: disable=E1103
304

    
305
  if method is None or args is None:
306
    logging.error("LUXI request missing method or arguments: %r", msg)
307
    raise ProtocolError(("Invalid LUXI request (no method or arguments"
308
                         " in request): %r") % msg)
309

    
310
  return (method, args, version)
311

    
312

    
313
def ParseResponse(msg):
314
  """Parses a LUXI response message.
315

316
  """
317
  # Parse the result
318
  try:
319
    data = serializer.LoadJson(msg)
320
  except KeyboardInterrupt:
321
    raise
322
  except Exception, err:
323
    raise ProtocolError("Error while deserializing response: %s" % str(err))
324

    
325
  # Validate response
326
  if not (isinstance(data, dict) and
327
          KEY_SUCCESS in data and
328
          KEY_RESULT in data):
329
    raise ProtocolError("Invalid response from server: %r" % data)
330

    
331
  return (data[KEY_SUCCESS], data[KEY_RESULT],
332
          data.get(KEY_VERSION, None)) # pylint: disable=E1103
333

    
334

    
335
def FormatResponse(success, result, version=None):
336
  """Formats a LUXI response message.
337

338
  """
339
  response = {
340
    KEY_SUCCESS: success,
341
    KEY_RESULT: result,
342
    }
343

    
344
  if version is not None:
345
    response[KEY_VERSION] = version
346

    
347
  logging.debug("LUXI response: %s", response)
348

    
349
  return serializer.DumpJson(response)
350

    
351

    
352
def FormatRequest(method, args, version=None):
353
  """Formats a LUXI request message.
354

355
  """
356
  # Build request
357
  request = {
358
    KEY_METHOD: method,
359
    KEY_ARGS: args,
360
    }
361

    
362
  if version is not None:
363
    request[KEY_VERSION] = version
364

    
365
  # Serialize the request
366
  return serializer.DumpJson(request)
367

    
368

    
369
def CallLuxiMethod(transport_cb, method, args, version=None):
370
  """Send a LUXI request via a transport and return the response.
371

372
  """
373
  assert callable(transport_cb)
374

    
375
  request_msg = FormatRequest(method, args, version=version)
376

    
377
  # Send request and wait for response
378
  response_msg = transport_cb(request_msg)
379

    
380
  (success, result, resp_version) = ParseResponse(response_msg)
381

    
382
  # Verify version if there was one in the response
383
  if resp_version is not None and resp_version != version:
384
    raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
385
                           (version, resp_version))
386

    
387
  if success:
388
    return result
389

    
390
  errors.MaybeRaise(result)
391
  raise RequestError(result)
392

    
393

    
394
class Client(object):
395
  """High-level client implementation.
396

397
  This uses a backing Transport-like class on top of which it
398
  implements data serialization/deserialization.
399

400
  """
401
  def __init__(self, address=None, timeouts=None, transport=Transport):
402
    """Constructor for the Client class.
403

404
    Arguments:
405
      - address: a valid address the the used transport class
406
      - timeout: a list of timeouts, to be used on connect and read/write
407
      - transport: a Transport-like class
408

409

410
    If timeout is not passed, the default timeouts of the transport
411
    class are used.
412

413
    """
414
    if address is None:
415
      address = constants.MASTER_SOCKET
416
    self.address = address
417
    self.timeouts = timeouts
418
    self.transport_class = transport
419
    self.transport = None
420
    self._InitTransport()
421

    
422
  def _InitTransport(self):
423
    """(Re)initialize the transport if needed.
424

425
    """
426
    if self.transport is None:
427
      self.transport = self.transport_class(self.address,
428
                                            timeouts=self.timeouts)
429

    
430
  def _CloseTransport(self):
431
    """Close the transport, ignoring errors.
432

433
    """
434
    if self.transport is None:
435
      return
436
    try:
437
      old_transp = self.transport
438
      self.transport = None
439
      old_transp.Close()
440
    except Exception: # pylint: disable=W0703
441
      pass
442

    
443
  def _SendMethodCall(self, data):
444
    # Send request and wait for response
445
    try:
446
      self._InitTransport()
447
      return self.transport.Call(data)
448
    except Exception:
449
      self._CloseTransport()
450
      raise
451

    
452
  def Close(self):
453
    """Close the underlying connection.
454

455
    """
456
    self._CloseTransport()
457

    
458
  def CallMethod(self, method, args):
459
    """Send a generic request and return the response.
460

461
    """
462
    if not isinstance(args, (list, tuple)):
463
      raise errors.ProgrammerError("Invalid parameter passed to CallMethod:"
464
                                   " expected list, got %s" % type(args))
465
    return CallLuxiMethod(self._SendMethodCall, method, args,
466
                          version=constants.LUXI_VERSION)
467

    
468
  def SetQueueDrainFlag(self, drain_flag):
469
    return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, ))
470

    
471
  def SetWatcherPause(self, until):
472
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))
473

    
474
  def SubmitJob(self, ops):
475
    ops_state = map(lambda op: op.__getstate__(), ops)
476
    return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, ))
477

    
478
  def SubmitManyJobs(self, jobs):
479
    jobs_state = []
480
    for ops in jobs:
481
      jobs_state.append([op.__getstate__() for op in ops])
482
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, ))
483

    
484
  def CancelJob(self, job_id):
485
    return self.CallMethod(REQ_CANCEL_JOB, (job_id, ))
486

    
487
  def ArchiveJob(self, job_id):
488
    return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))
489

    
490
  def AutoArchiveJobs(self, age):
491
    timeout = (DEF_RWTO - 1) / 2
492
    return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS, (age, timeout))
493

    
494
  def WaitForJobChangeOnce(self, job_id, fields,
495
                           prev_job_info, prev_log_serial,
496
                           timeout=WFJC_TIMEOUT):
497
    """Waits for changes on a job.
498

499
    @param job_id: Job ID
500
    @type fields: list
501
    @param fields: List of field names to be observed
502
    @type prev_job_info: None or list
503
    @param prev_job_info: Previously received job information
504
    @type prev_log_serial: None or int/long
505
    @param prev_log_serial: Highest log serial number previously received
506
    @type timeout: int/float
507
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
508
                    be capped to that value)
509

510
    """
511
    assert timeout >= 0, "Timeout can not be negative"
512
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
513
                           (job_id, fields, prev_job_info,
514
                            prev_log_serial,
515
                            min(WFJC_TIMEOUT, timeout)))
516

    
517
  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
518
    while True:
519
      result = self.WaitForJobChangeOnce(job_id, fields,
520
                                         prev_job_info, prev_log_serial)
521
      if result != constants.JOB_NOTCHANGED:
522
        break
523
    return result
524

    
525
  def Query(self, what, fields, qfilter):
526
    """Query for resources/items.
527

528
    @param what: One of L{constants.QR_VIA_LUXI}
529
    @type fields: List of strings
530
    @param fields: List of requested fields
531
    @type qfilter: None or list
532
    @param qfilter: Query filter
533
    @rtype: L{objects.QueryResponse}
534

535
    """
536
    result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
537
    return objects.QueryResponse.FromDict(result)
538

    
539
  def QueryFields(self, what, fields):
540
    """Query for available fields.
541

542
    @param what: One of L{constants.QR_VIA_LUXI}
543
    @type fields: None or list of strings
544
    @param fields: List of requested fields
545
    @rtype: L{objects.QueryFieldsResponse}
546

547
    """
548
    result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
549
    return objects.QueryFieldsResponse.FromDict(result)
550

    
551
  def QueryJobs(self, job_ids, fields):
552
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
553

    
554
  def QueryInstances(self, names, fields, use_locking):
555
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
556

    
557
  def QueryNodes(self, names, fields, use_locking):
558
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
559

    
560
  def QueryGroups(self, names, fields, use_locking):
561
    return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))
562

    
563
  def QueryExports(self, nodes, use_locking):
564
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
565

    
566
  def QueryClusterInfo(self):
567
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
568

    
569
  def QueryConfigValues(self, fields):
570
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))
571

    
572
  def QueryTags(self, kind, name):
573
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))