Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ 7142485a

History | View | Annotate | Download (15.5 kB)

#
#

# Copyright (C) 2006, 2007, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

    
21

    
22
"""Module for the unix socket protocol

This module implements the local unix socket protocol. You only need
this module and the opcodes module in the client program in order to
communicate with the master.

The module is also used by the master daemon.

"""
31

    
32
import socket
33
import collections
34
import time
35
import errno
36
import logging
37

    
38
from ganeti import serializer
39
from ganeti import constants
40
from ganeti import errors
41
from ganeti import utils
42
from ganeti import objects
43

    
44

    
45
# Keys of the dictionaries making up request and response messages
KEY_METHOD = "method"
KEY_ARGS = "args"
KEY_SUCCESS = "success"
KEY_RESULT = "result"
KEY_VERSION = "version"

# Names of the request methods
REQ_SUBMIT_JOB = "SubmitJob"
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
REQ_QUERY = "Query"
REQ_QUERY_FIELDS = "QueryFields"
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_GROUPS = "QueryGroups"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
REQ_QUERY_TAGS = "QueryTags"
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"

# Default connect and read/write timeouts
DEF_CTMO = 10
DEF_RWTO = 60

# WaitForJobChange timeout; floor division keeps this an integer even
# when true division is in effect
WFJC_TIMEOUT = (DEF_RWTO - 1) // 2
75

    
76

    
77
class ProtocolError(errors.LuxiError):
  """Error in the LUXI protocol.

  """
79

    
80

    
81
class ConnectionClosedError(ProtocolError):
  """The connection was closed.

  """
83

    
84

    
85
class TimeoutError(ProtocolError):
  """An operation timed out.

  """
87

    
88

    
89
class RequestError(ProtocolError):
  """Error on request.

  Raised for problems with the format or the handling of a request,
  as opposed to (e.g.) a failure to start up an instance.

  Typical causes:
    - a job could not be submitted because its data was wrong
    - a query could not be run because required fields were missing

  """
100

    
101

    
102
class NoMasterError(ProtocolError):
  """The master cannot be reached.

  Either the master daemon is not running or its socket has been
  removed.

  """
109

    
110

    
111
class PermissionError(ProtocolError):
  """Permission denied while connecting to the master socket.

  The user lacks the rights needed to use the socket.

  """
117

    
118

    
119
class Transport:
120
  """Low-level transport class.
121

122
  This is used on the client side.
123

124
  This could be replace by any other class that provides the same
125
  semantics to the Client. This means:
126
    - can send messages and receive messages
127
    - safe for multithreading
128

129
  """
130

    
131
  def __init__(self, address, timeouts=None):
132
    """Constructor for the Client class.
133

134
    Arguments:
135
      - address: a valid address the the used transport class
136
      - timeout: a list of timeouts, to be used on connect and read/write
137

138
    There are two timeouts used since we might want to wait for a long
139
    time for a response, but the connect timeout should be lower.
140

141
    If not passed, we use a default of 10 and respectively 60 seconds.
142

143
    Note that on reading data, since the timeout applies to an
144
    invidual receive, it might be that the total duration is longer
145
    than timeout value passed (we make a hard limit at twice the read
146
    timeout).
147

148
    """
149
    self.address = address
150
    if timeouts is None:
151
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
152
    else:
153
      self._ctimeout, self._rwtimeout = timeouts
154

    
155
    self.socket = None
156
    self._buffer = ""
157
    self._msgs = collections.deque()
158

    
159
    try:
160
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
161

    
162
      # Try to connect
163
      try:
164
        utils.Retry(self._Connect, 1.0, self._ctimeout,
165
                    args=(self.socket, address, self._ctimeout))
166
      except utils.RetryTimeout:
167
        raise TimeoutError("Connect timed out")
168

    
169
      self.socket.settimeout(self._rwtimeout)
170
    except (socket.error, NoMasterError):
171
      if self.socket is not None:
172
        self.socket.close()
173
      self.socket = None
174
      raise
175

    
176
  @staticmethod
177
  def _Connect(sock, address, timeout):
178
    sock.settimeout(timeout)
179
    try:
180
      sock.connect(address)
181
    except socket.timeout, err:
182
      raise TimeoutError("Connect timed out: %s" % str(err))
