Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ 61e062dd

History | View | Annotate | Download (16.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the unix socket protocol
23

24
This module implements the local unix socket protocol. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also used by the master daemon.
29

30
"""
31

    
32
import socket
33
import collections
34
import time
35
import errno
36
import logging
37

    
38
from ganeti import serializer
39
from ganeti import constants
40
from ganeti import errors
41
from ganeti import utils
42
from ganeti import objects
43
from ganeti import pathutils
44

    
45

    
46
KEY_METHOD = "method"
47
KEY_ARGS = "args"
48
KEY_SUCCESS = "success"
49
KEY_RESULT = "result"
50
KEY_VERSION = "version"
51

    
52
REQ_SUBMIT_JOB = "SubmitJob"
53
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
54
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
55
REQ_CANCEL_JOB = "CancelJob"
56
REQ_ARCHIVE_JOB = "ArchiveJob"
57
REQ_CHANGE_JOB_PRIORITY = "ChangeJobPriority"
58
REQ_AUTO_ARCHIVE_JOBS = "AutoArchiveJobs"
59
REQ_QUERY = "Query"
60
REQ_QUERY_FIELDS = "QueryFields"
61
REQ_QUERY_JOBS = "QueryJobs"
62
REQ_QUERY_INSTANCES = "QueryInstances"
63
REQ_QUERY_NODES = "QueryNodes"
64
REQ_QUERY_GROUPS = "QueryGroups"
65
REQ_QUERY_NETWORKS = "QueryNetworks"
66
REQ_QUERY_EXPORTS = "QueryExports"
67
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
68
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
69
REQ_QUERY_TAGS = "QueryTags"
70
REQ_SET_DRAIN_FLAG = "SetDrainFlag"
71
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
72

    
73
#: List of all LUXI requests
74
REQ_ALL = frozenset([
75
  REQ_ARCHIVE_JOB,
76
  REQ_AUTO_ARCHIVE_JOBS,
77
  REQ_CANCEL_JOB,
78
  REQ_CHANGE_JOB_PRIORITY,
79
  REQ_QUERY,
80
  REQ_QUERY_CLUSTER_INFO,
81
  REQ_QUERY_CONFIG_VALUES,
82
  REQ_QUERY_EXPORTS,
83
  REQ_QUERY_FIELDS,
84
  REQ_QUERY_GROUPS,
85
  REQ_QUERY_INSTANCES,
86
  REQ_QUERY_JOBS,
87
  REQ_QUERY_NODES,
88
  REQ_QUERY_TAGS,
89
  REQ_SET_DRAIN_FLAG,
90
  REQ_SET_WATCHER_PAUSE,
91
  REQ_SUBMIT_JOB,
92
  REQ_SUBMIT_MANY_JOBS,
93
  REQ_WAIT_FOR_JOB_CHANGE,
94
  ])
95

    
96
DEF_CTMO = 10
97
DEF_RWTO = 60
98

    
99
# WaitForJobChange timeout
100
WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
101

    
102

    
103
class ProtocolError(errors.LuxiError):
104
  """Denotes an error in the LUXI protocol."""
105

    
106

    
107
class ConnectionClosedError(ProtocolError):
108
  """Connection closed error."""
109

    
110

    
111
class TimeoutError(ProtocolError):
112
  """Operation timeout error."""
113

    
114

    
115
class RequestError(ProtocolError):
116
  """Error on request.
117

118
  This signifies an error in the request format or request handling,
119
  but not (e.g.) an error in starting up an instance.
120

121
  Some common conditions that can trigger this exception:
122
    - job submission failed because the job data was wrong
123
    - query failed because required fields were missing
124

125
  """
126

    
127

    
128
class NoMasterError(ProtocolError):
129
  """The master cannot be reached.
130

131
  This means that the master daemon is not running or the socket has
132
  been removed.
133

134
  """
135

    
136

    
137
class PermissionError(ProtocolError):
138
  """Permission denied while connecting to the master socket.
139

140
  This means the user doesn't have the proper rights.
141

142
  """
143

    
144

    
145
class Transport:
146
  """Low-level transport class.
147

148
  This is used on the client side.
149

150
  This could be replace by any other class that provides the same
151
  semantics to the Client. This means:
152
    - can send messages and receive messages
153
    - safe for multithreading
154

155
  """
156

    
157
  def __init__(self, address, timeouts=None):
158
    """Constructor for the Client class.
159

160
    Arguments:
161
      - address: a valid address the the used transport class
162
      - timeout: a list of timeouts, to be used on connect and read/write
163

164
    There are two timeouts used since we might want to wait for a long
165
    time for a response, but the connect timeout should be lower.
166

167
    If not passed, we use a default of 10 and respectively 60 seconds.
168

169
    Note that on reading data, since the timeout applies to an
170
    invidual receive, it might be that the total duration is longer
171
    than timeout value passed (we make a hard limit at twice the read
172
    timeout).
173

174
    """
175
    self.address = address
176
    if timeouts is None:
177
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
178
    else:
179
      self._ctimeout, self._rwtimeout = timeouts
180

    
181
    self.socket = None
182
    self._buffer = ""
183
    self._msgs = collections.deque()
184

    
185
    try:
186
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
187

    
188
      # Try to connect
189
      try:
190
        utils.Retry(self._Connect, 1.0, self._ctimeout,
191
                    args=(self.socket, address, self._ctimeout))
192
      except utils.RetryTimeout:
193
        raise TimeoutError("Connect timed out")
194

    
195
      self.socket.settimeout(self._rwtimeout)
196
    except (socket.error, NoMasterError):
197
      if self.socket is not None:
198
        self.socket.close()
199
      self.socket = None
200
      raise
201

    
202
  @staticmethod
203
  def _Connect(sock, address, timeout):
204
    sock.settimeout(timeout)
205
    try:
206
      sock.connect(address)
207
    except socket.timeout, err:
208
      raise TimeoutError("Connect timed out: %s" % str(err))
209
    except socket.error, err:
210
      error_code = err.args[0]
211
      if error_code in (errno.ENOENT, errno.ECONNREFUSED):
212
        raise NoMasterError(address)
213
      elif error_code in (errno.EPERM, errno.EACCES):
214
        raise PermissionError(address)
215
      elif error_code == errno.EAGAIN:
216
        # Server's socket backlog is full at the moment
217
        raise utils.RetryAgain()
218
      raise
219

    
220
  def _CheckSocket(self):
221
    """Make sure we are connected.
222

223
    """
224
    if self.socket is None:
225
      raise ProtocolError("Connection is closed")
226

    
227
  def Send(self, msg):
228
    """Send a message.
229

230
    This just sends a message and doesn't wait for the response.
231

232
    """
233
    if constants.LUXI_EOM in msg:
234
      raise ProtocolError("Message terminator found in payload")
235

    
236
    self._CheckSocket()
237
    try:
238
      # TODO: sendall is not guaranteed to send everything
239
      self.socket.sendall(msg + constants.LUXI_EOM)
240
    except socket.timeout, err:
241
      raise TimeoutError("Sending timeout: %s" % str(err))
242

    
243
  def Recv(self):
244
    """Try to receive a message from the socket.
245

246
    In case we already have messages queued, we just return from the
247
    queue. Otherwise, we try to read data with a _rwtimeout network
248
    timeout, and making sure we don't go over 2x_rwtimeout as a global
249
    limit.
250

251
    """
252
    self._CheckSocket()
253
    etime = time.time() + self._rwtimeout
254
    while not self._msgs:
255
      if time.time() > etime:
256
        raise TimeoutError("Extended receive timeout")
257
      while True:
258
        try:
259
          data = self.socket.recv(4096)
260
        except socket.timeout, err:
261
          raise TimeoutError("Receive timeout: %s" % str(err))
262
        except socket.error, err:
263
          if err.args and err.args[0] == errno.EAGAIN:
264
            continue
265
          raise
266
        break
267
      if not data:
268
        raise ConnectionClosedError("Connection closed while reading")
269
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
270
      self._buffer = new_msgs.pop()
271
      self._msgs.extend(new_msgs)
272
    return self._msgs.popleft()
273

    
274
  def Call(self, msg):
275
    """Send a message and wait for the response.
276

277
    This is just a wrapper over Send and Recv.
278

279
    """
280
    self.Send(msg)
281
    return self.Recv()
282

    
283
  def Close(self):
284
    """Close the socket"""
285
    if self.socket is not None:
286
      self.socket.close()
287
      self.socket = None
288

    
289

    
290
def ParseRequest(msg):
291
  """Parses a LUXI request message.
292

293
  """
294
  try:
295
    request = serializer.LoadJson(msg)
296
  except ValueError, err:
297
    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
298

    
299
  logging.debug("LUXI request: %s", request)
300

    
301
  if not isinstance(request, dict):
302
    logging.error("LUXI request not a dict: %r", msg)
303
    raise ProtocolError("Invalid LUXI request (not a dict)")
304

    
305
  method = request.get(KEY_METHOD, None) # pylint: disable=E1103
306
  args = request.get(KEY_ARGS, None) # pylint: disable=E1103
307
  version = request.get(KEY_VERSION, None) # pylint: disable=E1103
308

    
309
  if method is None or args is None:
310
    logging.error("LUXI request missing method or arguments: %r", msg)
311
    raise ProtocolError(("Invalid LUXI request (no method or arguments"
312
                         " in request): %r") % msg)
313

    
314
  return (method, args, version)
315

    
316

    
317
def ParseResponse(msg):
318
  """Parses a LUXI response message.
319

320
  """
321
  # Parse the result
322
  try:
323
    data = serializer.LoadJson(msg)
324
  except KeyboardInterrupt:
325
    raise
326
  except Exception, err:
327
    raise ProtocolError("Error while deserializing response: %s" % str(err))
328

    
329
  # Validate response
330
  if not (isinstance(data, dict) and
331
          KEY_SUCCESS in data and
332
          KEY_RESULT in data):
333
    raise ProtocolError("Invalid response from server: %r" % data)
334

    
335
  return (data[KEY_SUCCESS], data[KEY_RESULT],
336
          data.get(KEY_VERSION, None)) # pylint: disable=E1103
337

    
338

    
339
def FormatResponse(success, result, version=None):
340
  """Formats a LUXI response message.
341

342
  """
343
  response = {
344
    KEY_SUCCESS: success,
345
    KEY_RESULT: result,
346
    }
347

    
348
  if version is not None:
349
    response[KEY_VERSION] = version
350

    
351
  logging.debug("LUXI response: %s", response)
352

    
353
  return serializer.DumpJson(response)
354

    
355

    
356
def FormatRequest(method, args, version=None):
357
  """Formats a LUXI request message.
358

359
  """
360
  # Build request
361
  request = {
362
    KEY_METHOD: method,
363
    KEY_ARGS: args,
364
    }
365

    
366
  if version is not None:
367
    request[KEY_VERSION] = version
368

    
369
  # Serialize the request
370
  return serializer.DumpJson(request)
371

    
372

    
373
def CallLuxiMethod(transport_cb, method, args, version=None):
374
  """Send a LUXI request via a transport and return the response.
375

376
  """
377
  assert callable(transport_cb)
378

    
379
  request_msg = FormatRequest(method, args, version=version)
380

    
381
  # Send request and wait for response
382
  response_msg = transport_cb(request_msg)
383

    
384
  (success, result, resp_version) = ParseResponse(response_msg)
385

    
386
  # Verify version if there was one in the response
387
  if resp_version is not None and resp_version != version:
388
    raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
389
                           (version, resp_version))
390

    
391
  if success:
392
    return result
393

    
394
  errors.MaybeRaise(result)
395
  raise RequestError(result)
396

    
397

    
398
class Client(object):
399
  """High-level client implementation.
400

401
  This uses a backing Transport-like class on top of which it
402
  implements data serialization/deserialization.
403

404
  """
405
  def __init__(self, address=None, timeouts=None, transport=Transport):
406
    """Constructor for the Client class.
407

408
    Arguments:
409
      - address: a valid address the the used transport class
410
      - timeout: a list of timeouts, to be used on connect and read/write
411
      - transport: a Transport-like class
412

413

414
    If timeout is not passed, the default timeouts of the transport
415
    class are used.
416

417
    """
418
    if address is None:
419
      address = pathutils.MASTER_SOCKET
420
    self.address = address
421
    self.timeouts = timeouts
422
    self.transport_class = transport
423
    self.transport = None
424
    self._InitTransport()
425

    
426
  def _InitTransport(self):
427
    """(Re)initialize the transport if needed.
428

429
    """
430
    if self.transport is None:
431
      self.transport = self.transport_class(self.address,
432
                                            timeouts=self.timeouts)
433

    
434
  def _CloseTransport(self):
435
    """Close the transport, ignoring errors.
436

437
    """
438
    if self.transport is None:
439
      return
440
    try:
441
      old_transp = self.transport
442
      self.transport = None
443
      old_transp.Close()
444
    except Exception: # pylint: disable=W0703
445
      pass
446

    
447
  def _SendMethodCall(self, data):
448
    # Send request and wait for response
449
    try:
450
      self._InitTransport()
451
      return self.transport.Call(data)
452
    except Exception:
453
      self._CloseTransport()
454
      raise
455

    
456
  def Close(self):
457
    """Close the underlying connection.
458

459
    """
460
    self._CloseTransport()
461

    
462
  def CallMethod(self, method, args):
463
    """Send a generic request and return the response.
464

465
    """
466
    if not isinstance(args, (list, tuple)):
467
      raise errors.ProgrammerError("Invalid parameter passed to CallMethod:"
468
                                   " expected list, got %s" % type(args))
469
    return CallLuxiMethod(self._SendMethodCall, method, args,
470
                          version=constants.LUXI_VERSION)
471

    
472
  def SetQueueDrainFlag(self, drain_flag):
473
    return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, ))
474

    
475
  def SetWatcherPause(self, until):
476
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))
477

    
478
  def SubmitJob(self, ops):
479
    ops_state = map(lambda op: op.__getstate__(), ops)
480
    return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, ))
481

    
482
  def SubmitManyJobs(self, jobs):
483
    jobs_state = []
484
    for ops in jobs:
485
      jobs_state.append([op.__getstate__() for op in ops])
486
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, ))
487

    
488
  def CancelJob(self, job_id):
489
    return self.CallMethod(REQ_CANCEL_JOB, (job_id, ))
490

    
491
  def ArchiveJob(self, job_id):
492
    return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))
493

    
494
  def ChangeJobPriority(self, job_id, priority):
495
    return self.CallMethod(REQ_CHANGE_JOB_PRIORITY, (job_id, priority))
496

    
497
  def AutoArchiveJobs(self, age):
498
    timeout = (DEF_RWTO - 1) / 2
499
    return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS, (age, timeout))
500

    
501
  def WaitForJobChangeOnce(self, job_id, fields,
502
                           prev_job_info, prev_log_serial,
503
                           timeout=WFJC_TIMEOUT):
504
    """Waits for changes on a job.
505

506
    @param job_id: Job ID
507
    @type fields: list
508
    @param fields: List of field names to be observed
509
    @type prev_job_info: None or list
510
    @param prev_job_info: Previously received job information
511
    @type prev_log_serial: None or int/long
512
    @param prev_log_serial: Highest log serial number previously received
513
    @type timeout: int/float
514
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
515
                    be capped to that value)
