Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ fbeb41e6

History | View | Annotate | Download (16.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the unix socket protocol
23

24
This module implements the local unix socket protocol. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also used by the master daemon.
29

30
"""
31

    
32
import socket
33
import collections
34
import time
35
import errno
36
import logging
37

    
38
from ganeti import serializer
39
from ganeti import constants
40
from ganeti import errors
41
from ganeti import utils
42
from ganeti import objects
43
from ganeti import pathutils
44

    
45

    
46
KEY_METHOD = constants.LUXI_KEY_METHOD
47
KEY_ARGS = constants.LUXI_KEY_ARGS
48
KEY_SUCCESS = constants.LUXI_KEY_SUCCESS
49
KEY_RESULT = constants.LUXI_KEY_RESULT
50
KEY_VERSION = constants.LUXI_KEY_VERSION
51

    
52
REQ_SUBMIT_JOB = constants.LUXI_REQ_SUBMIT_JOB
53
REQ_SUBMIT_JOB_TO_DRAINED_QUEUE = constants.LUXI_REQ_SUBMIT_JOB_TO_DRAINED_QUEUE
54
REQ_SUBMIT_MANY_JOBS = constants.LUXI_REQ_SUBMIT_MANY_JOBS
55
REQ_PICKUP_JOB = constants.LUXI_REQ_PICKUP_JOB
56
REQ_WAIT_FOR_JOB_CHANGE = constants.LUXI_REQ_WAIT_FOR_JOB_CHANGE
57
REQ_CANCEL_JOB = constants.LUXI_REQ_CANCEL_JOB
58
REQ_ARCHIVE_JOB = constants.LUXI_REQ_ARCHIVE_JOB
59
REQ_CHANGE_JOB_PRIORITY = constants.LUXI_REQ_CHANGE_JOB_PRIORITY
60
REQ_AUTO_ARCHIVE_JOBS = constants.LUXI_REQ_AUTO_ARCHIVE_JOBS
61
REQ_QUERY = constants.LUXI_REQ_QUERY
62
REQ_QUERY_FIELDS = constants.LUXI_REQ_QUERY_FIELDS
63
REQ_QUERY_JOBS = constants.LUXI_REQ_QUERY_JOBS
64
REQ_QUERY_INSTANCES = constants.LUXI_REQ_QUERY_INSTANCES
65
REQ_QUERY_NODES = constants.LUXI_REQ_QUERY_NODES
66
REQ_QUERY_GROUPS = constants.LUXI_REQ_QUERY_GROUPS
67
REQ_QUERY_NETWORKS = constants.LUXI_REQ_QUERY_NETWORKS
68
REQ_QUERY_EXPORTS = constants.LUXI_REQ_QUERY_EXPORTS
69
REQ_QUERY_CONFIG_VALUES = constants.LUXI_REQ_QUERY_CONFIG_VALUES
70
REQ_QUERY_CLUSTER_INFO = constants.LUXI_REQ_QUERY_CLUSTER_INFO
71
REQ_QUERY_TAGS = constants.LUXI_REQ_QUERY_TAGS
72
REQ_SET_DRAIN_FLAG = constants.LUXI_REQ_SET_DRAIN_FLAG
73
REQ_SET_WATCHER_PAUSE = constants.LUXI_REQ_SET_WATCHER_PAUSE
74
REQ_ALL = constants.LUXI_REQ_ALL
75

    
76
DEF_CTMO = constants.LUXI_DEF_CTMO
77
DEF_RWTO = constants.LUXI_DEF_RWTO
78
WFJC_TIMEOUT = constants.LUXI_WFJC_TIMEOUT
79

    
80

    
81
class ProtocolError(errors.LuxiError):
82
  """Denotes an error in the LUXI protocol."""
83

    
84

    
85
class ConnectionClosedError(ProtocolError):
86
  """Connection closed error."""
87

    
88

    
89
class TimeoutError(ProtocolError):
90
  """Operation timeout error."""
91

    
92

    
93
class RequestError(ProtocolError):
94
  """Error on request.
95

96
  This signifies an error in the request format or request handling,
97
  but not (e.g.) an error in starting up an instance.
98

99
  Some common conditions that can trigger this exception:
100
    - job submission failed because the job data was wrong
101
    - query failed because required fields were missing
102

103
  """
104

    
105

    
106
class NoMasterError(ProtocolError):
107
  """The master cannot be reached.
108

109
  This means that the master daemon is not running or the socket has
110
  been removed.
111

112
  """
113

    
114

    
115
class PermissionError(ProtocolError):
116
  """Permission denied while connecting to the master socket.
117

118
  This means the user doesn't have the proper rights.
119

120
  """
121

    
122

    
123
class Transport:
124
  """Low-level transport class.
125

126
  This is used on the client side.
127

128
  This could be replace by any other class that provides the same
129
  semantics to the Client. This means:
130
    - can send messages and receive messages
131
    - safe for multithreading
132

133
  """
134

    
135
  def __init__(self, address, timeouts=None):
136
    """Constructor for the Client class.
137

138
    Arguments:
139
      - address: a valid address the the used transport class
140
      - timeout: a list of timeouts, to be used on connect and read/write
141

142
    There are two timeouts used since we might want to wait for a long
143
    time for a response, but the connect timeout should be lower.
144

145
    If not passed, we use a default of 10 and respectively 60 seconds.
146

147
    Note that on reading data, since the timeout applies to an
148
    invidual receive, it might be that the total duration is longer
149
    than timeout value passed (we make a hard limit at twice the read
150
    timeout).
151

152
    """
153
    self.address = address
154
    if timeouts is None:
155
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
156
    else:
157
      self._ctimeout, self._rwtimeout = timeouts
158

    
159
    self.socket = None
160
    self._buffer = ""
161
    self._msgs = collections.deque()
162

    
163
    try:
164
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
165

    
166
      # Try to connect
167
      try:
168
        utils.Retry(self._Connect, 1.0, self._ctimeout,
169
                    args=(self.socket, address, self._ctimeout))
170
      except utils.RetryTimeout:
171
        raise TimeoutError("Connect timed out")
172

    
173
      self.socket.settimeout(self._rwtimeout)
174
    except (socket.error, NoMasterError):
175
      if self.socket is not None:
176
        self.socket.close()
177
      self.socket = None
178
      raise
179

    
180
  @staticmethod
181
  def _Connect(sock, address, timeout):
182
    sock.settimeout(timeout)
183
    try:
184
      sock.connect(address)
185
    except socket.timeout, err:
186
      raise TimeoutError("Connect timed out: %s" % str(err))
187
    except socket.error, err:
188
      error_code = err.args[0]
189
      if error_code in (errno.ENOENT, errno.ECONNREFUSED):
190
        raise NoMasterError(address)
191
      elif error_code in (errno.EPERM, errno.EACCES):
192
        raise PermissionError(address)
193
      elif error_code == errno.EAGAIN:
194
        # Server's socket backlog is full at the moment
195
        raise utils.RetryAgain()
196
      raise
197

    
198
  def _CheckSocket(self):
199
    """Make sure we are connected.
200

201
    """
202
    if self.socket is None:
203
      raise ProtocolError("Connection is closed")
204

    
205
  def Send(self, msg):
206
    """Send a message.
207

208
    This just sends a message and doesn't wait for the response.
209

210
    """
211
    if constants.LUXI_EOM in msg:
212
      raise ProtocolError("Message terminator found in payload")
213

    
214
    self._CheckSocket()
215
    try:
216
      # TODO: sendall is not guaranteed to send everything
217
      self.socket.sendall(msg + constants.LUXI_EOM)
218
    except socket.timeout, err:
219
      raise TimeoutError("Sending timeout: %s" % str(err))
220

    
221
  def Recv(self):
222
    """Try to receive a message from the socket.
223

224
    In case we already have messages queued, we just return from the
225
    queue. Otherwise, we try to read data with a _rwtimeout network
226
    timeout, and making sure we don't go over 2x_rwtimeout as a global
227
    limit.
228

229
    """
230
    self._CheckSocket()
231
    etime = time.time() + self._rwtimeout
232
    while not self._msgs:
233
      if time.time() > etime:
234
        raise TimeoutError("Extended receive timeout")
235
      while True:
236
        try:
237
          data = self.socket.recv(4096)
238
        except socket.timeout, err:
239
          raise TimeoutError("Receive timeout: %s" % str(err))
240
        except socket.error, err:
241
          if err.args and err.args[0] == errno.EAGAIN:
242
            continue
243
          raise
244
        break
245
      if not data:
246
        raise ConnectionClosedError("Connection closed while reading")
247
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
248
      self._buffer = new_msgs.pop()
249
      self._msgs.extend(new_msgs)
250
    return self._msgs.popleft()
251

    
252
  def Call(self, msg):
253
    """Send a message and wait for the response.
254

255
    This is just a wrapper over Send and Recv.
256

257
    """
258
    self.Send(msg)
259
    return self.Recv()
260

    
261
  def Close(self):
262
    """Close the socket"""
263
    if self.socket is not None:
264
      self.socket.close()
265
      self.socket = None
266

    
267

    
268
def ParseRequest(msg):
269
  """Parses a LUXI request message.
270

271
  """
272
  try:
273
    request = serializer.LoadJson(msg)
274
  except ValueError, err:
275
    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
276

    
277
  logging.debug("LUXI request: %s", request)
278

    
279
  if not isinstance(request, dict):
280
    logging.error("LUXI request not a dict: %r", msg)
281
    raise ProtocolError("Invalid LUXI request (not a dict)")
282

    
283
  method = request.get(KEY_METHOD, None) # pylint: disable=E1103
284
  args = request.get(KEY_ARGS, None) # pylint: disable=E1103
285
  version = request.get(KEY_VERSION, None) # pylint: disable=E1103
286

    
287
  if method is None or args is None:
288
    logging.error("LUXI request missing method or arguments: %r", msg)
289
    raise ProtocolError(("Invalid LUXI request (no method or arguments"
290
                         " in request): %r") % msg)
291

    
292
  return (method, args, version)
293

    
294

    
295
def ParseResponse(msg):
296
  """Parses a LUXI response message.
297

298
  """
299
  # Parse the result
300
  try:
301
    data = serializer.LoadJson(msg)
302
  except KeyboardInterrupt:
303
    raise
304
  except Exception, err:
305
    raise ProtocolError("Error while deserializing response: %s" % str(err))
306

    
307
  # Validate response
308
  if not (isinstance(data, dict) and
309
          KEY_SUCCESS in data and
310
          KEY_RESULT in data):
311
    raise ProtocolError("Invalid response from server: %r" % data)
312

    
313
  return (data[KEY_SUCCESS], data[KEY_RESULT],
314
          data.get(KEY_VERSION, None)) # pylint: disable=E1103
315

    
316

    
317
def FormatResponse(success, result, version=None):
318
  """Formats a LUXI response message.
319

320
  """
321
  response = {
322
    KEY_SUCCESS: success,
323
    KEY_RESULT: result,
324
    }
325

    
326
  if version is not None:
327
    response[KEY_VERSION] = version
328

    
329
  logging.debug("LUXI response: %s", response)
330

    
331
  return serializer.DumpJson(response)
332

    
333

    
334
def FormatRequest(method, args, version=None):
335
  """Formats a LUXI request message.
336

337
  """
338
  # Build request
339
  request = {
340
    KEY_METHOD: method,
341
    KEY_ARGS: args,
342
    }
343

    
344
  if version is not None:
345
    request[KEY_VERSION] = version
346

    
347
  # Serialize the request
348
  return serializer.DumpJson(request)
349

    
350

    
351
def CallLuxiMethod(transport_cb, method, args, version=None):
352
  """Send a LUXI request via a transport and return the response.
353

354
  """
355
  assert callable(transport_cb)
356

    
357
  request_msg = FormatRequest(method, args, version=version)
358

    
359
  # Send request and wait for response
360
  response_msg = transport_cb(request_msg)
361

    
362
  (success, result, resp_version) = ParseResponse(response_msg)
363

    
364
  # Verify version if there was one in the response
365
  if resp_version is not None and resp_version != version:
366
    raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
367
                           (version, resp_version))
368

    
369
  if success:
370
    return result
371

    
372
  errors.MaybeRaise(result)
373
  raise RequestError(result)
374

    
375

    
376
class Client(object):
377
  """High-level client implementation.
378

379
  This uses a backing Transport-like class on top of which it
380
  implements data serialization/deserialization.
381

382
  """
383
  def __init__(self, address=None, timeouts=None, transport=Transport):
384
    """Constructor for the Client class.
385

386
    Arguments:
387
      - address: a valid address the the used transport class
388
      - timeout: a list of timeouts, to be used on connect and read/write
389
      - transport: a Transport-like class
390

391

392
    If timeout is not passed, the default timeouts of the transport
393
    class are used.
394

395
    """
396
    if address is None:
397
      address = pathutils.MASTER_SOCKET
398
    self.address = address
399
    self.timeouts = timeouts
400
    self.transport_class = transport
401
    self.transport = None
402
    self._InitTransport()
403

    
404
  def _InitTransport(self):
405
    """(Re)initialize the transport if needed.
406

407
    """
408
    if self.transport is None:
409
      self.transport = self.transport_class(self.address,
410
                                            timeouts=self.timeouts)
411

    
412
  def _CloseTransport(self):
413
    """Close the transport, ignoring errors.
414

415
    """
416
    if self.transport is None:
417
      return
418
    try:
419
      old_transp = self.transport
420
      self.transport = None
421
      old_transp.Close()
422
    except Exception: # pylint: disable=W0703
423
      pass
424

    
425
  def _SendMethodCall(self, data):
426
    # Send request and wait for response
427
    try:
428
      self._InitTransport()
429
      return self.transport.Call(data)
430
    except Exception:
431
      self._CloseTransport()
432
      raise
433

    
434
  def Close(self):
435
    """Close the underlying connection.
436

437
    """
438
    self._CloseTransport()
439

    
440
  def CallMethod(self, method, args):
441
    """Send a generic request and return the response.
442

443
    """
444
    if not isinstance(args, (list, tuple)):
445
      raise errors.ProgrammerError("Invalid parameter passed to CallMethod:"
446
                                   " expected list, got %s" % type(args))
447
    return CallLuxiMethod(self._SendMethodCall, method, args,
448
                          version=constants.LUXI_VERSION)
449

    
450
  def SetQueueDrainFlag(self, drain_flag):
451
    return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, ))
452

    
453
  def SetWatcherPause(self, until):
454
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))
455

    
456
  def PickupJob(self, job):
457
    return self.CallMethod(REQ_PICKUP_JOB, (job,))
458

    
459
  def SubmitJob(self, ops):
460
    ops_state = map(lambda op: op.__getstate__(), ops)
461
    return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, ))
462

    
463
  def SubmitJobToDrainedQueue(self, ops):
464
    ops_state = map(lambda op: op.__getstate__(), ops)
465
    return self.CallMethod(REQ_SUBMIT_JOB_TO_DRAINED_QUEUE, (ops_state, ))
466

    
467
  def SubmitManyJobs(self, jobs):
468
    jobs_state = []
469
    for ops in jobs:
470
      jobs_state.append([op.__getstate__() for op in ops])
471
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, ))
472

    
473
  def CancelJob(self, job_id):
474
    return self.CallMethod(REQ_CANCEL_JOB, (job_id, ))
475

    
476
  def ArchiveJob(self, job_id):
477
    return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))
478

    
479
  def ChangeJobPriority(self, job_id, priority):
480
    return self.CallMethod(REQ_CHANGE_JOB_PRIORITY, (job_id, priority))
481

    
482
  def AutoArchiveJobs(self, age):
483
    timeout = (DEF_RWTO - 1) / 2
484
    return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS, (age, timeout))
485

    
486
  def WaitForJobChangeOnce(self, job_id, fields,
487
                           prev_job_info, prev_log_serial,
488
                           timeout=WFJC_TIMEOUT):
489
    """Waits for changes on a job.
490

491
    @param job_id: Job ID
492
    @type fields: list
493
    @param fields: List of field names to be observed
494
    @type prev_job_info: None or list
495
    @param prev_job_info: Previously received job information
496
    @type prev_log_serial: None or int/long
497
    @param prev_log_serial: Highest log serial number previously received
498
    @type timeout: int/float
499
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
500
                    be capped to that value)
