Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ 231db3a5

History | View | Annotate | Download (12 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the unix socket protocol
23

24
This module implements the local unix socket protocol. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also used by the master daemon.
29

30
"""
31

    
32
import socket
33
import collections
34
import time
35
import errno
36
import logging
37

    
38
from ganeti import serializer
39
from ganeti import constants
40
from ganeti import errors
41

    
42

    
43
KEY_METHOD = "method"
44
KEY_ARGS = "args"
45
KEY_SUCCESS = "success"
46
KEY_RESULT = "result"
47

    
48
REQ_SUBMIT_JOB = "SubmitJob"
49
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
50
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
51
REQ_CANCEL_JOB = "CancelJob"
52
REQ_ARCHIVE_JOB = "ArchiveJob"
53
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
54
REQ_QUERY_JOBS = "QueryJobs"
55
REQ_QUERY_INSTANCES = "QueryInstances"
56
REQ_QUERY_NODES = "QueryNodes"
57
REQ_QUERY_EXPORTS = "QueryExports"
58
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
59
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
60
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
61
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
62

    
63
DEF_CTMO = 10
64
DEF_RWTO = 60
65

    
66

    
67
class ProtocolError(errors.GenericError):
68
  """Denotes an error in the LUXI protocol"""
69

    
70

    
71
class ConnectionClosedError(ProtocolError):
72
  """Connection closed error"""
73

    
74

    
75
class TimeoutError(ProtocolError):
76
  """Operation timeout error"""
77

    
78

    
79
class RequestError(ProtocolError):
80
  """Error on request
81

82
  This signifies an error in the request format or request handling,
83
  but not (e.g.) an error in starting up an instance.
84

85
  Some common conditions that can trigger this exception:
86
    - job submission failed because the job data was wrong
87
    - query failed because required fields were missing
88

89
  """
90

    
91

    
92
class NoMasterError(ProtocolError):
93
  """The master cannot be reached
94

95
  This means that the master daemon is not running or the socket has
96
  been removed.
97

98
  """
99

    
100

    
101
class Transport:
102
  """Low-level transport class.
103

104
  This is used on the client side.
105

106
  This could be replace by any other class that provides the same
107
  semantics to the Client. This means:
108
    - can send messages and receive messages
109
    - safe for multithreading
110

111
  """
112

    
113
  def __init__(self, address, timeouts=None, eom=None):
114
    """Constructor for the Client class.
115

116
    Arguments:
117
      - address: a valid address the the used transport class
118
      - timeout: a list of timeouts, to be used on connect and read/write
119
      - eom: an identifier to be used as end-of-message which the
120
        upper-layer will guarantee that this identifier will not appear
121
        in any message
122

123
    There are two timeouts used since we might want to wait for a long
124
    time for a response, but the connect timeout should be lower.
125

126
    If not passed, we use a default of 10 and respectively 60 seconds.
127

128
    Note that on reading data, since the timeout applies to an
129
    invidual receive, it might be that the total duration is longer
130
    than timeout value passed (we make a hard limit at twice the read
131
    timeout).
132

133
    """
134
    self.address = address
135
    if timeouts is None:
136
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
137
    else:
138
      self._ctimeout, self._rwtimeout = timeouts
139

    
140
    self.socket = None
141
    self._buffer = ""
142
    self._msgs = collections.deque()
143

    
144
    if eom is None:
145
      self.eom = '\3'
146
    else:
147
      self.eom = eom
148

    
149
    try:
150
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
151
      self.socket.settimeout(self._ctimeout)
152
      try:
153
        self.socket.connect(address)
154
      except socket.timeout, err:
155
        raise TimeoutError("Connect timed out: %s" % str(err))
156
      except socket.error, err:
157
        if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
158
          raise NoMasterError(address)
159
        raise
160
      self.socket.settimeout(self._rwtimeout)
161
    except (socket.error, NoMasterError):
162
      if self.socket is not None:
163
        self.socket.close()
164
      self.socket = None
165
      raise
166

    
167
  def _CheckSocket(self):
168
    """Make sure we are connected.
169

170
    """
171
    if self.socket is None:
172
      raise ProtocolError("Connection is closed")
173

    
174
  def Send(self, msg):
175
    """Send a message.
176

177
    This just sends a message and doesn't wait for the response.
178

179
    """
180
    if self.eom in msg:
181
      raise ProtocolError("Message terminator found in payload")
182

    
183
    self._CheckSocket()
184
    try:
185
      # TODO: sendall is not guaranteed to send everything
186
      self.socket.sendall(msg + self.eom)
187
    except socket.timeout, err:
188
      raise TimeoutError("Sending timeout: %s" % str(err))
189

    
190
  def Recv(self):
191
    """Try to receive a message from the socket.
192

193
    In case we already have messages queued, we just return from the
194
    queue. Otherwise, we try to read data with a _rwtimeout network
195
    timeout, and making sure we don't go over 2x_rwtimeout as a global
196
    limit.
197

198
    """
199
    self._CheckSocket()
200
    etime = time.time() + self._rwtimeout
201
    while not self._msgs:
202
      if time.time() > etime:
203
        raise TimeoutError("Extended receive timeout")
204
      while True:
205
        try:
206
          data = self.socket.recv(4096)
207
        except socket.error, err:
208
          if err.args and err.args[0] == errno.EAGAIN:
209
            continue
210
          raise
211
        except socket.timeout, err:
212
          raise TimeoutError("Receive timeout: %s" % str(err))
213
        break
214
      if not data:
215
        raise ConnectionClosedError("Connection closed while reading")
216
      new_msgs = (self._buffer + data).split(self.eom)
217
      self._buffer = new_msgs.pop()
218
      self._msgs.extend(new_msgs)
219
    return self._msgs.popleft()
220

    
221
  def Call(self, msg):
222
    """Send a message and wait for the response.
223

224
    This is just a wrapper over Send and Recv.
225

226
    """
227
    self.Send(msg)
228
    return self.Recv()
229

    
230
  def Close(self):
231
    """Close the socket"""
232
    if self.socket is not None:
233
      self.socket.close()
234
      self.socket = None
235

    
236

    
237
def ParseRequest(msg):
238
  """Parses a LUXI request message.
239

240
  """
241
  try:
242
    request = serializer.LoadJson(msg)
243
  except ValueError, err:
244
    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
245

    
246
  logging.debug("LUXI request: %s", request)
247

    
248
  if not isinstance(request, dict):
249
    logging.error("LUXI request not a dict: %r", msg)
250
    raise ProtocolError("Invalid LUXI request (not a dict)")
251

    
252
  method = request.get(KEY_METHOD, None)
253
  args = request.get(KEY_ARGS, None)
254
  if method is None or args is None:
255
    logging.error("LUXI request missing method or arguments: %r", msg)
256
    raise ProtocolError(("Invalid LUXI request (no method or arguments"
257
                         " in request): %r") % msg)
258

    
259
  return (method, args)
260

    
261

    
262
def ParseResponse(msg):
263
  """Parses a LUXI response message.
264

265
  """
266
  # Parse the result
267
  try:
268
    data = serializer.LoadJson(msg)
269
  except Exception, err:
270
    raise ProtocolError("Error while deserializing response: %s" % str(err))
271

    
272
  # Validate response
273
  if not (isinstance(data, dict) and
274
          KEY_SUCCESS in data and
275
          KEY_RESULT in data):
276
    raise ProtocolError("Invalid response from server: %r" % data)
277

    
278
  return (data[KEY_SUCCESS], data[KEY_RESULT])
279

    
280

    
281
def FormatResponse(success, result):
282
  """Formats a LUXI response message.
283

284
  """
285
  response = {
286
    KEY_SUCCESS: success,
287
    KEY_RESULT: result,
288
    }
289

    
290
  logging.debug("LUXI response: %s", response)
291

    
292
  return serializer.DumpJson(response)
293

    
294

    
295
def FormatRequest(method, args):
296
  """Formats a LUXI request message.
297

298
  """
299
  # Build request
300
  request = {
301
    KEY_METHOD: method,
302
    KEY_ARGS: args,
303
    }
304

    
305
  # Serialize the request
306
  return serializer.DumpJson(request, indent=False)
307

    
308

    
309
def CallLuxiMethod(transport_cb, method, args):
310
  """Send a LUXI request via a transport and return the response.
311

312
  """
313
  assert callable(transport_cb)
314

    
315
  request_msg = FormatRequest(method, args)
316

    
317
  # Send request and wait for response
318
  response_msg = transport_cb(request_msg)
319

    
320
  (success, result) = ParseResponse(response_msg)
321

    
322
  if success:
323
    return result
324

    
325
  errors.MaybeRaise(result)
326
  raise RequestError(result)
327

    
328

    
329
class Client(object):
330
  """High-level client implementation.
331

332
  This uses a backing Transport-like class on top of which it
333
  implements data serialization/deserialization.
334

335
  """
336
  def __init__(self, address=None, timeouts=None, transport=Transport):
337
    """Constructor for the Client class.
338

339
    Arguments:
340
      - address: a valid address the the used transport class
341
      - timeout: a list of timeouts, to be used on connect and read/write
342
      - transport: a Transport-like class
343

344

345
    If timeout is not passed, the default timeouts of the transport
346
    class are used.
347

348
    """
349
    if address is None:
350
      address = constants.MASTER_SOCKET
351
    self.address = address
352
    self.timeouts = timeouts
353
    self.transport_class = transport
354
    self.transport = None
355
    self._InitTransport()
356

    
357
  def _InitTransport(self):
358
    """(Re)initialize the transport if needed.
359

360
    """
361
    if self.transport is None:
362
      self.transport = self.transport_class(self.address,
363
                                            timeouts=self.timeouts)
364

    
365
  def _CloseTransport(self):
366
    """Close the transport, ignoring errors.
367

368
    """
369
    if self.transport is None:
370
      return
371
    try:
372
      old_transp = self.transport
373
      self.transport = None
374
      old_transp.Close()
375
    except Exception: # pylint: disable-msg=W0703
376
      pass
377

    
378
  def _SendMethodCall(self, data):
379
    # Send request and wait for response
380
    try:
381
      self._InitTransport()
382
      return self.transport.Call(data)
383
    except Exception:
384
      self._CloseTransport()
385
      raise
386

    
387
  def CallMethod(self, method, args):
388
    """Send a generic request and return the response.
389

390
    """
391
    return CallLuxiMethod(self._SendMethodCall, method, args)
392

    
393
  def SetQueueDrainFlag(self, drain_flag):
394
    return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
395

    
396
  def SetWatcherPause(self, until):
397
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])
398

    
399
  def SubmitJob(self, ops):
400
    ops_state = map(lambda op: op.__getstate__(), ops)
401
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
402

    
403
  def SubmitManyJobs(self, jobs):
404
    jobs_state = []
405
    for ops in jobs:
406
      jobs_state.append([op.__getstate__() for op in ops])
407
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
408

    
409
  def CancelJob(self, job_id):
410
    return self.CallMethod(REQ_CANCEL_JOB, job_id)
411

    
412
  def ArchiveJob(self, job_id):
413
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
414

    
415
  def AutoArchiveJobs(self, age):
416
    timeout = (DEF_RWTO - 1) / 2
417
    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
418

    
419
  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
420
    timeout = (DEF_RWTO - 1) / 2
421
    while True:
422
      result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
423
                               (job_id, fields, prev_job_info,
424
                                prev_log_serial, timeout))
425
      if result != constants.JOB_NOTCHANGED:
426
        break
427
    return result
428

    
429
  def QueryJobs(self, job_ids, fields):
430
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
431

    
432
  def QueryInstances(self, names, fields, use_locking):
433
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
434

    
435
  def QueryNodes(self, names, fields, use_locking):
436
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
437

    
438
  def QueryExports(self, nodes, use_locking):
439
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
440

    
441
  def QueryClusterInfo(self):
442
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
443

    
444
  def QueryConfigValues(self, fields):
445
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
446

    
447

    
448
# TODO: class Server(object)