#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module for the unix socket protocol

This module implements the local unix socket protocol. You only need
this module and the opcodes module in the client program in order to
communicate with the master.

The module is also used by the master daemon.

"""

import socket
import collections
import time
import errno
import logging

from ganeti import serializer
from ganeti import constants
from ganeti import errors
from ganeti import utils


KEY_METHOD = "method"
KEY_ARGS = "args"
KEY_SUCCESS = "success"
KEY_RESULT = "result"

REQ_SUBMIT_JOB = "SubmitJob"
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
REQ_QUERY_TAGS = "QueryTags"
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"

DEF_CTMO = 10
DEF_RWTO = 60

# WaitForJobChange timeout
WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
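
# Wire format note (illustrative): each LUXI message is a JSON document
# followed by the end-of-message byte (ETX, '\3') appended by
# Transport.Send.  Exact whitespace and key order depend on the serializer
# module, but a request/response pair looks roughly like:
#
#   request:  {"method": "QueryClusterInfo", "args": []}\x03
#   response: {"success": true, "result": {...}}\x03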


class ProtocolError(errors.GenericError):
  """Denotes an error in the LUXI protocol"""


class ConnectionClosedError(ProtocolError):
  """Connection closed error"""


class TimeoutError(ProtocolError):
  """Operation timeout error"""


class RequestError(ProtocolError):
  """Error on request

  This signifies an error in the request format or request handling,
  but not (e.g.) an error in starting up an instance.

  Some common conditions that can trigger this exception:
    - job submission failed because the job data was wrong
    - query failed because required fields were missing

  """


class NoMasterError(ProtocolError):
  """The master cannot be reached

  This means that the master daemon is not running or the socket has
  been removed.

  """


class Transport:
  """Low-level transport class.

  This is used on the client side.

  This could be replaced by any other class that provides the same
  semantics to the Client. This means:
    - can send messages and receive messages
    - safe for multithreading

  """

  def __init__(self, address, timeouts=None, eom=None):
    """Constructor for the Transport class.

    Arguments:
      - address: a valid address for the used transport class
      - timeouts: a list of timeouts, to be used on connect and read/write
      - eom: an identifier to be used as end-of-message marker, which the
        upper layer guarantees will not appear in any message

    There are two timeouts used since we might want to wait for a long
    time for a response, but the connect timeout should be lower.

    If not passed, we use defaults of 10 and 60 seconds respectively.

    Note that on reading data, since the timeout applies to an
    individual receive, the total duration might be longer than the
    timeout value passed (we make a hard limit at twice the read
    timeout).

    """
    self.address = address
    if timeouts is None:
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
    else:
      self._ctimeout, self._rwtimeout = timeouts

    self.socket = None
    self._buffer = ""
    self._msgs = collections.deque()

    if eom is None:
      self.eom = '\3'
    else:
      self.eom = eom

    try:
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

      # Try to connect
      try:
        utils.Retry(self._Connect, 1.0, self._ctimeout,
                    args=(self.socket, address, self._ctimeout))
      except utils.RetryTimeout:
        raise TimeoutError("Connect timed out")

      self.socket.settimeout(self._rwtimeout)
    except (socket.error, NoMasterError):
      if self.socket is not None:
        self.socket.close()
      self.socket = None
      raise
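
  # Example (illustrative sketch): constructing a transport with custom
  # connect and read/write timeouts instead of the defaults.
  #
  #   t = Transport(constants.MASTER_SOCKET, timeouts=(5, 30))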

  @staticmethod
  def _Connect(sock, address, timeout):
    sock.settimeout(timeout)
    try:
      sock.connect(address)
    except socket.timeout, err:
      raise TimeoutError("Connect timed out: %s" % str(err))
    except socket.error, err:
      if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
        raise NoMasterError(address)
      if err.args[0] == errno.EAGAIN:
        # Server's socket backlog is full at the moment
        raise utils.RetryAgain()
      raise

  def _CheckSocket(self):
    """Make sure we are connected.

    """
    if self.socket is None:
      raise ProtocolError("Connection is closed")

  def Send(self, msg):
    """Send a message.

    This just sends a message and doesn't wait for the response.

    """
    if self.eom in msg:
      raise ProtocolError("Message terminator found in payload")

    self._CheckSocket()
    try:
      # TODO: sendall is not guaranteed to send everything
      self.socket.sendall(msg + self.eom)
    except socket.timeout, err:
      raise TimeoutError("Sending timeout: %s" % str(err))

  def Recv(self):
    """Try to receive a message from the socket.

    In case we already have messages queued, we just return from the
    queue. Otherwise, we try to read data with a _rwtimeout network
    timeout, making sure we don't go over 2 x _rwtimeout as a global
    limit.

    """
    self._CheckSocket()
    etime = time.time() + self._rwtimeout
    while not self._msgs:
      if time.time() > etime:
        raise TimeoutError("Extended receive timeout")
      while True:
        try:
          data = self.socket.recv(4096)
        except socket.error, err:
          if err.args and err.args[0] == errno.EAGAIN:
            continue
          raise
        except socket.timeout, err:
          raise TimeoutError("Receive timeout: %s" % str(err))
        break
      if not data:
        raise ConnectionClosedError("Connection closed while reading")
      new_msgs = (self._buffer + data).split(self.eom)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()
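
  # Framing note (illustrative): Recv splits the receive buffer on the EOM
  # byte.  For example, with self._buffer == "par" and newly received
  # data == "t1\3msg2\3tra":
  #
  #   ("par" + "t1\3msg2\3tra").split('\3') == ["part1", "msg2", "tra"]
  #
  # The complete messages "part1" and "msg2" are queued in self._msgs and
  # the trailing fragment "tra" becomes the new buffer.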

  def Call(self, msg):
    """Send a message and wait for the response.

    This is just a wrapper over Send and Recv.

    """
    self.Send(msg)
    return self.Recv()

  def Close(self):
    """Close the socket"""
    if self.socket is not None:
      self.socket.close()
      self.socket = None


def ParseRequest(msg):
  """Parses a LUXI request message.

  """
  try:
    request = serializer.LoadJson(msg)
  except ValueError, err:
    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)

  logging.debug("LUXI request: %s", request)

  if not isinstance(request, dict):
    logging.error("LUXI request not a dict: %r", msg)
    raise ProtocolError("Invalid LUXI request (not a dict)")

  method = request.get(KEY_METHOD, None) # pylint: disable-msg=E1103
  args = request.get(KEY_ARGS, None) # pylint: disable-msg=E1103

  if method is None or args is None:
    logging.error("LUXI request missing method or arguments: %r", msg)
    raise ProtocolError(("Invalid LUXI request (no method or arguments"
                         " in request): %r") % msg)

  return (method, args)
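
# Example (illustrative sketch; strings may come back as unicode depending
# on the JSON library):
#
#   ParseRequest('{"method": "QueryClusterInfo", "args": []}')
#     -> ("QueryClusterInfo", [])
#
# A message that is not a dict or lacks "method"/"args" raises ProtocolError.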


def ParseResponse(msg):
  """Parses a LUXI response message.

  """
  # Parse the result
  try:
    data = serializer.LoadJson(msg)
  except Exception, err:
    raise ProtocolError("Error while deserializing response: %s" % str(err))

  # Validate response
  if not (isinstance(data, dict) and
          KEY_SUCCESS in data and
          KEY_RESULT in data):
    raise ProtocolError("Invalid response from server: %r" % data)

  return (data[KEY_SUCCESS], data[KEY_RESULT])


def FormatResponse(success, result):
  """Formats a LUXI response message.

  """
  response = {
    KEY_SUCCESS: success,
    KEY_RESULT: result,
    }

  logging.debug("LUXI response: %s", response)

  return serializer.DumpJson(response)


def FormatRequest(method, args):
  """Formats a LUXI request message.

  """
  # Build request
  request = {
    KEY_METHOD: method,
    KEY_ARGS: args,
    }

  # Serialize the request
  return serializer.DumpJson(request, indent=False)
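
# Round-trip example (illustrative sketch; the exact JSON text depends on
# the serializer module):
#
#   msg = FormatRequest(REQ_ARCHIVE_JOB, 123)
#   ParseRequest(msg)    -> ("ArchiveJob", 123)
#
#   msg = FormatResponse(True, "done")
#   ParseResponse(msg)   -> (True, "done")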


def CallLuxiMethod(transport_cb, method, args):
  """Send a LUXI request via a transport and return the response.

  """
  assert callable(transport_cb)

  request_msg = FormatRequest(method, args)

  # Send request and wait for response
  response_msg = transport_cb(request_msg)

  (success, result) = ParseResponse(response_msg)

  if success:
    return result

  errors.MaybeRaise(result)
  raise RequestError(result)
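
# Example (illustrative sketch): CallLuxiMethod only needs a callable that
# takes a serialized request and returns the raw response; a Transport
# instance's Call method fits, which is how the Client class below uses it.
#
#   t = Transport(constants.MASTER_SOCKET)
#   result = CallLuxiMethod(t.Call, REQ_QUERY_CLUSTER_INFO, ())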


class Client(object):
  """High-level client implementation.

  This uses a backing Transport-like class on top of which it
  implements data serialization/deserialization.

  """
  def __init__(self, address=None, timeouts=None, transport=Transport):
    """Constructor for the Client class.

    Arguments:
      - address: a valid address for the used transport class
      - timeouts: a list of timeouts, to be used on connect and read/write
      - transport: a Transport-like class

    If timeouts is not passed, the default timeouts of the transport
    class are used.

    """
    if address is None:
      address = constants.MASTER_SOCKET
    self.address = address
    self.timeouts = timeouts
    self.transport_class = transport
    self.transport = None
    self._InitTransport()

  def _InitTransport(self):
    """(Re)initialize the transport if needed.

    """
    if self.transport is None:
      self.transport = self.transport_class(self.address,
                                            timeouts=self.timeouts)

  def _CloseTransport(self):
    """Close the transport, ignoring errors.

    """
    if self.transport is None:
      return
    try:
      old_transp = self.transport
      self.transport = None
      old_transp.Close()
    except Exception: # pylint: disable-msg=W0703
      pass

  def _SendMethodCall(self, data):
    # Send request and wait for response
    try:
      self._InitTransport()
      return self.transport.Call(data)
    except Exception:
      self._CloseTransport()
      raise

  def CallMethod(self, method, args):
    """Send a generic request and return the response.

    """
    return CallLuxiMethod(self._SendMethodCall, method, args)

  def SetQueueDrainFlag(self, drain_flag):
    return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)

  def SetWatcherPause(self, until):
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])

  def SubmitJob(self, ops):
    ops_state = map(lambda op: op.__getstate__(), ops)
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)

  def SubmitManyJobs(self, jobs):
    jobs_state = []
    for ops in jobs:
      jobs_state.append([op.__getstate__() for op in ops])
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)

  def CancelJob(self, job_id):
    return self.CallMethod(REQ_CANCEL_JOB, job_id)

  def ArchiveJob(self, job_id):
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)

  def AutoArchiveJobs(self, age):
    timeout = (DEF_RWTO - 1) / 2
    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial,
                           timeout=WFJC_TIMEOUT):
    """Waits for changes on a job.

    @param job_id: Job ID
    @type fields: list
    @param fields: List of field names to be observed
    @type prev_job_info: None or list
    @param prev_job_info: Previously received job information
    @type prev_log_serial: None or int/long
    @param prev_log_serial: Highest log serial number previously received
    @type timeout: int/float
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
                    be capped to that value)

    """
    assert timeout >= 0, "Timeout cannot be negative"
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
                           (job_id, fields, prev_job_info,
                            prev_log_serial,
                            min(WFJC_TIMEOUT, timeout)))

  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
    while True:
      result = self.WaitForJobChangeOnce(job_id, fields,
                                         prev_job_info, prev_log_serial)
      if result != constants.JOB_NOTCHANGED:
        break
    return result
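
  # Example flow (illustrative sketch; building the opcode objects happens
  # outside this module):
  #
  #   client = Client()
  #   job_id = client.SubmitJob([op])       # op: an opcodes.OpCode instance
  #   client.WaitForJobChange(job_id, ["status"], None, None)
  #   result = client.QueryJobs([job_id], ["status"])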

  def QueryJobs(self, job_ids, fields):
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))

  def QueryInstances(self, names, fields, use_locking):
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))

  def QueryNodes(self, names, fields, use_locking):
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))

  def QueryExports(self, nodes, use_locking):
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))

  def QueryClusterInfo(self):
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())

  def QueryConfigValues(self, fields):
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)

  def QueryTags(self, kind, name):
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))