Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ 1a2eb2dc

History | View | Annotate | Download (16.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the unix socket protocol
23

24
This module implements the local unix socket protocol. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also used by the master daemon.
29

30
"""
31

    
32
import socket
33
import collections
34
import time
35
import errno
36
import logging
37

    
38
from ganeti import serializer
39
from ganeti import constants
40
from ganeti import errors
41
from ganeti import utils
42
from ganeti import objects
43
from ganeti import pathutils
44

    
45

    
46
KEY_METHOD = "method"
47
KEY_ARGS = "args"
48
KEY_SUCCESS = "success"
49
KEY_RESULT = "result"
50
KEY_VERSION = "version"
51

    
52
REQ_SUBMIT_JOB = "SubmitJob"
53
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
54
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
55
REQ_CANCEL_JOB = "CancelJob"
56
REQ_ARCHIVE_JOB = "ArchiveJob"
57
REQ_CHANGE_JOB_PRIORITY = "ChangeJobPriority"
58
REQ_AUTO_ARCHIVE_JOBS = "AutoArchiveJobs"
59
REQ_QUERY = "Query"
60
REQ_QUERY_FIELDS = "QueryFields"
61
REQ_QUERY_JOBS = "QueryJobs"
62
REQ_QUERY_INSTANCES = "QueryInstances"
63
REQ_QUERY_NODES = "QueryNodes"
64
REQ_QUERY_GROUPS = "QueryGroups"
65
REQ_QUERY_EXPORTS = "QueryExports"
66
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
67
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
68
REQ_QUERY_TAGS = "QueryTags"
69
REQ_SET_DRAIN_FLAG = "SetDrainFlag"
70
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
71

    
72
#: List of all LUXI requests
73
REQ_ALL = frozenset([
74
  REQ_ARCHIVE_JOB,
75
  REQ_AUTO_ARCHIVE_JOBS,
76
  REQ_CANCEL_JOB,
77
  REQ_CHANGE_JOB_PRIORITY,
78
  REQ_QUERY,
79
  REQ_QUERY_CLUSTER_INFO,
80
  REQ_QUERY_CONFIG_VALUES,
81
  REQ_QUERY_EXPORTS,
82
  REQ_QUERY_FIELDS,
83
  REQ_QUERY_GROUPS,
84
  REQ_QUERY_INSTANCES,
85
  REQ_QUERY_JOBS,
86
  REQ_QUERY_NODES,
87
  REQ_QUERY_TAGS,
88
  REQ_SET_DRAIN_FLAG,
89
  REQ_SET_WATCHER_PAUSE,
90
  REQ_SUBMIT_JOB,
91
  REQ_SUBMIT_MANY_JOBS,
92
  REQ_WAIT_FOR_JOB_CHANGE,
93
  ])
94

    
95
DEF_CTMO = 10
96
DEF_RWTO = 60
97

    
98
# WaitForJobChange timeout
99
WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
100

    
101

    
102
class ProtocolError(errors.LuxiError):
103
  """Denotes an error in the LUXI protocol."""
104

    
105

    
106
class ConnectionClosedError(ProtocolError):
107
  """Connection closed error."""
108

    
109

    
110
class TimeoutError(ProtocolError):
111
  """Operation timeout error."""
112

    
113

    
114
class RequestError(ProtocolError):
115
  """Error on request.
116

117
  This signifies an error in the request format or request handling,
118
  but not (e.g.) an error in starting up an instance.
119

120
  Some common conditions that can trigger this exception:
121
    - job submission failed because the job data was wrong
122
    - query failed because required fields were missing
123

124
  """
125

    
126

    
127
class NoMasterError(ProtocolError):
128
  """The master cannot be reached.
129

130
  This means that the master daemon is not running or the socket has
131
  been removed.
132

133
  """
134

    
135

    
136
class PermissionError(ProtocolError):
137
  """Permission denied while connecting to the master socket.
138

139
  This means the user doesn't have the proper rights.
140

141
  """
142

    
143

    
144
class Transport:
145
  """Low-level transport class.
146

147
  This is used on the client side.
148

149
  This could be replace by any other class that provides the same
150
  semantics to the Client. This means:
151
    - can send messages and receive messages
152
    - safe for multithreading
153

154
  """
155

    
156
  def __init__(self, address, timeouts=None):
157
    """Constructor for the Client class.
158

159
    Arguments:
160
      - address: a valid address the the used transport class
161
      - timeout: a list of timeouts, to be used on connect and read/write
162

163
    There are two timeouts used since we might want to wait for a long
164
    time for a response, but the connect timeout should be lower.
165

166
    If not passed, we use a default of 10 and respectively 60 seconds.
167

168
    Note that on reading data, since the timeout applies to an
169
    invidual receive, it might be that the total duration is longer
170
    than timeout value passed (we make a hard limit at twice the read
171
    timeout).
172

173
    """
174
    self.address = address
175
    if timeouts is None:
176
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
177
    else:
178
      self._ctimeout, self._rwtimeout = timeouts
179

    
180
    self.socket = None
181
    self._buffer = ""
182
    self._msgs = collections.deque()
183

    
184
    try:
185
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
186

    
187
      # Try to connect
188
      try:
189
        utils.Retry(self._Connect, 1.0, self._ctimeout,
190
                    args=(self.socket, address, self._ctimeout))
191
      except utils.RetryTimeout:
192
        raise TimeoutError("Connect timed out")
193

    
194
      self.socket.settimeout(self._rwtimeout)
195
    except (socket.error, NoMasterError):
196
      if self.socket is not None:
197
        self.socket.close()
198
      self.socket = None
199
      raise
200

    
201
  @staticmethod
202
  def _Connect(sock, address, timeout):
203
    sock.settimeout(timeout)
204
    try:
205
      sock.connect(address)
206
    except socket.timeout, err:
207
      raise TimeoutError("Connect timed out: %s" % str(err))
208
    except socket.error, err:
209
      error_code = err.args[0]
210
      if error_code in (errno.ENOENT, errno.ECONNREFUSED):
211
        raise NoMasterError(address)
212
      elif error_code in (errno.EPERM, errno.EACCES):
213
        raise PermissionError(address)
214
      elif error_code == errno.EAGAIN:
215
        # Server's socket backlog is full at the moment
216
        raise utils.RetryAgain()
217
      raise
218

    
219
  def _CheckSocket(self):
220
    """Make sure we are connected.
221

222
    """
223
    if self.socket is None:
224
      raise ProtocolError("Connection is closed")
225

    
226
  def Send(self, msg):
227
    """Send a message.
228

229
    This just sends a message and doesn't wait for the response.
230

231
    """
232
    if constants.LUXI_EOM in msg:
233
      raise ProtocolError("Message terminator found in payload")
234

    
235
    self._CheckSocket()
236
    try:
237
      # TODO: sendall is not guaranteed to send everything
238
      self.socket.sendall(msg + constants.LUXI_EOM)
239
    except socket.timeout, err:
240
      raise TimeoutError("Sending timeout: %s" % str(err))
241

    
242
  def Recv(self):
243
    """Try to receive a message from the socket.
244

245
    In case we already have messages queued, we just return from the
246
    queue. Otherwise, we try to read data with a _rwtimeout network
247
    timeout, and making sure we don't go over 2x_rwtimeout as a global
248
    limit.
249

250
    """
251
    self._CheckSocket()
252
    etime = time.time() + self._rwtimeout
253
    while not self._msgs:
254
      if time.time() > etime:
255
        raise TimeoutError("Extended receive timeout")
256
      while True:
257
        try:
258
          data = self.socket.recv(4096)
259
        except socket.timeout, err:
260
          raise TimeoutError("Receive timeout: %s" % str(err))
261
        except socket.error, err:
262
          if err.args and err.args[0] == errno.EAGAIN:
263
            continue
264
          raise
265
        break
266
      if not data:
267
        raise ConnectionClosedError("Connection closed while reading")
268
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
269
      self._buffer = new_msgs.pop()
270
      self._msgs.extend(new_msgs)
271
    return self._msgs.popleft()
272

    
273
  def Call(self, msg):
274
    """Send a message and wait for the response.
275

276
    This is just a wrapper over Send and Recv.
277

278
    """
279
    self.Send(msg)
280
    return self.Recv()
281

    
282
  def Close(self):
283
    """Close the socket"""
284
    if self.socket is not None:
285
      self.socket.close()
286
      self.socket = None
287

    
288

    
289
def ParseRequest(msg):
290
  """Parses a LUXI request message.
291

292
  """
293
  try:
294
    request = serializer.LoadJson(msg)
295
  except ValueError, err:
296
    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
297

    
298
  logging.debug("LUXI request: %s", request)
299

    
300
  if not isinstance(request, dict):
301
    logging.error("LUXI request not a dict: %r", msg)
302
    raise ProtocolError("Invalid LUXI request (not a dict)")
303

    
304
  method = request.get(KEY_METHOD, None) # pylint: disable=E1103
305
  args = request.get(KEY_ARGS, None) # pylint: disable=E1103
306
  version = request.get(KEY_VERSION, None) # pylint: disable=E1103
307

    
308
  if method is None or args is None:
309
    logging.error("LUXI request missing method or arguments: %r", msg)
310
    raise ProtocolError(("Invalid LUXI request (no method or arguments"
311
                         " in request): %r") % msg)
312

    
313
  return (method, args, version)
314

    
315

    
316
def ParseResponse(msg):
317
  """Parses a LUXI response message.
318

319
  """
320
  # Parse the result
321
  try:
322
    data = serializer.LoadJson(msg)
323
  except KeyboardInterrupt:
324
    raise
325
  except Exception, err:
326
    raise ProtocolError("Error while deserializing response: %s" % str(err))
327

    
328
  # Validate response
329
  if not (isinstance(data, dict) and
330
          KEY_SUCCESS in data and
331
          KEY_RESULT in data):
332
    raise ProtocolError("Invalid response from server: %r" % data)
333

    
334
  return (data[KEY_SUCCESS], data[KEY_RESULT],
335
          data.get(KEY_VERSION, None)) # pylint: disable=E1103
336

    
337

    
338
def FormatResponse(success, result, version=None):
339
  """Formats a LUXI response message.
340

341
  """
342
  response = {
343
    KEY_SUCCESS: success,
344
    KEY_RESULT: result,
345
    }
346

    
347
  if version is not None:
348
    response[KEY_VERSION] = version
349

    
350
  logging.debug("LUXI response: %s", response)
351

    
352
  return serializer.DumpJson(response)
353

    
354

    
355
def FormatRequest(method, args, version=None):
356
  """Formats a LUXI request message.
357

358
  """
359
  # Build request
360
  request = {
361
    KEY_METHOD: method,
362
    KEY_ARGS: args,
363
    }
364

    
365
  if version is not None:
366
    request[KEY_VERSION] = version
367

    
368
  # Serialize the request
369
  return serializer.DumpJson(request)
370

    
371

    
372
def CallLuxiMethod(transport_cb, method, args, version=None):
373
  """Send a LUXI request via a transport and return the response.
374

375
  """
376
  assert callable(transport_cb)
377

    
378
  request_msg = FormatRequest(method, args, version=version)
379

    
380
  # Send request and wait for response
381
  response_msg = transport_cb(request_msg)
382

    
383
  (success, result, resp_version) = ParseResponse(response_msg)
384

    
385
  # Verify version if there was one in the response
386
  if resp_version is not None and resp_version != version:
387
    raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
388
                           (version, resp_version))
389

    
390
  if success:
391
    return result
392

    
393
  errors.MaybeRaise(result)
394
  raise RequestError(result)
395

    
396

    
397
class Client(object):
398
  """High-level client implementation.
399

400
  This uses a backing Transport-like class on top of which it
401
  implements data serialization/deserialization.
402

403
  """
404
  def __init__(self, address=None, timeouts=None, transport=Transport):
405
    """Constructor for the Client class.
406

407
    Arguments:
408
      - address: a valid address the the used transport class
409
      - timeout: a list of timeouts, to be used on connect and read/write
410
      - transport: a Transport-like class
411

412

413
    If timeout is not passed, the default timeouts of the transport
414
    class are used.
415

416
    """
417
    if address is None:
418
      address = pathutils.MASTER_SOCKET
419
    self.address = address
420
    self.timeouts = timeouts
421
    self.transport_class = transport
422
    self.transport = None
423
    self._InitTransport()
424

    
425
  def _InitTransport(self):
426
    """(Re)initialize the transport if needed.
427

428
    """
429
    if self.transport is None:
430
      self.transport = self.transport_class(self.address,
431
                                            timeouts=self.timeouts)
432

    
433
  def _CloseTransport(self):
434
    """Close the transport, ignoring errors.
435

436
    """
437
    if self.transport is None:
438
      return
439
    try:
440
      old_transp = self.transport
441
      self.transport = None
442
      old_transp.Close()
443
    except Exception: # pylint: disable=W0703
444
      pass
445

    
446
  def _SendMethodCall(self, data):
447
    # Send request and wait for response
448
    try:
449
      self._InitTransport()
450
      return self.transport.Call(data)
451
    except Exception:
452
      self._CloseTransport()
453
      raise
454

    
455
  def Close(self):
456
    """Close the underlying connection.
457

458
    """
459
    self._CloseTransport()
460

    
461
  def CallMethod(self, method, args):
462
    """Send a generic request and return the response.
463

464
    """
465
    if not isinstance(args, (list, tuple)):
466
      raise errors.ProgrammerError("Invalid parameter passed to CallMethod:"
467
                                   " expected list, got %s" % type(args))
468
    return CallLuxiMethod(self._SendMethodCall, method, args,
469
                          version=constants.LUXI_VERSION)
470

    
471
  def SetQueueDrainFlag(self, drain_flag):
472
    return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, ))
473

    
474
  def SetWatcherPause(self, until):
475
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))
476

    
477
  def SubmitJob(self, ops):
478
    ops_state = map(lambda op: op.__getstate__(), ops)
479
    return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, ))
480

    
481
  def SubmitManyJobs(self, jobs):
482
    jobs_state = []
483
    for ops in jobs:
484
      jobs_state.append([op.__getstate__() for op in ops])
485
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, ))
486

    
487
  def CancelJob(self, job_id):
488
    return self.CallMethod(REQ_CANCEL_JOB, (job_id, ))
489

    
490
  def ArchiveJob(self, job_id):
491
    return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))
492

    
493
  def ChangeJobPriority(self, job_id, priority):
494
    return self.CallMethod(REQ_CHANGE_JOB_PRIORITY, (job_id, priority))
495

    
496
  def AutoArchiveJobs(self, age):
497
    timeout = (DEF_RWTO - 1) / 2
498
    return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS, (age, timeout))
499

    
500
  def WaitForJobChangeOnce(self, job_id, fields,
501
                           prev_job_info, prev_log_serial,
502
                           timeout=WFJC_TIMEOUT):
503
    """Waits for changes on a job.
504

505
    @param job_id: Job ID
506
    @type fields: list
507
    @param fields: List of field names to be observed
508
    @type prev_job_info: None or list
509
    @param prev_job_info: Previously received job information
510
    @type prev_log_serial: None or int/long
511
    @param prev_log_serial: Highest log serial number previously received
512
    @type timeout: int/float
513
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
514
                    be capped to that value)