501

502
    """
503
    assert timeout >= 0, "Timeout can not be negative"
504
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
505
                           (job_id, fields, prev_job_info,
506
                            prev_log_serial,
507
                            min(WFJC_TIMEOUT, timeout)))
508

    
509
  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
510
    while True:
511
      result = self.WaitForJobChangeOnce(job_id, fields,
512
                                         prev_job_info, prev_log_serial)
513
      if result != constants.JOB_NOTCHANGED:
514
        break
515
    return result
516

    
517
  def Query(self, what, fields, qfilter):
518
    """Query for resources/items.
519

520
    @param what: One of L{constants.QR_VIA_LUXI}
521
    @type fields: List of strings
522
    @param fields: List of requested fields
523
    @type qfilter: None or list
524
    @param qfilter: Query filter
525
    @rtype: L{objects.QueryResponse}
526

527
    """
528
    result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
529
    return objects.QueryResponse.FromDict(result)
530

    
531
  def QueryFields(self, what, fields):
532
    """Query for available fields.
533

534
    @param what: One of L{constants.QR_VIA_LUXI}
535
    @type fields: None or list of strings
536
    @param fields: List of requested fields
537
    @rtype: L{objects.QueryFieldsResponse}
538

539
    """
540
    result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
541
    return objects.QueryFieldsResponse.FromDict(result)
542

    
543
  def QueryJobs(self, job_ids, fields):
544
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
545

    
546
  def QueryInstances(self, names, fields, use_locking):
547
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
548

    
549
  def QueryNodes(self, names, fields, use_locking):
550
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
551

    
552
  def QueryGroups(self, names, fields, use_locking):
553
    return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))
554

    
555
  def QueryNetworks(self, names, fields, use_locking):
556
    return self.CallMethod(REQ_QUERY_NETWORKS, (names, fields, use_locking))
557

    
558
  def QueryExports(self, nodes, use_locking):
559
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
560

    
561
  def QueryClusterInfo(self):
562
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
563

    
564
  def QueryConfigValues(self, fields):
565
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))
566

    
567
  def QueryTags(self, kind, name):
568
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))