183
    except socket.error, err:
184
      error_code = err.args[0]
185
      if error_code in (errno.ENOENT, errno.ECONNREFUSED):
186
        raise NoMasterError(address)
187
      elif error_code in (errno.EPERM, errno.EACCES):
188
        raise PermissionError(address)
189
      elif error_code == errno.EAGAIN:
190
        # Server's socket backlog is full at the moment
191
        raise utils.RetryAgain()
192
      raise
193

    
194
  def _CheckSocket(self):
195
    """Make sure we are connected.
196

197
    """
198
    if self.socket is None:
199
      raise ProtocolError("Connection is closed")
200

    
201
  def Send(self, msg):
202
    """Send a message.
203

204
    This just sends a message and doesn't wait for the response.
205

206
    """
207
    if constants.LUXI_EOM in msg:
208
      raise ProtocolError("Message terminator found in payload")
209

    
210
    self._CheckSocket()
211
    try:
212
      # TODO: sendall is not guaranteed to send everything
213
      self.socket.sendall(msg + constants.LUXI_EOM)
214
    except socket.timeout, err:
215
      raise TimeoutError("Sending timeout: %s" % str(err))
216

    
217
  def Recv(self):
218
    """Try to receive a message from the socket.
219

220
    In case we already have messages queued, we just return from the
221
    queue. Otherwise, we try to read data with a _rwtimeout network
222
    timeout, and making sure we don't go over 2x_rwtimeout as a global
223
    limit.
224

225
    """
226
    self._CheckSocket()
227
    etime = time.time() + self._rwtimeout
228
    while not self._msgs:
229
      if time.time() > etime:
230
        raise TimeoutError("Extended receive timeout")
231
      while True:
232
        try:
233
          data = self.socket.recv(4096)
234
        except socket.timeout, err:
235
          raise TimeoutError("Receive timeout: %s" % str(err))
236
        except socket.error, err:
237
          if err.args and err.args[0] == errno.EAGAIN:
238
            continue
239
          raise
240
        break
241
      if not data:
242
        raise ConnectionClosedError("Connection closed while reading")
243
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
244
      self._buffer = new_msgs.pop()
245
      self._msgs.extend(new_msgs)
246
    return self._msgs.popleft()
247

    
248
  def Call(self, msg):
249
    """Send a message and wait for the response.
250

251
    This is just a wrapper over Send and Recv.
252

253
    """
254
    self.Send(msg)
255
    return self.Recv()
256

    
257
  def Close(self):
258
    """Close the socket"""
259
    if self.socket is not None:
260
      self.socket.close()
261
      self.socket = None
262

    
263

    
264
def ParseRequest(msg):
265
  """Parses a LUXI request message.
266

267
  """
268
  try:
269
    request = serializer.LoadJson(msg)
270
  except ValueError, err:
271
    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
272

    
273
  logging.debug("LUXI request: %s", request)
274

    
275
  if not isinstance(request, dict):
276
    logging.error("LUXI request not a dict: %r", msg)
277
    raise ProtocolError("Invalid LUXI request (not a dict)")
278

    
279
  method = request.get(KEY_METHOD, None) # pylint: disable=E1103
280
  args = request.get(KEY_ARGS, None) # pylint: disable=E1103
281
  version = request.get(KEY_VERSION, None) # pylint: disable=E1103
282

    
283
  if method is None or args is None:
284
    logging.error("LUXI request missing method or arguments: %r", msg)
285
    raise ProtocolError(("Invalid LUXI request (no method or arguments"
286
                         " in request): %r") % msg)
287

    
288
  return (method, args, version)
289

    
290

    
291
def ParseResponse(msg):
292
  """Parses a LUXI response message.
293

294
  """
295
  # Parse the result
296
  try:
297
    data = serializer.LoadJson(msg)
298
  except KeyboardInterrupt:
299
    raise
300
  except Exception, err:
301
    raise ProtocolError("Error while deserializing response: %s" % str(err))
302

    
303
  # Validate response
304
  if not (isinstance(data, dict) and
305
          KEY_SUCCESS in data and
306
          KEY_RESULT in data):
307
    raise ProtocolError("Invalid response from server: %r" % data)
308

    
309
  return (data[KEY_SUCCESS], data[KEY_RESULT],
310
          data.get(KEY_VERSION, None)) # pylint: disable=E1103
