Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ 8d8d650c

History | View | Annotate | Download (12.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the unix socket protocol
23

24
This module implements the local unix socket protocol. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also used by the master daemon.
29

30
"""
31

    
32
import socket
33
import collections
34
import time
35
import errno
36
import logging
37

    
38
from ganeti import serializer
39
from ganeti import constants
40
from ganeti import errors
41

    
42

    
43
# Keys of the dictionaries exchanged over the LUXI socket: requests carry
# KEY_METHOD and KEY_ARGS, responses carry KEY_SUCCESS and KEY_RESULT.
KEY_METHOD = "method"
44
KEY_ARGS = "args"
45
KEY_SUCCESS = "success"
46
KEY_RESULT = "result"
47

    
48
# Method names placed in the KEY_METHOD field of a request; each one is
# used by the Client method of the corresponding name.
REQ_SUBMIT_JOB = "SubmitJob"
49
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
50
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
51
REQ_CANCEL_JOB = "CancelJob"
52
REQ_ARCHIVE_JOB = "ArchiveJob"
53
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
54
REQ_QUERY_JOBS = "QueryJobs"
55
REQ_QUERY_INSTANCES = "QueryInstances"
56
REQ_QUERY_NODES = "QueryNodes"
57
REQ_QUERY_EXPORTS = "QueryExports"
58
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
59
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
60
REQ_QUERY_TAGS = "QueryTags"
61
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
62
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
63

    
64
# Default connect timeout, in seconds (used by Transport when no timeouts
# are passed)
DEF_CTMO = 10
65
# Default read/write timeout, in seconds
DEF_RWTO = 60
66

    
67

    
68
class ProtocolError(errors.GenericError):
  """Base class for all errors raised by the LUXI protocol code.

  Raised directly for malformed requests or responses, for a message
  containing the end-of-message marker and for use of a closed
  connection.

  """
70

    
71

    
72
class ConnectionClosedError(ProtocolError):
  """The connection was closed while a message was being read.

  """
74

    
75

    
76
class TimeoutError(ProtocolError):
  """A connect, send or receive operation timed out.

  """
78

    
79

    
80
class RequestError(ProtocolError):
  """The request was rejected or could not be handled.

  This covers problems with the format or the handling of the request
  itself, not failures of the operation being requested (e.g. an
  instance failing to start up).

  Typical causes:
    - a job was submitted with wrong job data
    - a query was made without the required fields

  """
91

    
92

    
93
class NoMasterError(ProtocolError):
  """The master daemon could not be contacted.

  Either the master daemon is not running or its socket has been
  removed.

  """
100

    
101

    
102
class Transport:
  """Low-level transport class.

  This is used on the client side.

  This could be replaced by any other class that provides the same
  semantics to the Client. This means:
    - can send messages and receive messages
    - safe for multithreading

  """

  def __init__(self, address, timeouts=None, eom=None):
    """Constructor for the Transport class.

    Arguments:
      - address: a valid address for the used transport class (here the
        path of a unix stream socket)
      - timeouts: a pair of timeouts, to be used on connect and read/write
      - eom: an identifier to be used as end-of-message which the
        upper-layer will guarantee that this identifier will not appear
        in any message

    There are two timeouts used since we might want to wait for a long
    time for a response, but the connect timeout should be lower.

    If not passed, we use a default of 10 and respectively 60 seconds.

    Note that on reading data, since the timeout applies to an
    individual receive, it might be that the total duration is longer
    than timeout value passed (we make a hard limit at twice the read
    timeout).

    Raises:
      - TimeoutError: if the connect times out
      - NoMasterError: if the socket does not exist or refuses connections
      - socket.error: on any other connection failure

    """
    self.address = address
    if timeouts is None:
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
    else:
      self._ctimeout, self._rwtimeout = timeouts

    self.socket = None
    self._buffer = ""
    self._msgs = collections.deque()

    if eom is None:
      self.eom = '\3'
    else:
      self.eom = eom

    try:
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
      self.socket.settimeout(self._ctimeout)
      try:
        self.socket.connect(address)
      except socket.timeout as err:
        raise TimeoutError("Connect timed out: %s" % str(err))
      except socket.error as err:
        if err.args and err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
          raise NoMasterError(address)
        raise
      self.socket.settimeout(self._rwtimeout)
    except (socket.error, TimeoutError, NoMasterError):
      # TimeoutError must be listed here too, otherwise a connect timeout
      # would leave the half-initialized socket open
      if self.socket is not None:
        self.socket.close()
      self.socket = None
      raise

  def _CheckSocket(self):
    """Make sure we are connected.

    Raises ProtocolError if the socket has been closed.

    """
    if self.socket is None:
      raise ProtocolError("Connection is closed")

  def Send(self, msg):
    """Send a message.

    This just sends a message and doesn't wait for the response.

    Raises ProtocolError if the message contains the end-of-message
    marker or the connection is closed, and TimeoutError if sending
    times out.

    """
    if self.eom in msg:
      raise ProtocolError("Message terminator found in payload")

    self._CheckSocket()
    try:
      # TODO: sendall is not guaranteed to send everything
      self.socket.sendall(msg + self.eom)
    except socket.timeout as err:
      raise TimeoutError("Sending timeout: %s" % str(err))

  def Recv(self):
    """Try to receive a message from the socket.

    In case we already have messages queued, we just return from the
    queue. Otherwise, we try to read data with a _rwtimeout network
    timeout, and making sure we don't go over 2x_rwtimeout as a global
    limit.

    Raises TimeoutError on a receive timeout and ConnectionClosedError
    if the peer closes the connection.

    """
    self._CheckSocket()
    etime = time.time() + self._rwtimeout
    while not self._msgs:
      if time.time() > etime:
        raise TimeoutError("Extended receive timeout")
      while True:
        try:
          data = self.socket.recv(4096)
        except socket.timeout as err:
          # socket.timeout is a subclass of socket.error, so it has to be
          # handled first or this clause would never be reached
          raise TimeoutError("Receive timeout: %s" % str(err))
        except socket.error as err:
          if err.args and err.args[0] == errno.EAGAIN:
            continue
          raise
        break
      if not data:
        raise ConnectionClosedError("Connection closed while reading")
      # Everything before the last end-of-message marker is a complete
      # message; the remainder is kept for the next read
      new_msgs = (self._buffer + data).split(self.eom)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def Call(self, msg):
    """Send a message and wait for the response.

    This is just a wrapper over Send and Recv.

    """
    self.Send(msg)
    return self.Recv()

  def Close(self):
    """Close the socket"""
    if self.socket is not None:
      self.socket.close()
      self.socket = None
236

    
237

    
238
def ParseRequest(msg):
  """Decode a serialized LUXI request.

  Returns a (method, args) tuple. Raises ProtocolError if the message
  cannot be parsed, is not a dictionary, or lacks the method or the
  arguments.

  """
  try:
    parsed = serializer.LoadJson(msg)
  except ValueError as err:
    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)

  logging.debug("LUXI request: %s", parsed)

  if not isinstance(parsed, dict):
    logging.error("LUXI request not a dict: %r", msg)
    raise ProtocolError("Invalid LUXI request (not a dict)")

  method = parsed.get(KEY_METHOD)
  args = parsed.get(KEY_ARGS)
  if method is not None and args is not None:
    return (method, args)

  # One of the two mandatory fields is absent (or null)
  logging.error("LUXI request missing method or arguments: %r", msg)
  raise ProtocolError("Invalid LUXI request (no method or arguments"
                      " in request): %r" % msg)
261

    
262

    
263
def ParseResponse(msg):
  """Decode a serialized LUXI response.

  Returns a (success, result) tuple. Raises ProtocolError if the
  message cannot be deserialized or is not a dictionary holding both
  the success and the result keys.

  """
  # Deserialize the message
  try:
    payload = serializer.LoadJson(msg)
  except Exception as err:
    raise ProtocolError("Error while deserializing response: %s" % str(err))

  # Check that the response has the expected shape
  well_formed = (isinstance(payload, dict) and
                 KEY_SUCCESS in payload and
                 KEY_RESULT in payload)
  if not well_formed:
    raise ProtocolError("Invalid response from server: %r" % payload)

  return (payload[KEY_SUCCESS], payload[KEY_RESULT])
280

    
281

    
282
def FormatResponse(success, result):
  """Serialize a LUXI response.

  The success flag and the result are stored under KEY_SUCCESS and
  KEY_RESULT; the serialized dictionary is returned.

  """
  payload = {}
  payload[KEY_SUCCESS] = success
  payload[KEY_RESULT] = result

  logging.debug("LUXI response: %s", payload)

  return serializer.DumpJson(payload)
294

    
295

    
296
def FormatRequest(method, args):
  """Serialize a LUXI request.

  The method name and its arguments are stored under KEY_METHOD and
  KEY_ARGS; the dictionary is serialized with indent=False and returned.

  """
  payload = {}
  payload[KEY_METHOD] = method
  payload[KEY_ARGS] = args

  return serializer.DumpJson(payload, indent=False)
308

    
309

    
310
def CallLuxiMethod(transport_cb, method, args):
  """Perform one LUXI call through the given transport callback.

  The request is serialized, handed to transport_cb (a callable taking
  the request message and returning the response message) and the
  response is parsed. The result is returned on success; on failure
  errors.MaybeRaise is given the result first and RequestError is
  raised otherwise.

  """
  assert callable(transport_cb)

  # Serialize, send and wait for the answer
  outgoing = FormatRequest(method, args)
  incoming = transport_cb(outgoing)

  (success, result) = ParseResponse(incoming)

  if not success:
    errors.MaybeRaise(result)
    raise RequestError(result)

  return result
328

    
329

    
330
class Client(object):
  """High-level client implementation.

  This uses a backing Transport-like class on top of which it
  implements data serialization/deserialization.

  """
  def __init__(self, address=None, timeouts=None, transport=Transport):
    """Constructor for the Client class.

    Arguments:
      - address: a valid address for the used transport class; defaults
        to constants.MASTER_SOCKET
      - timeouts: a pair of timeouts, to be used on connect and read/write
      - transport: a Transport-like class

    If timeouts is not passed, the default timeouts of the transport
    class are used.

    The transport is created immediately, so any connection error of
    the transport class is raised from here.

    """
    if address is None:
      address = constants.MASTER_SOCKET
    self.address = address
    self.timeouts = timeouts
    self.transport_class = transport
    self.transport = None
    self._InitTransport()

  def _InitTransport(self):
    """(Re)initialize the transport if needed.

    """
    if self.transport is None:
      self.transport = self.transport_class(self.address,
                                            timeouts=self.timeouts)

  def _CloseTransport(self):
    """Close the transport, ignoring errors.

    """
    if self.transport is None:
      return
    try:
      # Drop our reference first so that a failing Close() still leaves
      # the client without a transport
      old_transp = self.transport
      self.transport = None
      old_transp.Close()
    except Exception: # pylint: disable-msg=W0703
      pass

  def _SendMethodCall(self, data):
    """Send a serialized request and return the serialized response.

    On any error the transport is closed (it is re-created on the next
    call) and the exception is re-raised.

    """
    # Send request and wait for response
    try:
      self._InitTransport()
      return self.transport.Call(data)
    except Exception:
      self._CloseTransport()
      raise

  def CallMethod(self, method, args):
    """Send a generic request and return the response.

    """
    return CallLuxiMethod(self._SendMethodCall, method, args)

  def SetQueueDrainFlag(self, drain_flag):
    """Send a REQ_QUEUE_SET_DRAIN_FLAG request with drain_flag as args."""
    return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)

  def SetWatcherPause(self, until):
    """Send a REQ_SET_WATCHER_PAUSE request with [until] as args."""
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])

  def SubmitJob(self, ops):
    """Submit one job, given as an iterable of opcodes.

    Each opcode is sent as the value of its __getstate__() method.

    """
    ops_state = [op.__getstate__() for op in ops]
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)

  def SubmitManyJobs(self, jobs):
    """Submit several jobs, each given as an iterable of opcodes.

    Each opcode is sent as the value of its __getstate__() method.

    """
    jobs_state = [[op.__getstate__() for op in ops] for ops in jobs]
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)

  def CancelJob(self, job_id):
    """Send a REQ_CANCEL_JOB request with job_id as args."""
    return self.CallMethod(REQ_CANCEL_JOB, job_id)

  def ArchiveJob(self, job_id):
    """Send a REQ_ARCHIVE_JOB request with job_id as args."""
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)

  def AutoArchiveJobs(self, age):
    """Send a REQ_AUTOARCHIVE_JOBS request with (age, timeout) as args.

    The timeout sent along is (DEF_RWTO - 1) // 2.

    """
    # NOTE(review): derived from the module default, not from
    # self.timeouts; presumably meant to stay below the read timeout of
    # the transport -- confirm for clients using custom timeouts
    timeout = (DEF_RWTO - 1) // 2
    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))

  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
    """Repeat REQ_WAIT_FOR_JOB_CHANGE until something else than
    constants.JOB_NOTCHANGED is returned, and return that result.

    Every request carries a timeout of (DEF_RWTO - 1) // 2.

    """
    # NOTE(review): derived from the module default, not from
    # self.timeouts -- confirm for clients using custom timeouts
    timeout = (DEF_RWTO - 1) // 2
    while True:
      result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
                               (job_id, fields, prev_job_info,
                                prev_log_serial, timeout))
      if result != constants.JOB_NOTCHANGED:
        break
    return result

  def QueryJobs(self, job_ids, fields):
    """Send a REQ_QUERY_JOBS request with (job_ids, fields) as args."""
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))

  def QueryInstances(self, names, fields, use_locking):
    """Send a REQ_QUERY_INSTANCES request with (names, fields,
    use_locking) as args."""
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))

  def QueryNodes(self, names, fields, use_locking):
    """Send a REQ_QUERY_NODES request with (names, fields, use_locking)
    as args."""
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))

  def QueryExports(self, nodes, use_locking):
    """Send a REQ_QUERY_EXPORTS request with (nodes, use_locking) as
    args."""
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))

  def QueryClusterInfo(self):
    """Send a REQ_QUERY_CLUSTER_INFO request with empty args."""
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())

  def QueryConfigValues(self, fields):
    """Send a REQ_QUERY_CONFIG_VALUES request with fields as args."""
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)

  def QueryTags(self, kind, name):
    """Send a REQ_QUERY_TAGS request with (kind, name) as args."""
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))
450

    
451

    
452
# TODO: class Server(object)