515

516
    """
517
    assert timeout >= 0, "Timeout can not be negative"
518
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
519
                           (job_id, fields, prev_job_info,
520
                            prev_log_serial,
521
                            min(WFJC_TIMEOUT, timeout)))
522

    
523
  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
524
    while True:
525
      result = self.WaitForJobChangeOnce(job_id, fields,
526
                                         prev_job_info, prev_log_serial)
527
      if result != constants.JOB_NOTCHANGED:
528
        break
529
    return result
530

    
531
  def Query(self, what, fields, qfilter):
532
    """Query for resources/items.
533

534
    @param what: One of L{constants.QR_VIA_LUXI}
535
    @type fields: List of strings
536
    @param fields: List of requested fields
537
    @type qfilter: None or list
538
    @param qfilter: Query filter
539
    @rtype: L{objects.QueryResponse}
540

541
    """
542
    result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
543
    return objects.QueryResponse.FromDict(result)
544

    
545
  def QueryFields(self, what, fields):
546
    """Query for available fields.
547

548
    @param what: One of L{constants.QR_VIA_LUXI}
549
    @type fields: None or list of strings
550
    @param fields: List of requested fields
551
    @rtype: L{objects.QueryFieldsResponse}
552

553
    """
554
    result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
555
    return objects.QueryFieldsResponse.FromDict(result)
556

    
557
  def QueryJobs(self, job_ids, fields):
558
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
559

    
560
  def QueryInstances(self, names, fields, use_locking):
561
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
562

    
563
  def QueryNodes(self, names, fields, use_locking):
564
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
565

    
566
  def QueryGroups(self, names, fields, use_locking):
567
    return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))
568

    
569
  def QueryExports(self, nodes, use_locking):
570
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
571

    
572
  def QueryClusterInfo(self):
573
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
574

    
575
  def QueryConfigValues(self, fields):
576
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))
577

    
578
  def QueryTags(self, kind, name):
579
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))