311

    
312

    
313
def FormatResponse(success, result, version=None):
  """Serializes a LUXI response message.

  @param success: Value stored under the success key
  @param result: Value stored under the result key
  @param version: Version to include; left out if C{None}
  @return: The response as serialized by L{serializer.DumpJson}

  """
  response = {}
  response[KEY_SUCCESS] = success
  response[KEY_RESULT] = result

  # The version key is only present when a version was given
  if version is not None:
    response[KEY_VERSION] = version

  logging.debug("LUXI response: %s", response)

  return serializer.DumpJson(response)
328

    
329

    
330
def FormatRequest(method, args, version=None):
  """Serializes a LUXI request message.

  @param method: Value stored under the method key
  @param args: Value stored under the arguments key
  @param version: Version to include; left out if C{None}
  @return: The request as serialized by L{serializer.DumpJson}

  """
  request = {}
  request[KEY_METHOD] = method
  request[KEY_ARGS] = args

  # The version key is only present when a version was given
  if version is not None:
    request[KEY_VERSION] = version

  return serializer.DumpJson(request)
345

    
346

    
347
def CallLuxiMethod(transport_cb, method, args, version=None):
  """Sends a LUXI request through a transport and returns the result.

  @param transport_cb: Callable taking the serialized request and
    returning the serialized response
  @param method: Method name
  @param args: Method arguments
  @param version: Version sent with the request and expected in the
    response
  @return: The result contained in a successful response
  @raise errors.LuxiError: If the response carries a version different
    from C{version}
  @raise RequestError: If the response is unsuccessful and
    L{errors.MaybeRaise} did not raise for its result

  """
  assert callable(transport_cb)

  # Send request and wait for response
  response_msg = transport_cb(FormatRequest(method, args, version=version))

  (success, result, resp_version) = ParseResponse(response_msg)

  # A response without a version is accepted as it is
  if resp_version is not None:
    if resp_version != version:
      raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
                             (version, resp_version))

  if not success:
    errors.MaybeRaise(result)
    raise RequestError(result)

  return result
