Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ b459a848

History | View | Annotate | Download (15.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the unix socket protocol

This module implements the local unix socket protocol. You only need
this module and the opcodes module in the client program in order to
communicate with the master.

The module is also used by the master daemon.

"""
31

    
32
import socket
33
import collections
34
import time
35
import errno
36
import logging
37
import warnings
38

    
39
from ganeti import serializer
40
from ganeti import constants
41
from ganeti import errors
42
from ganeti import utils
43
from ganeti import objects
44

    
45

    
46
# Keys of the JSON dictionaries used for LUXI requests and responses
KEY_METHOD = "method"
KEY_ARGS = "args"
KEY_SUCCESS = "success"
KEY_RESULT = "result"
KEY_VERSION = "version"

# Method names sent as the KEY_METHOD value of a request
REQ_SUBMIT_JOB = "SubmitJob"
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
REQ_QUERY = "Query"
REQ_QUERY_FIELDS = "QueryFields"
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_GROUPS = "QueryGroups"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
REQ_QUERY_TAGS = "QueryTags"
REQ_QUERY_LOCKS = "QueryLocks"
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"

# Default connect timeout and default read/write timeout, in seconds
DEF_CTMO = 10
DEF_RWTO = 60

# WaitForJobChange timeout; floor division keeps this an integer no matter
# which division semantics the interpreter applies to "/"
WFJC_TIMEOUT = (DEF_RWTO - 1) // 2
77

    
78

    
79
class ProtocolError(errors.LuxiError):
  """Error in the LUXI protocol.

  The more specific LUXI errors defined in this module derive from it.

  """
81

    
82

    
83
class ConnectionClosedError(ProtocolError):
  """Raised when the connection is found to be closed."""
85

    
86

    
87
class TimeoutError(ProtocolError):
  """Raised when an operation times out."""
89

    
90

    
91
class RequestError(ProtocolError):
  """Error on request.

  Raised for problems with the format or the handling of a request
  itself, as opposed to (e.g.) a failure while starting up an instance.

  Typical causes:
    - a job could not be submitted because its data was wrong
    - a query was rejected because required fields were missing

  """
102

    
103

    
104
class NoMasterError(ProtocolError):
  """Raised when the master cannot be reached.

  Either the master daemon is not running or its socket has been
  removed.

  """
111

    
112

    
113
class PermissionError(ProtocolError):
  """Raised when connecting to the master socket is not permitted.

  The user does not have the proper rights.

  """
119

    
120

    
121
class Transport:
  """Low-level transport class.

  This is used on the client side.

  This could be replaced by any other class that provides the same
  semantics to the Client. This means:
    - can send messages and receive messages
    - safe for multithreading

  """

  def __init__(self, address, timeouts=None):
    """Constructor for the Transport class.

    Arguments:
      - address: a valid address for the used transport class
      - timeouts: a pair of timeouts, to be used on connect and read/write

    There are two timeouts used since we might want to wait for a long
    time for a response, but the connect timeout should be lower.

    If not passed, we use a default of 10 and respectively 60 seconds.

    Note that on reading data, since the timeout applies to an
    individual receive, it might be that the total duration is longer
    than timeout value passed (we make a hard limit at twice the read
    timeout).

    Whatever goes wrong while connecting, the socket is closed again
    before the exception is passed on to the caller.

    """
    self.address = address
    if timeouts is None:
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
    else:
      self._ctimeout, self._rwtimeout = timeouts

    self.socket = None
    # Received data not yet terminated by constants.LUXI_EOM
    self._buffer = ""
    # Complete messages received but not yet handed out by Recv
    self._msgs = collections.deque()

    try:
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

      # Try to connect
      try:
        utils.Retry(self._Connect, 1.0, self._ctimeout,
                    args=(self.socket, address, self._ctimeout))
      except utils.RetryTimeout:
        raise TimeoutError("Connect timed out")

      self.socket.settimeout(self._rwtimeout)
    except Exception:
      # Close on every failure, including TimeoutError and PermissionError,
      # so that a failed connection attempt never leaves the socket open
      if self.socket is not None:
        self.socket.close()
      self.socket = None
      raise

  @staticmethod
  def _Connect(sock, address, timeout):
    """Make one connection attempt.

    Socket errors are translated as follows: a timeout becomes
    TimeoutError, ENOENT/ECONNREFUSED become NoMasterError, EPERM/EACCES
    become PermissionError and EAGAIN becomes utils.RetryAgain. Any other
    socket error is re-raised unchanged.

    """
    sock.settimeout(timeout)
    try:
      sock.connect(address)
    except socket.timeout as err:
      raise TimeoutError("Connect timed out: %s" % str(err))
    except socket.error as err:
      error_code = err.args[0]
      if error_code in (errno.ENOENT, errno.ECONNREFUSED):
        raise NoMasterError(address)
      elif error_code in (errno.EPERM, errno.EACCES):
        raise PermissionError(address)
      elif error_code == errno.EAGAIN:
        # Server's socket backlog is full at the moment
        raise utils.RetryAgain()
      raise

  def _CheckSocket(self):
    """Make sure we are connected.

    Raises ProtocolError if the socket has been closed.

    """
    if self.socket is None:
      raise ProtocolError("Connection is closed")

  def Send(self, msg):
    """Send a message.

    This just sends a message and doesn't wait for the response.

    Raises ProtocolError if the message contains the message terminator
    or the connection is closed, and TimeoutError if sending times out.

    """
    if constants.LUXI_EOM in msg:
      raise ProtocolError("Message terminator found in payload")

    self._CheckSocket()
    try:
      # TODO: sendall is not guaranteed to send everything
      self.socket.sendall(msg + constants.LUXI_EOM)
    except socket.timeout as err:
      raise TimeoutError("Sending timeout: %s" % str(err))

  def Recv(self):
    """Try to receive a message from the socket.

    In case we already have messages queued, we just return from the
    queue. Otherwise, we try to read data with a _rwtimeout network
    timeout, and making sure we don't go over 2x_rwtimeout as a global
    limit.

    Raises TimeoutError on timeout and ConnectionClosedError if the peer
    closed the connection while we were reading.

    """
    self._CheckSocket()
    # The deadline is only checked before a receive is started and each
    # receive is itself bounded by the socket timeout
    etime = time.time() + self._rwtimeout
    while not self._msgs:
      if time.time() > etime:
        raise TimeoutError("Extended receive timeout")
      while True:
        try:
          data = self.socket.recv(4096)
        except socket.timeout as err:
          raise TimeoutError("Receive timeout: %s" % str(err))
        except socket.error as err:
          # EAGAIN: retry the receive right away
          if err.args and err.args[0] == errno.EAGAIN:
            continue
          raise
        break
      if not data:
        raise ConnectionClosedError("Connection closed while reading")
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
      # The last piece has no terminator yet (it may be empty); keep it
      # for the next round
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def Call(self, msg):
    """Send a message and wait for the response.

    This is just a wrapper over Send and Recv.

    """
    self.Send(msg)
    return self.Recv()

  def Close(self):
    """Close the socket; does nothing if it is already closed."""
    if self.socket is not None:
      self.socket.close()
      self.socket = None
264

    
265

    
266
def ParseRequest(msg):
  """Parses a LUXI request message.

  @param msg: the serialized request
  @return: tuple of (method, args, version); version is None if the
      request did not contain one
  @raise ProtocolError: if the message cannot be deserialized, is not a
      dictionary, or lacks the method or the arguments

  """
  try:
    request = serializer.LoadJson(msg)
  except ValueError as err:
    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)

  logging.debug("LUXI request: %s", request)

  if not isinstance(request, dict):
    logging.error("LUXI request not a dict: %r", msg)
    raise ProtocolError("Invalid LUXI request (not a dict)")

  method = request.get(KEY_METHOD, None) # pylint: disable=E1103
  args = request.get(KEY_ARGS, None) # pylint: disable=E1103
  version = request.get(KEY_VERSION, None) # pylint: disable=E1103

  if method is None or args is None:
    logging.error("LUXI request missing method or arguments: %r", msg)
    raise ProtocolError(("Invalid LUXI request (no method or arguments"
                         " in request): %r") % msg)

  return (method, args, version)
291

    
292

    
293
def ParseResponse(msg):
  """Parses a LUXI response message.

  @param msg: the serialized response
  @return: tuple of (success, result, version); version is None if the
      response did not contain one
  @raise ProtocolError: if the message cannot be deserialized or is not
      a dictionary holding both the success and the result key

  """
  # Parse the result
  try:
    data = serializer.LoadJson(msg)
  except KeyboardInterrupt:
    raise
  except Exception as err:
    raise ProtocolError("Error while deserializing response: %s" % str(err))

  # Validate response
  if not (isinstance(data, dict) and
          KEY_SUCCESS in data and
          KEY_RESULT in data):
    raise ProtocolError("Invalid response from server: %r" % data)

  return (data[KEY_SUCCESS], data[KEY_RESULT],
          data.get(KEY_VERSION, None)) # pylint: disable=E1103
313

    
314

    
315
def FormatResponse(success, result, version=None):
  """Serializes a LUXI response.

  The response carries the success flag and the result; the version is
  only added when one is given.

  """
  payload = {}
  payload[KEY_SUCCESS] = success
  payload[KEY_RESULT] = result
  if version is not None:
    payload[KEY_VERSION] = version

  logging.debug("LUXI response: %s", payload)
  return serializer.DumpJson(payload)
330

    
331

    
332
def FormatRequest(method, args, version=None):
  """Serializes a LUXI request.

  The request carries the method name and its arguments; the version is
  only added when one is given.

  """
  payload = {}
  payload[KEY_METHOD] = method
  payload[KEY_ARGS] = args
  if version is not None:
    payload[KEY_VERSION] = version

  # Requests are serialized without indentation
  return serializer.DumpJson(payload, indent=False)
347

    
348

    
349
def CallLuxiMethod(transport_cb, method, args, version=None):
  """Performs one LUXI call through the given transport callback.

  The request is serialized, handed to transport_cb (which must return
  the serialized response) and the parsed result is returned. A response
  carrying a version different from ours raises errors.LuxiError; an
  unsuccessful response is raised through errors.MaybeRaise or, failing
  that, as RequestError.

  """
  assert callable(transport_cb)

  # Send request and wait for response
  raw_reply = transport_cb(FormatRequest(method, args, version=version))
  (ok, payload, reply_version) = ParseResponse(raw_reply)

  # A response without a version is accepted as is
  if not (reply_version is None or reply_version == version):
    raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
                           (version, reply_version))

  if not ok:
    errors.MaybeRaise(payload)
    raise RequestError(payload)

  return payload
372

    
373

    
374
class Client(object):
  """High-level client implementation.

  This uses a backing Transport-like class on top of which it
  implements data serialization/deserialization.

  """
  def __init__(self, address=None, timeouts=None, transport=Transport):
    """Constructor for the Client class.

    Arguments:
      - address: a valid address for the used transport class; if None,
        constants.MASTER_SOCKET is used
      - timeouts: a list of timeouts, to be used on connect and read/write
      - transport: a Transport-like class

    If timeouts is not passed, the default timeouts of the transport
    class are used.

    """
    if address is None:
      address = constants.MASTER_SOCKET
    self.address = address
    self.timeouts = timeouts
    self.transport_class = transport
    self.transport = None
    self._InitTransport()

  def _InitTransport(self):
    """(Re)initialize the transport if needed.

    """
    if self.transport is None:
      self.transport = self.transport_class(self.address,
                                            timeouts=self.timeouts)

  def _CloseTransport(self):
    """Close the transport, ignoring errors.

    """
    if self.transport is None:
      return
    try:
      # Forget the transport before closing it, so that it is gone even
      # if closing fails
      old_transp = self.transport
      self.transport = None
      old_transp.Close()
    except Exception: # pylint: disable=W0703
      pass

  def _SendMethodCall(self, data):
    """Send serialized data over the transport and return the response.

    The transport is created first if there is none. On any error the
    transport is closed and the error is re-raised.

    """
    # Send request and wait for response
    try:
      self._InitTransport()
      return self.transport.Call(data)
    except Exception:
      self._CloseTransport()
      raise

  def Close(self):
    """Close the underlying connection.

    """
    self._CloseTransport()

  def CallMethod(self, method, args):
    """Send a generic request and return the response.

    """
    return CallLuxiMethod(self._SendMethodCall, method, args,
                          version=constants.LUXI_VERSION)

  def SetQueueDrainFlag(self, drain_flag):
    """Send a SetDrainFlag request with the given flag."""
    return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)

  def SetWatcherPause(self, until):
    """Send a SetWatcherPause request with the given value."""
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])

  def SubmitJob(self, ops):
    """Submit one job made up of the given opcodes."""
    # A real list, built the same way as in SubmitManyJobs
    ops_state = [op.__getstate__() for op in ops]
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)

  def SubmitManyJobs(self, jobs):
    """Submit several jobs, each given as a list of opcodes."""
    jobs_state = []
    for ops in jobs:
      jobs_state.append([op.__getstate__() for op in ops])
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)

  def CancelJob(self, job_id):
    """Send a CancelJob request for the given job ID."""
    return self.CallMethod(REQ_CANCEL_JOB, job_id)

  def ArchiveJob(self, job_id):
    """Send an ArchiveJob request for the given job ID."""
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)

  def AutoArchiveJobs(self, age):
    """Send an AutoArchiveJobs request with the given age.

    The timeout sent along is derived from the default read/write
    timeout.

    """
    # Floor division keeps the timeout an integer
    timeout = (DEF_RWTO - 1) // 2
    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial,
                           timeout=WFJC_TIMEOUT):
    """Waits for changes on a job.

    @param job_id: Job ID
    @type fields: list
    @param fields: List of field names to be observed
    @type prev_job_info: None or list
    @param prev_job_info: Previously received job information
    @type prev_log_serial: None or int/long
    @param prev_log_serial: Highest log serial number previously received
    @type timeout: int/float
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
                    be capped to that value)

    """
    assert timeout >= 0, "Timeout can not be negative"
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
                           (job_id, fields, prev_job_info,
                            prev_log_serial,
                            min(WFJC_TIMEOUT, timeout)))

  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
    """Waits until a job changes.

    Calls L{WaitForJobChangeOnce} repeatedly until it returns something
    other than L{constants.JOB_NOTCHANGED}, and returns that result.

    """
    while True:
      result = self.WaitForJobChangeOnce(job_id, fields,
                                         prev_job_info, prev_log_serial)
      if result != constants.JOB_NOTCHANGED:
        break
    return result

  def Query(self, what, fields, filter_):
    """Query for resources/items.

    @param what: One of L{constants.QR_VIA_LUXI}
    @type fields: List of strings
    @param fields: List of requested fields
    @type filter_: None or list
    @param filter_: Query filter
    @rtype: L{objects.QueryResponse}

    """
    req = objects.QueryRequest(what=what, fields=fields, filter=filter_)
    result = self.CallMethod(REQ_QUERY, req.ToDict())
    return objects.QueryResponse.FromDict(result)

  def QueryFields(self, what, fields):
    """Query for available fields.

    @param what: One of L{constants.QR_VIA_LUXI}
    @type fields: None or list of strings
    @param fields: List of requested fields
    @rtype: L{objects.QueryFieldsResponse}

    """
    req = objects.QueryFieldsRequest(what=what, fields=fields)
    result = self.CallMethod(REQ_QUERY_FIELDS, req.ToDict())
    return objects.QueryFieldsResponse.FromDict(result)

  def QueryJobs(self, job_ids, fields):
    """Send a QueryJobs request for the given job IDs and fields."""
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))

  def QueryInstances(self, names, fields, use_locking):
    """Send a QueryInstances request."""
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))

  def QueryNodes(self, names, fields, use_locking):
    """Send a QueryNodes request."""
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))

  def QueryGroups(self, names, fields, use_locking):
    """Send a QueryGroups request."""
    return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))

  def QueryExports(self, nodes, use_locking):
    """Send a QueryExports request."""
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))

  def QueryClusterInfo(self):
    """Send a QueryClusterInfo request (it takes no arguments)."""
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())

  def QueryConfigValues(self, fields):
    """Send a QueryConfigValues request for the given fields."""
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)

  def QueryTags(self, kind, name):
    """Send a QueryTags request for the given kind and name."""
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))

  def QueryLocks(self, fields, sync):
    """Send a QueryLocks request.

    Deprecated: a warning is issued on every call, pointing to L{Query}
    with L{constants.QR_LOCK} as the replacement.

    """
    warnings.warn("This LUXI call is deprecated and will be removed, use"
                  " Query(\"%s\", ...) instead" % constants.QR_LOCK)
    return self.CallMethod(REQ_QUERY_LOCKS, (fields, sync))