#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module for the unix socket protocol

This module implements the local unix socket protocol. You only need
this module and the opcodes module in the client program in order to
communicate with the master.

The module is also used by the master daemon.

"""

import socket
import collections
import time
import errno
import logging

from ganeti import serializer
from ganeti import constants
from ganeti import errors
from ganeti import utils


KEY_METHOD = "method"
KEY_ARGS = "args"
KEY_SUCCESS = "success"
KEY_RESULT = "result"

REQ_SUBMIT_JOB = "SubmitJob"
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
REQ_QUERY_TAGS = "QueryTags"
REQ_QUERY_LOCKS = "QueryLocks"
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"

DEF_CTMO = 10
DEF_RWTO = 60

# WaitForJobChange timeout
WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
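# WFJC_TIMEOUT is presumably kept below DEF_RWTO so that a blocking
# WaitForJobChange call on the master returns before the client's read/write
# timeout expires; the halving leaves additional margin.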


class ProtocolError(errors.GenericError):
  """Denotes an error in the LUXI protocol."""


class ConnectionClosedError(ProtocolError):
  """Connection closed error."""


class TimeoutError(ProtocolError):
  """Operation timeout error."""


class RequestError(ProtocolError):
  """Error on request.

  This signifies an error in the request format or request handling,
  but not (e.g.) an error in starting up an instance.

  Some common conditions that can trigger this exception:
    - job submission failed because the job data was wrong
    - query failed because required fields were missing

  """


class NoMasterError(ProtocolError):
  """The master cannot be reached.

  This means that the master daemon is not running or the socket has
  been removed.

  """


class PermissionError(ProtocolError):
  """Permission denied while connecting to the master socket.

  This means the user doesn't have the proper rights.

  """


class Transport:
  """Low-level transport class.

  This is used on the client side.

  This could be replaced by any other class that provides the same
  semantics to the Client. This means:
    - can send messages and receive messages
    - safe for multithreading

  """

  def __init__(self, address, timeouts=None):
    """Constructor for the Transport class.

    Arguments:
      - address: a valid address for the used transport class
      - timeouts: a list of timeouts, to be used on connect and read/write

    There are two timeouts used since we might want to wait for a long
    time for a response, but the connect timeout should be lower.

    If not passed, we use defaults of 10 and 60 seconds respectively.

    Note that on reading data, since the timeout applies to an
    individual receive, the total duration might be longer than the
    timeout value passed (we make a hard limit at twice the read
    timeout).

    """
    self.address = address
    if timeouts is None:
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
    else:
      self._ctimeout, self._rwtimeout = timeouts

    self.socket = None
    self._buffer = ""
    self._msgs = collections.deque()

    try:
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

      # Try to connect
      try:
        utils.Retry(self._Connect, 1.0, self._ctimeout,
                    args=(self.socket, address, self._ctimeout))
      except utils.RetryTimeout:
        raise TimeoutError("Connect timed out")

      self.socket.settimeout(self._rwtimeout)
    except (socket.error, NoMasterError):
      if self.socket is not None:
        self.socket.close()
      self.socket = None
      raise

  @staticmethod
  def _Connect(sock, address, timeout):
    sock.settimeout(timeout)
    try:
      sock.connect(address)
    except socket.timeout, err:
      raise TimeoutError("Connect timed out: %s" % str(err))
    except socket.error, err:
      error_code = err.args[0]
      if error_code in (errno.ENOENT, errno.ECONNREFUSED):
        raise NoMasterError(address)
      elif error_code in (errno.EPERM, errno.EACCES):
        raise PermissionError(address)
      elif error_code == errno.EAGAIN:
        # Server's socket backlog is full at the moment
        raise utils.RetryAgain()
      raise

  def _CheckSocket(self):
    """Make sure we are connected.

    """
    if self.socket is None:
      raise ProtocolError("Connection is closed")

  def Send(self, msg):
    """Send a message.

    This just sends a message and doesn't wait for the response.

    """
    if constants.LUXI_EOM in msg:
      raise ProtocolError("Message terminator found in payload")

    self._CheckSocket()
    try:
      # TODO: sendall is not guaranteed to send everything
      self.socket.sendall(msg + constants.LUXI_EOM)
    except socket.timeout, err:
      raise TimeoutError("Sending timeout: %s" % str(err))

  def Recv(self):
    """Try to receive a message from the socket.

    In case we already have messages queued, we just return from the
    queue. Otherwise, we try to read data with a _rwtimeout network
    timeout, making sure we don't go over twice the _rwtimeout as a
    global limit.

    """
    self._CheckSocket()
    etime = time.time() + self._rwtimeout
    while not self._msgs:
      if time.time() > etime:
        raise TimeoutError("Extended receive timeout")
      while True:
        try:
          data = self.socket.recv(4096)
        except socket.error, err:
          if err.args and err.args[0] == errno.EAGAIN:
            continue
          raise
        except socket.timeout, err:
          raise TimeoutError("Receive timeout: %s" % str(err))
        break
      if not data:
        raise ConnectionClosedError("Connection closed while reading")
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()
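
  # Framing sketch for Recv (illustrative): with EOM standing for
  # constants.LUXI_EOM, a receive buffer of "msg1" + EOM + "msg2" + EOM +
  # "par" splits into ["msg1", "msg2", "par"]; the two complete messages are
  # queued in self._msgs and the trailing partial message "par" stays in
  # self._buffer until more data arrives.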

  def Call(self, msg):
    """Send a message and wait for the response.

    This is just a wrapper over Send and Recv.

    """
    self.Send(msg)
    return self.Recv()

  def Close(self):
    """Close the socket"""
    if self.socket is not None:
      self.socket.close()
      self.socket = None


def ParseRequest(msg):
  """Parses a LUXI request message.

  """
  try:
    request = serializer.LoadJson(msg)
  except ValueError, err:
    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)

  logging.debug("LUXI request: %s", request)

  if not isinstance(request, dict):
    logging.error("LUXI request not a dict: %r", msg)
    raise ProtocolError("Invalid LUXI request (not a dict)")

  method = request.get(KEY_METHOD, None) # pylint: disable-msg=E1103
  args = request.get(KEY_ARGS, None) # pylint: disable-msg=E1103

  if method is None or args is None:
    logging.error("LUXI request missing method or arguments: %r", msg)
    raise ProtocolError(("Invalid LUXI request (no method or arguments"
                         " in request): %r") % msg)

  return (method, args)


def ParseResponse(msg):
  """Parses a LUXI response message.

  """
  # Parse the result
  try:
    data = serializer.LoadJson(msg)
  except Exception, err:
    raise ProtocolError("Error while deserializing response: %s" % str(err))

  # Validate response
  if not (isinstance(data, dict) and
          KEY_SUCCESS in data and
          KEY_RESULT in data):
    raise ProtocolError("Invalid response from server: %r" % data)

  return (data[KEY_SUCCESS], data[KEY_RESULT])


def FormatResponse(success, result):
  """Formats a LUXI response message.

  """
  response = {
    KEY_SUCCESS: success,
    KEY_RESULT: result,
    }

  logging.debug("LUXI response: %s", response)

  return serializer.DumpJson(response)


def FormatRequest(method, args):
  """Formats a LUXI request message.

  """
  # Build request
  request = {
    KEY_METHOD: method,
    KEY_ARGS: args,
    }

  # Serialize the request
  return serializer.DumpJson(request, indent=False)
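

# Wire-format sketch (illustrative; exact key order and whitespace depend on
# the JSON serializer): FormatRequest(REQ_QUERY_CLUSTER_INFO, ()) produces
# roughly
#   {"method": "QueryClusterInfo", "args": []}
# which Transport.Send terminates with constants.LUXI_EOM, and a reply
# accepted by ParseResponse looks like
#   {"success": true, "result": ...}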


def CallLuxiMethod(transport_cb, method, args):
  """Send a LUXI request via a transport and return the response.

  """
  assert callable(transport_cb)

  request_msg = FormatRequest(method, args)

  # Send request and wait for response
  response_msg = transport_cb(request_msg)

  (success, result) = ParseResponse(response_msg)

  if success:
    return result

  errors.MaybeRaise(result)
  raise RequestError(result)
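
# Example (a sketch, assuming a reachable master socket): CallLuxiMethod only
# needs a callable that sends a request string and returns the raw reply, for
# instance the Call method of a Transport instance:
#
#   t = Transport(constants.MASTER_SOCKET)
#   try:
#     cluster_info = CallLuxiMethod(t.Call, REQ_QUERY_CLUSTER_INFO, ())
#   finally:
#     t.Close()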


class Client(object):
  """High-level client implementation.

  This uses a backing Transport-like class on top of which it
  implements data serialization/deserialization.

  """
  def __init__(self, address=None, timeouts=None, transport=Transport):
    """Constructor for the Client class.

    Arguments:
      - address: a valid address for the used transport class
      - timeouts: a list of timeouts, to be used on connect and read/write
      - transport: a Transport-like class

    If timeouts is not passed, the default timeouts of the transport
    class are used.

    """
    if address is None:
      address = constants.MASTER_SOCKET
    self.address = address
    self.timeouts = timeouts
    self.transport_class = transport
    self.transport = None
    self._InitTransport()

  def _InitTransport(self):
    """(Re)initialize the transport if needed.

    """
    if self.transport is None:
      self.transport = self.transport_class(self.address,
                                            timeouts=self.timeouts)

  def _CloseTransport(self):
    """Close the transport, ignoring errors.

    """
    if self.transport is None:
      return
    try:
      old_transp = self.transport
      self.transport = None
      old_transp.Close()
    except Exception: # pylint: disable-msg=W0703
      pass

  def _SendMethodCall(self, data):
    # Send request and wait for response
    try:
      self._InitTransport()
      return self.transport.Call(data)
    except Exception:
      self._CloseTransport()
      raise

  def CallMethod(self, method, args):
    """Send a generic request and return the response.

    """
    return CallLuxiMethod(self._SendMethodCall, method, args)

  def SetQueueDrainFlag(self, drain_flag):
    return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)

  def SetWatcherPause(self, until):
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])

  def SubmitJob(self, ops):
    ops_state = map(lambda op: op.__getstate__(), ops)
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)

  def SubmitManyJobs(self, jobs):
    jobs_state = []
    for ops in jobs:
      jobs_state.append([op.__getstate__() for op in ops])
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)

  def CancelJob(self, job_id):
    return self.CallMethod(REQ_CANCEL_JOB, job_id)

  def ArchiveJob(self, job_id):
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)

  def AutoArchiveJobs(self, age):
    timeout = (DEF_RWTO - 1) / 2
    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial,
                           timeout=WFJC_TIMEOUT):
    """Waits for changes on a job.

    @param job_id: Job ID
    @type fields: list
    @param fields: List of field names to be observed
    @type prev_job_info: None or list
    @param prev_job_info: Previously received job information
    @type prev_log_serial: None or int/long
    @param prev_log_serial: Highest log serial number previously received
    @type timeout: int/float
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
                    be capped to that value)

    """
    assert timeout >= 0, "Timeout can not be negative"
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
                           (job_id, fields, prev_job_info,
                            prev_log_serial,
                            min(WFJC_TIMEOUT, timeout)))

  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
    while True:
      result = self.WaitForJobChangeOnce(job_id, fields,
                                         prev_job_info, prev_log_serial)
      if result != constants.JOB_NOTCHANGED:
        break
    return result

  def QueryJobs(self, job_ids, fields):
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))

  def QueryInstances(self, names, fields, use_locking):
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))

  def QueryNodes(self, names, fields, use_locking):
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))

  def QueryExports(self, nodes, use_locking):
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))

  def QueryClusterInfo(self):
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())

  def QueryConfigValues(self, fields):
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)

  def QueryTags(self, kind, name):
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))

  def QueryLocks(self, fields, sync):
    return self.CallMethod(REQ_QUERY_LOCKS, (fields, sync))
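

# Example usage (a sketch only; it assumes a running master daemon, sufficient
# permissions on constants.MASTER_SOCKET, and the field names shown are
# illustrative):
#
#   client = Client()
#   cluster_info = client.QueryClusterInfo()
#   jobs = client.QueryJobs(None, ["id", "status"])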