370

    
371

    
372
class Client(object):
  """High-level client implementation.

  This uses a backing Transport-like class on top of which it
  implements data serialization/deserialization.

  """
  def __init__(self, address=None, timeouts=None, transport=Transport):
    """Constructor for the Client class.

    Arguments:
      - address: a valid address for the used transport class
      - timeouts: a list of timeouts, to be used on connect and read/write
      - transport: a Transport-like class

    If address is not passed, constants.MASTER_SOCKET is used. If
    timeouts is not passed, the default timeouts of the transport
    class are used.

    """
    if address is None:
      address = constants.MASTER_SOCKET
    self.address = address
    self.timeouts = timeouts
    self.transport_class = transport
    self.transport = None
    self._InitTransport()

  def _InitTransport(self):
    """(Re)initialize the transport if needed.

    """
    if self.transport is None:
      self.transport = self.transport_class(self.address,
                                            timeouts=self.timeouts)

  def _CloseTransport(self):
    """Close the transport, ignoring errors.

    """
    if self.transport is None:
      return
    try:
      # Forget the transport before closing it, so that it is dropped
      # even if closing fails
      old_transp = self.transport
      self.transport = None
      old_transp.Close()
    except Exception: # pylint: disable=W0703
      pass

  def _SendMethodCall(self, data):
    """Send serialized data over the transport and return the response.

    The transport is closed if anything goes wrong; it is created anew
    on the next call.

    """
    # Send request and wait for response
    try:
      self._InitTransport()
      return self.transport.Call(data)
    except Exception:
      self._CloseTransport()
      raise

  def Close(self):
    """Close the underlying connection.

    """
    self._CloseTransport()

  def CallMethod(self, method, args):
    """Send a generic request and return the response.

    @param method: Method name
    @type args: list or tuple
    @param args: Method arguments
    @raise errors.ProgrammerError: If C{args} is not a list or tuple

    """
    if not isinstance(args, (list, tuple)):
      raise errors.ProgrammerError("Invalid parameter passed to CallMethod:"
                                   " expected list, got %s" % type(args))
    return CallLuxiMethod(self._SendMethodCall, method, args,
                          version=constants.LUXI_VERSION)

  def SetQueueDrainFlag(self, drain_flag):
    """Send a L{REQ_QUEUE_SET_DRAIN_FLAG} request."""
    return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, (drain_flag, ))

  def SetWatcherPause(self, until):
    """Send a L{REQ_SET_WATCHER_PAUSE} request."""
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))

  def SubmitJob(self, ops):
    """Send a L{REQ_SUBMIT_JOB} request with the state of each of C{ops}."""
    # Build an actual list, as CallMethod only accepts lists and tuples
    ops_state = [op.__getstate__() for op in ops]
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)

  def SubmitManyJobs(self, jobs):
    """Send a L{REQ_SUBMIT_MANY_JOBS} request.

    @param jobs: Iterable of jobs, each being an iterable of objects
      whose state is sent

    """
    jobs_state = []
    for ops in jobs:
      jobs_state.append([op.__getstate__() for op in ops])
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)

  def CancelJob(self, job_id):
    """Send a L{REQ_CANCEL_JOB} request."""
    return self.CallMethod(REQ_CANCEL_JOB, (job_id, ))

  def ArchiveJob(self, job_id):
    """Send a L{REQ_ARCHIVE_JOB} request."""
    return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))

  def AutoArchiveJobs(self, age):
    """Send a L{REQ_AUTOARCHIVE_JOBS} request.

    Besides C{age}, a timeout of a bit less than half the default
    read/write timeout is passed along.

    """
    # Floor division keeps the timeout an integer
    timeout = (DEF_RWTO - 1) // 2
    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial,
                           timeout=WFJC_TIMEOUT):
    """Waits for changes on a job.

    @param job_id: Job ID
    @type fields: list
    @param fields: List of field names to be observed
    @type prev_job_info: None or list
    @param prev_job_info: Previously received job information
    @type prev_log_serial: None or int/long
    @param prev_log_serial: Highest log serial number previously received
    @type timeout: int/float
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
                    be capped to that value)

    """
    assert timeout >= 0, "Timeout can not be negative"
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
                           (job_id, fields, prev_job_info,
                            prev_log_serial,
                            min(WFJC_TIMEOUT, timeout)))

  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
    """Waits for changes on a job, without timing out.

    L{WaitForJobChangeOnce} is called repeatedly until it returns
    something other than L{constants.JOB_NOTCHANGED}.

    """
    while True:
      result = self.WaitForJobChangeOnce(job_id, fields,
                                         prev_job_info, prev_log_serial)
      if result != constants.JOB_NOTCHANGED:
        break
    return result

  def Query(self, what, fields, qfilter):
    """Query for resources/items.

    @param what: One of L{constants.QR_VIA_LUXI}
    @type fields: List of strings
    @param fields: List of requested fields
    @type qfilter: None or list
    @param qfilter: Query filter
    @rtype: L{objects.QueryResponse}

    """
    result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
    return objects.QueryResponse.FromDict(result)

  def QueryFields(self, what, fields):
    """Query for available fields.

    @param what: One of L{constants.QR_VIA_LUXI}
    @type fields: None or list of strings
    @param fields: List of requested fields
    @rtype: L{objects.QueryFieldsResponse}

    """
    result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
    return objects.QueryFieldsResponse.FromDict(result)

  def QueryJobs(self, job_ids, fields):
    """Send a L{REQ_QUERY_JOBS} request."""
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))

  def QueryInstances(self, names, fields, use_locking):
    """Send a L{REQ_QUERY_INSTANCES} request."""
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))

  def QueryNodes(self, names, fields, use_locking):
    """Send a L{REQ_QUERY_NODES} request."""
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))

  def QueryGroups(self, names, fields, use_locking):
    """Send a L{REQ_QUERY_GROUPS} request."""
    return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))

  def QueryExports(self, nodes, use_locking):
    """Send a L{REQ_QUERY_EXPORTS} request."""
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))

  def QueryClusterInfo(self):
    """Send a L{REQ_QUERY_CLUSTER_INFO} request."""
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())

  def QueryConfigValues(self, fields):
    """Send a L{REQ_QUERY_CONFIG_VALUES} request."""
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))

  def QueryTags(self, kind, name):
    """Send a L{REQ_QUERY_TAGS} request."""
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))