516

517
    """
518
    assert timeout >= 0, "Timeout can not be negative"
519
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
520
                           (job_id, fields, prev_job_info,
521
                            prev_log_serial,
522
                            min(WFJC_TIMEOUT, timeout)))
523

    
524
  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
525
    while True:
526
      result = self.WaitForJobChangeOnce(job_id, fields,
527
                                         prev_job_info, prev_log_serial)
528
      if result != constants.JOB_NOTCHANGED:
529
        break
530
    return result
531

    
532
  def Query(self, what, fields, qfilter):
533
    """Query for resources/items.
534

535
    @param what: One of L{constants.QR_VIA_LUXI}
536
    @type fields: List of strings
537
    @param fields: List of requested fields
538
    @type qfilter: None or list
539
    @param qfilter: Query filter
540
    @rtype: L{objects.QueryResponse}
541

542
    """
543
    result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
544
    return objects.QueryResponse.FromDict(result)
545

    
546
  def QueryFields(self, what, fields):
547
    """Query for available fields.
548

549
    @param what: One of L{constants.QR_VIA_LUXI}
550
    @type fields: None or list of strings
551
    @param fields: List of requested fields
552
    @rtype: L{objects.QueryFieldsResponse}
553

554
    """
555
    result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
556
    return objects.QueryFieldsResponse.FromDict(result)
557

    
558
  def QueryJobs(self, job_ids, fields):
559
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
560

    
561
  def QueryInstances(self, names, fields, use_locking):
562
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
563

    
564
  def QueryNodes(self, names, fields, use_locking):
565
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
566

    
567
  def QueryGroups(self, names, fields, use_locking):
568
    return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))
569

    
570
  def QueryNetworks(self, names, fields, use_locking):
571
    return self.CallMethod(REQ_QUERY_NETWORKS, (names, fields, use_locking))
572

    
573
  def QueryExports(self, nodes, use_locking):
574
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
575

    
576
  def QueryClusterInfo(self):
577
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
578

    
579
  def QueryConfigValues(self, fields):
580
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))
581

    
582
  def QueryTags(self, kind, name):
583
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))