Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ 5a1c22fe

History | View | Annotate | Download (13.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the unix socket protocol
23

24
This module implements the local unix socket protocol. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also used by the master daemon.
29

30
"""
31

    
32
import socket
33
import collections
34
import time
35
import errno
36
import logging
37

    
38
from ganeti import serializer
39
from ganeti import constants
40
from ganeti import errors
41
from ganeti import utils
42

    
43

    
44
KEY_METHOD = "method"
45
KEY_ARGS = "args"
46
KEY_SUCCESS = "success"
47
KEY_RESULT = "result"
48

    
49
REQ_SUBMIT_JOB = "SubmitJob"
50
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
51
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
52
REQ_CANCEL_JOB = "CancelJob"
53
REQ_ARCHIVE_JOB = "ArchiveJob"
54
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
55
REQ_QUERY_JOBS = "QueryJobs"
56
REQ_QUERY_INSTANCES = "QueryInstances"
57
REQ_QUERY_NODES = "QueryNodes"
58
REQ_QUERY_EXPORTS = "QueryExports"
59
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
60
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
61
REQ_QUERY_TAGS = "QueryTags"
62
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
63
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
64

    
65
DEF_CTMO = 10
66
DEF_RWTO = 60
67

    
68
# WaitForJobChange timeout
69
WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
70

    
71

    
72
class ProtocolError(errors.GenericError):
73
  """Denotes an error in the LUXI protocol."""
74

    
75

    
76
class ConnectionClosedError(ProtocolError):
77
  """Connection closed error."""
78

    
79

    
80
class TimeoutError(ProtocolError):
81
  """Operation timeout error."""
82

    
83

    
84
class RequestError(ProtocolError):
85
  """Error on request.
86

87
  This signifies an error in the request format or request handling,
88
  but not (e.g.) an error in starting up an instance.
89

90
  Some common conditions that can trigger this exception:
91
    - job submission failed because the job data was wrong
92
    - query failed because required fields were missing
93

94
  """
95

    
96

    
97
class NoMasterError(ProtocolError):
98
  """The master cannot be reached.
99

100
  This means that the master daemon is not running or the socket has
101
  been removed.
102

103
  """
104

    
105

    
106
class PermissionError(ProtocolError):
107
  """Permission denied while connecting to the master socket.
108

109
  This means the user doesn't have the proper rights.
110

111
  """
112

    
113

    
114
class Transport:
115
  """Low-level transport class.
116

117
  This is used on the client side.
118

119
  This could be replace by any other class that provides the same
120
  semantics to the Client. This means:
121
    - can send messages and receive messages
122
    - safe for multithreading
123

124
  """
125

    
126
  def __init__(self, address, timeouts=None):
127
    """Constructor for the Client class.
128

129
    Arguments:
130
      - address: a valid address the the used transport class
131
      - timeout: a list of timeouts, to be used on connect and read/write
132

133
    There are two timeouts used since we might want to wait for a long
134
    time for a response, but the connect timeout should be lower.
135

136
    If not passed, we use a default of 10 and respectively 60 seconds.
137

138
    Note that on reading data, since the timeout applies to an
139
    invidual receive, it might be that the total duration is longer
140
    than timeout value passed (we make a hard limit at twice the read
141
    timeout).
142

143
    """
144
    self.address = address
145
    if timeouts is None:
146
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
147
    else:
148
      self._ctimeout, self._rwtimeout = timeouts
149

    
150
    self.socket = None
151
    self._buffer = ""
152
    self._msgs = collections.deque()
153

    
154
    try:
155
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
156

    
157
      # Try to connect
158
      try:
159
        utils.Retry(self._Connect, 1.0, self._ctimeout,
160
                    args=(self.socket, address, self._ctimeout))
161
      except utils.RetryTimeout:
162
        raise TimeoutError("Connect timed out")
163

    
164
      self.socket.settimeout(self._rwtimeout)
165
    except (socket.error, NoMasterError):
166
      if self.socket is not None:
167
        self.socket.close()
168
      self.socket = None
169
      raise
170

    
171
  @staticmethod
172
  def _Connect(sock, address, timeout):
173
    sock.settimeout(timeout)
174
    try:
175
      sock.connect(address)
176
    except socket.timeout, err:
177
      raise TimeoutError("Connect timed out: %s" % str(err))
178
    except socket.error, err:
179
      error_code = err.args[0]
180
      if error_code in (errno.ENOENT, errno.ECONNREFUSED):
181
        raise NoMasterError(address)
182
      elif error_code in (errno.EPERM, errno.EACCES):
183
        raise PermissionError(address)
184
      elif error_code == errno.EAGAIN:
185
        # Server's socket backlog is full at the moment
186
        raise utils.RetryAgain()
187
      raise
188

    
189
  def _CheckSocket(self):
190
    """Make sure we are connected.
191

192
    """
193
    if self.socket is None:
194
      raise ProtocolError("Connection is closed")
195

    
196
  def Send(self, msg):
197
    """Send a message.
198

199
    This just sends a message and doesn't wait for the response.
200

201
    """
202
    if constants.LUXI_EOM in msg:
203
      raise ProtocolError("Message terminator found in payload")
204

    
205
    self._CheckSocket()
206
    try:
207
      # TODO: sendall is not guaranteed to send everything
208
      self.socket.sendall(msg + constants.LUXI_EOM)
209
    except socket.timeout, err:
210
      raise TimeoutError("Sending timeout: %s" % str(err))
211

    
212
  def Recv(self):
213
    """Try to receive a message from the socket.
214

215
    In case we already have messages queued, we just return from the
216
    queue. Otherwise, we try to read data with a _rwtimeout network
217
    timeout, and making sure we don't go over 2x_rwtimeout as a global
218
    limit.
219

220
    """
221
    self._CheckSocket()
222
    etime = time.time() + self._rwtimeout
223
    while not self._msgs:
224
      if time.time() > etime:
225
        raise TimeoutError("Extended receive timeout")
226
      while True:
227
        try:
228
          data = self.socket.recv(4096)
229
        except socket.error, err:
230
          if err.args and err.args[0] == errno.EAGAIN:
231
            continue
232
          raise
233
        except socket.timeout, err:
234
          raise TimeoutError("Receive timeout: %s" % str(err))
235
        break
236
      if not data:
237
        raise ConnectionClosedError("Connection closed while reading")
238
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
239
      self._buffer = new_msgs.pop()
240
      self._msgs.extend(new_msgs)
241
    return self._msgs.popleft()
242

    
243
  def Call(self, msg):
244
    """Send a message and wait for the response.
245

246
    This is just a wrapper over Send and Recv.
247

248
    """
249
    self.Send(msg)
250
    return self.Recv()
251

    
252
  def Close(self):
253
    """Close the socket"""
254
    if self.socket is not None:
255
      self.socket.close()
256
      self.socket = None
257

    
258

    
259
def ParseRequest(msg):
260
  """Parses a LUXI request message.
261

262
  """
263
  try:
264
    request = serializer.LoadJson(msg)
265
  except ValueError, err:
266
    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
267

    
268
  logging.debug("LUXI request: %s", request)
269

    
270
  if not isinstance(request, dict):
271
    logging.error("LUXI request not a dict: %r", msg)
272
    raise ProtocolError("Invalid LUXI request (not a dict)")
273

    
274
  method = request.get(KEY_METHOD, None) # pylint: disable-msg=E1103
275
  args = request.get(KEY_ARGS, None) # pylint: disable-msg=E1103
276

    
277
  if method is None or args is None:
278
    logging.error("LUXI request missing method or arguments: %r", msg)
279
    raise ProtocolError(("Invalid LUXI request (no method or arguments"
280
                         " in request): %r") % msg)
281

    
282
  return (method, args)
283

    
284

    
285
def ParseResponse(msg):
286
  """Parses a LUXI response message.
287

288
  """
289
  # Parse the result
290
  try:
291
    data = serializer.LoadJson(msg)
292
  except Exception, err:
293
    raise ProtocolError("Error while deserializing response: %s" % str(err))
294

    
295
  # Validate response
296
  if not (isinstance(data, dict) and
297
          KEY_SUCCESS in data and
298
          KEY_RESULT in data):
299
    raise ProtocolError("Invalid response from server: %r" % data)
300

    
301
  return (data[KEY_SUCCESS], data[KEY_RESULT])
302

    
303

    
304
def FormatResponse(success, result):
305
  """Formats a LUXI response message.
306

307
  """
308
  response = {
309
    KEY_SUCCESS: success,
310
    KEY_RESULT: result,
311
    }
312

    
313
  logging.debug("LUXI response: %s", response)
314

    
315
  return serializer.DumpJson(response)
316

    
317

    
318
def FormatRequest(method, args):
319
  """Formats a LUXI request message.
320

321
  """
322
  # Build request
323
  request = {
324
    KEY_METHOD: method,
325
    KEY_ARGS: args,
326
    }
327

    
328
  # Serialize the request
329
  return serializer.DumpJson(request, indent=False)
330

    
331

    
332
def CallLuxiMethod(transport_cb, method, args):
333
  """Send a LUXI request via a transport and return the response.
334

335
  """
336
  assert callable(transport_cb)
337

    
338
  request_msg = FormatRequest(method, args)
339

    
340
  # Send request and wait for response
341
  response_msg = transport_cb(request_msg)
342

    
343
  (success, result) = ParseResponse(response_msg)
344

    
345
  if success:
346
    return result
347

    
348
  errors.MaybeRaise(result)
349
  raise RequestError(result)
350

    
351

    
352
class Client(object):
353
  """High-level client implementation.
354

355
  This uses a backing Transport-like class on top of which it
356
  implements data serialization/deserialization.
357

358
  """
359
  def __init__(self, address=None, timeouts=None, transport=Transport):
360
    """Constructor for the Client class.
361

362
    Arguments:
363
      - address: a valid address the the used transport class
364
      - timeout: a list of timeouts, to be used on connect and read/write
365
      - transport: a Transport-like class
366

367

368
    If timeout is not passed, the default timeouts of the transport
369
    class are used.
370

371
    """
372
    if address is None:
373
      address = constants.MASTER_SOCKET
374
    self.address = address
375
    self.timeouts = timeouts
376
    self.transport_class = transport
377
    self.transport = None
378
    self._InitTransport()
379

    
380
  def _InitTransport(self):
381
    """(Re)initialize the transport if needed.
382

383
    """
384
    if self.transport is None:
385
      self.transport = self.transport_class(self.address,
386
                                            timeouts=self.timeouts)
387

    
388
  def _CloseTransport(self):
389
    """Close the transport, ignoring errors.
390

391
    """
392
    if self.transport is None:
393
      return
394
    try:
395
      old_transp = self.transport
396
      self.transport = None
397
      old_transp.Close()
398
    except Exception: # pylint: disable-msg=W0703
399
      pass
400

    
401
  def _SendMethodCall(self, data):
402
    # Send request and wait for response
403
    try:
404
      self._InitTransport()
405
      return self.transport.Call(data)
406
    except Exception:
407
      self._CloseTransport()
408
      raise
409

    
410
  def CallMethod(self, method, args):
411
    """Send a generic request and return the response.
412

413
    """
414
    return CallLuxiMethod(self._SendMethodCall, method, args)
415

    
416
  def SetQueueDrainFlag(self, drain_flag):
417
    return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
418

    
419
  def SetWatcherPause(self, until):
420
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])
421

    
422
  def SubmitJob(self, ops):
423
    ops_state = map(lambda op: op.__getstate__(), ops)
424
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
425

    
426
  def SubmitManyJobs(self, jobs):
427
    jobs_state = []
428
    for ops in jobs:
429
      jobs_state.append([op.__getstate__() for op in ops])
430
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
431

    
432
  def CancelJob(self, job_id):
433
    return self.CallMethod(REQ_CANCEL_JOB, job_id)
434

    
435
  def ArchiveJob(self, job_id):
436
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
437

    
438
  def AutoArchiveJobs(self, age):
439
    timeout = (DEF_RWTO - 1) / 2
440
    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
441

    
442
  def WaitForJobChangeOnce(self, job_id, fields,
443
                           prev_job_info, prev_log_serial,
444
                           timeout=WFJC_TIMEOUT):
445
    """Waits for changes on a job.
446

447
    @param job_id: Job ID
448
    @type fields: list
449
    @param fields: List of field names to be observed
450
    @type prev_job_info: None or list
451
    @param prev_job_info: Previously received job information
452
    @type prev_log_serial: None or int/long
453
    @param prev_log_serial: Highest log serial number previously received
454
    @type timeout: int/float
455
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
456
                    be capped to that value)
457

458
    """
459
    assert timeout >= 0, "Timeout can not be negative"
460
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
461
                           (job_id, fields, prev_job_info,
462
                            prev_log_serial,
463
                            min(WFJC_TIMEOUT, timeout)))
464

    
465
  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
466
    while True:
467
      result = self.WaitForJobChangeOnce(job_id, fields,
468
                                         prev_job_info, prev_log_serial)
469
      if result != constants.JOB_NOTCHANGED:
470
        break
471
    return result
472

    
473
  def QueryJobs(self, job_ids, fields):
474
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
475

    
476
  def QueryInstances(self, names, fields, use_locking):
477
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
478

    
479
  def QueryNodes(self, names, fields, use_locking):
480
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
481

    
482
  def QueryExports(self, nodes, use_locking):
483
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
484

    
485
  def QueryClusterInfo(self):
486
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
487

    
488
  def QueryConfigValues(self, fields):
489
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
490

    
491
  def QueryTags(self, kind, name):
492
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))