Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ 84a12e40

History | View | Annotate | Download (13.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the unix socket protocol
23

24
This module implements the local unix socket protocol. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also used by the master daemon.
29

30
"""
31

    
32
import socket
33
import collections
34
import time
35
import errno
36
import logging
37

    
38
from ganeti import serializer
39
from ganeti import constants
40
from ganeti import errors
41
from ganeti import utils
42

    
43

    
44
KEY_METHOD = "method"
45
KEY_ARGS = "args"
46
KEY_SUCCESS = "success"
47
KEY_RESULT = "result"
48

    
49
REQ_SUBMIT_JOB = "SubmitJob"
50
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
51
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
52
REQ_CANCEL_JOB = "CancelJob"
53
REQ_ARCHIVE_JOB = "ArchiveJob"
54
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
55
REQ_QUERY_JOBS = "QueryJobs"
56
REQ_QUERY_INSTANCES = "QueryInstances"
57
REQ_QUERY_NODES = "QueryNodes"
58
REQ_QUERY_EXPORTS = "QueryExports"
59
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
60
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
61
REQ_QUERY_TAGS = "QueryTags"
62
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
63
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
64

    
65
DEF_CTMO = 10
66
DEF_RWTO = 60
67

    
68
# WaitForJobChange timeout
69
WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
70

    
71

    
72
class ProtocolError(errors.GenericError):
73
  """Denotes an error in the LUXI protocol"""
74

    
75

    
76
class ConnectionClosedError(ProtocolError):
77
  """Connection closed error"""
78

    
79

    
80
class TimeoutError(ProtocolError):
81
  """Operation timeout error"""
82

    
83

    
84
class RequestError(ProtocolError):
85
  """Error on request
86

87
  This signifies an error in the request format or request handling,
88
  but not (e.g.) an error in starting up an instance.
89

90
  Some common conditions that can trigger this exception:
91
    - job submission failed because the job data was wrong
92
    - query failed because required fields were missing
93

94
  """
95

    
96

    
97
class NoMasterError(ProtocolError):
98
  """The master cannot be reached
99

100
  This means that the master daemon is not running or the socket has
101
  been removed.
102

103
  """
104

    
105

    
106
class Transport:
107
  """Low-level transport class.
108

109
  This is used on the client side.
110

111
  This could be replace by any other class that provides the same
112
  semantics to the Client. This means:
113
    - can send messages and receive messages
114
    - safe for multithreading
115

116
  """
117

    
118
  def __init__(self, address, timeouts=None):
119
    """Constructor for the Client class.
120

121
    Arguments:
122
      - address: a valid address the the used transport class
123
      - timeout: a list of timeouts, to be used on connect and read/write
124

125
    There are two timeouts used since we might want to wait for a long
126
    time for a response, but the connect timeout should be lower.
127

128
    If not passed, we use a default of 10 and respectively 60 seconds.
129

130
    Note that on reading data, since the timeout applies to an
131
    invidual receive, it might be that the total duration is longer
132
    than timeout value passed (we make a hard limit at twice the read
133
    timeout).
134

135
    """
136
    self.address = address
137
    if timeouts is None:
138
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
139
    else:
140
      self._ctimeout, self._rwtimeout = timeouts
141

    
142
    self.socket = None
143
    self._buffer = ""
144
    self._msgs = collections.deque()
145

    
146
    try:
147
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
148

    
149
      # Try to connect
150
      try:
151
        utils.Retry(self._Connect, 1.0, self._ctimeout,
152
                    args=(self.socket, address, self._ctimeout))
153
      except utils.RetryTimeout:
154
        raise TimeoutError("Connect timed out")
155

    
156
      self.socket.settimeout(self._rwtimeout)
157
    except (socket.error, NoMasterError):
158
      if self.socket is not None:
159
        self.socket.close()
160
      self.socket = None
161
      raise
162

    
163
  @staticmethod
164
  def _Connect(sock, address, timeout):
165
    sock.settimeout(timeout)
166
    try:
167
      sock.connect(address)
168
    except socket.timeout, err:
169
      raise TimeoutError("Connect timed out: %s" % str(err))
170
    except socket.error, err:
171
      if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
172
        raise NoMasterError(address)
173
      if err.args[0] == errno.EAGAIN:
174
        # Server's socket backlog is full at the moment
175
        raise utils.RetryAgain()
176
      raise
177

    
178
  def _CheckSocket(self):
179
    """Make sure we are connected.
180

181
    """
182
    if self.socket is None:
183
      raise ProtocolError("Connection is closed")
184

    
185
  def Send(self, msg):
186
    """Send a message.
187

188
    This just sends a message and doesn't wait for the response.
189

190
    """
191
    if constants.LUXI_EOM in msg:
192
      raise ProtocolError("Message terminator found in payload")
193

    
194
    self._CheckSocket()
195
    try:
196
      # TODO: sendall is not guaranteed to send everything
197
      self.socket.sendall(msg + constants.LUXI_EOM)
198
    except socket.timeout, err:
199
      raise TimeoutError("Sending timeout: %s" % str(err))
200

    
201
  def Recv(self):
202
    """Try to receive a message from the socket.
203

204
    In case we already have messages queued, we just return from the
205
    queue. Otherwise, we try to read data with a _rwtimeout network
206
    timeout, and making sure we don't go over 2x_rwtimeout as a global
207
    limit.
208

209
    """
210
    self._CheckSocket()
211
    etime = time.time() + self._rwtimeout
212
    while not self._msgs:
213
      if time.time() > etime:
214
        raise TimeoutError("Extended receive timeout")
215
      while True:
216
        try:
217
          data = self.socket.recv(4096)
218
        except socket.error, err:
219
          if err.args and err.args[0] == errno.EAGAIN:
220
            continue
221
          raise
222
        except socket.timeout, err:
223
          raise TimeoutError("Receive timeout: %s" % str(err))
224
        break
225
      if not data:
226
        raise ConnectionClosedError("Connection closed while reading")
227
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
228
      self._buffer = new_msgs.pop()
229
      self._msgs.extend(new_msgs)
230
    return self._msgs.popleft()
231

    
232
  def Call(self, msg):
233
    """Send a message and wait for the response.
234

235
    This is just a wrapper over Send and Recv.
236

237
    """
238
    self.Send(msg)
239
    return self.Recv()
240

    
241
  def Close(self):
242
    """Close the socket"""
243
    if self.socket is not None:
244
      self.socket.close()
245
      self.socket = None
246

    
247

    
248
def ParseRequest(msg):
249
  """Parses a LUXI request message.
250

251
  """
252
  try:
253
    request = serializer.LoadJson(msg)
254
  except ValueError, err:
255
    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
256

    
257
  logging.debug("LUXI request: %s", request)
258

    
259
  if not isinstance(request, dict):
260
    logging.error("LUXI request not a dict: %r", msg)
261
    raise ProtocolError("Invalid LUXI request (not a dict)")
262

    
263
  method = request.get(KEY_METHOD, None) # pylint: disable-msg=E1103
264
  args = request.get(KEY_ARGS, None) # pylint: disable-msg=E1103
265

    
266
  if method is None or args is None:
267
    logging.error("LUXI request missing method or arguments: %r", msg)
268
    raise ProtocolError(("Invalid LUXI request (no method or arguments"
269
                         " in request): %r") % msg)
270

    
271
  return (method, args)
272

    
273

    
274
def ParseResponse(msg):
275
  """Parses a LUXI response message.
276

277
  """
278
  # Parse the result
279
  try:
280
    data = serializer.LoadJson(msg)
281
  except Exception, err:
282
    raise ProtocolError("Error while deserializing response: %s" % str(err))
283

    
284
  # Validate response
285
  if not (isinstance(data, dict) and
286
          KEY_SUCCESS in data and
287
          KEY_RESULT in data):
288
    raise ProtocolError("Invalid response from server: %r" % data)
289

    
290
  return (data[KEY_SUCCESS], data[KEY_RESULT])
291

    
292

    
293
def FormatResponse(success, result):
294
  """Formats a LUXI response message.
295

296
  """
297
  response = {
298
    KEY_SUCCESS: success,
299
    KEY_RESULT: result,
300
    }
301

    
302
  logging.debug("LUXI response: %s", response)
303

    
304
  return serializer.DumpJson(response)
305

    
306

    
307
def FormatRequest(method, args):
308
  """Formats a LUXI request message.
309

310
  """
311
  # Build request
312
  request = {
313
    KEY_METHOD: method,
314
    KEY_ARGS: args,
315
    }
316

    
317
  # Serialize the request
318
  return serializer.DumpJson(request, indent=False)
319

    
320

    
321
def CallLuxiMethod(transport_cb, method, args):
322
  """Send a LUXI request via a transport and return the response.
323

324
  """
325
  assert callable(transport_cb)
326

    
327
  request_msg = FormatRequest(method, args)
328

    
329
  # Send request and wait for response
330
  response_msg = transport_cb(request_msg)
331

    
332
  (success, result) = ParseResponse(response_msg)
333

    
334
  if success:
335
    return result
336

    
337
  errors.MaybeRaise(result)
338
  raise RequestError(result)
339

    
340

    
341
class Client(object):
342
  """High-level client implementation.
343

344
  This uses a backing Transport-like class on top of which it
345
  implements data serialization/deserialization.
346

347
  """
348
  def __init__(self, address=None, timeouts=None, transport=Transport):
349
    """Constructor for the Client class.
350

351
    Arguments:
352
      - address: a valid address the the used transport class
353
      - timeout: a list of timeouts, to be used on connect and read/write
354
      - transport: a Transport-like class
355

356

357
    If timeout is not passed, the default timeouts of the transport
358
    class are used.
359

360
    """
361
    if address is None:
362
      address = constants.MASTER_SOCKET
363
    self.address = address
364
    self.timeouts = timeouts
365
    self.transport_class = transport
366
    self.transport = None
367
    self._InitTransport()
368

    
369
  def _InitTransport(self):
370
    """(Re)initialize the transport if needed.
371

372
    """
373
    if self.transport is None:
374
      self.transport = self.transport_class(self.address,
375
                                            timeouts=self.timeouts)
376

    
377
  def _CloseTransport(self):
378
    """Close the transport, ignoring errors.
379

380
    """
381
    if self.transport is None:
382
      return
383
    try:
384
      old_transp = self.transport
385
      self.transport = None
386
      old_transp.Close()
387
    except Exception: # pylint: disable-msg=W0703
388
      pass
389

    
390
  def _SendMethodCall(self, data):
391
    # Send request and wait for response
392
    try:
393
      self._InitTransport()
394
      return self.transport.Call(data)
395
    except Exception:
396
      self._CloseTransport()
397
      raise
398

    
399
  def CallMethod(self, method, args):
400
    """Send a generic request and return the response.
401

402
    """
403
    return CallLuxiMethod(self._SendMethodCall, method, args)
404

    
405
  def SetQueueDrainFlag(self, drain_flag):
406
    return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
407

    
408
  def SetWatcherPause(self, until):
409
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])
410

    
411
  def SubmitJob(self, ops):
412
    ops_state = map(lambda op: op.__getstate__(), ops)
413
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
414

    
415
  def SubmitManyJobs(self, jobs):
416
    jobs_state = []
417
    for ops in jobs:
418
      jobs_state.append([op.__getstate__() for op in ops])
419
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
420

    
421
  def CancelJob(self, job_id):
422
    return self.CallMethod(REQ_CANCEL_JOB, job_id)
423

    
424
  def ArchiveJob(self, job_id):
425
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
426

    
427
  def AutoArchiveJobs(self, age):
428
    timeout = (DEF_RWTO - 1) / 2
429
    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
430

    
431
  def WaitForJobChangeOnce(self, job_id, fields,
432
                           prev_job_info, prev_log_serial,
433
                           timeout=WFJC_TIMEOUT):
434
    """Waits for changes on a job.
435

436
    @param job_id: Job ID
437
    @type fields: list
438
    @param fields: List of field names to be observed
439
    @type prev_job_info: None or list
440
    @param prev_job_info: Previously received job information
441
    @type prev_log_serial: None or int/long
442
    @param prev_log_serial: Highest log serial number previously received
443
    @type timeout: int/float
444
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
445
                    be capped to that value)
446

447
    """
448
    assert timeout >= 0, "Timeout can not be negative"
449
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
450
                           (job_id, fields, prev_job_info,
451
                            prev_log_serial,
452
                            min(WFJC_TIMEOUT, timeout)))
453

    
454
  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
455
    while True:
456
      result = self.WaitForJobChangeOnce(job_id, fields,
457
                                         prev_job_info, prev_log_serial)
458
      if result != constants.JOB_NOTCHANGED:
459
        break
460
    return result
461

    
462
  def QueryJobs(self, job_ids, fields):
463
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
464

    
465
  def QueryInstances(self, names, fields, use_locking):
466
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
467

    
468
  def QueryNodes(self, names, fields, use_locking):
469
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
470

    
471
  def QueryExports(self, nodes, use_locking):
472
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
473

    
474
  def QueryClusterInfo(self):
475
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
476

    
477
  def QueryConfigValues(self, fields):
478
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
479

    
480
  def QueryTags(self, kind, name):
481
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))