Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ 7c4d6c7b

History | View | Annotate | Download (10.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the unix socket protocol
23

24
This module implements the local unix socket protocol. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also used by the master daemon.
29

30
"""
31

    
32
import socket
33
import collections
34
import time
35
import errno
36

    
37
from ganeti import serializer
38
from ganeti import constants
39
from ganeti import errors
40

    
41

    
42
KEY_METHOD = 'method'
43
KEY_ARGS = 'args'
44
KEY_SUCCESS = "success"
45
KEY_RESULT = "result"
46

    
47
REQ_SUBMIT_JOB = "SubmitJob"
48
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
49
REQ_CANCEL_JOB = "CancelJob"
50
REQ_ARCHIVE_JOB = "ArchiveJob"
51
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
52
REQ_QUERY_JOBS = "QueryJobs"
53
REQ_QUERY_INSTANCES = "QueryInstances"
54
REQ_QUERY_NODES = "QueryNodes"
55
REQ_QUERY_EXPORTS = "QueryExports"
56
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
57
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
58
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
59

    
60
DEF_CTMO = 10
61
DEF_RWTO = 60
62

    
63

    
64
class ProtocolError(Exception):
65
  """Denotes an error in the server communication"""
66

    
67

    
68
class ConnectionClosedError(ProtocolError):
69
  """Connection closed error"""
70

    
71

    
72
class TimeoutError(ProtocolError):
73
  """Operation timeout error"""
74

    
75

    
76
class EncodingError(ProtocolError):
77
  """Encoding failure on the sending side"""
78

    
79

    
80
class DecodingError(ProtocolError):
81
  """Decoding failure on the receiving side"""
82

    
83

    
84
class RequestError(ProtocolError):
85
  """Error on request
86

87
  This signifies an error in the request format or request handling,
88
  but not (e.g.) an error in starting up an instance.
89

90
  Some common conditions that can trigger this exception:
91
    - job submission failed because the job data was wrong
92
    - query failed because required fields were missing
93

94
  """
95

    
96

    
97
class NoMasterError(ProtocolError):
98
  """The master cannot be reached
99

100
  This means that the master daemon is not running or the socket has
101
  been removed.
102

103
  """
104

    
105

    
106
class Transport:
107
  """Low-level transport class.
108

109
  This is used on the client side.
110

111
  This could be replace by any other class that provides the same
112
  semantics to the Client. This means:
113
    - can send messages and receive messages
114
    - safe for multithreading
115

116
  """
117

    
118
  def __init__(self, address, timeouts=None, eom=None):
119
    """Constructor for the Client class.
120

121
    Arguments:
122
      - address: a valid address the the used transport class
123
      - timeout: a list of timeouts, to be used on connect and read/write
124
      - eom: an identifier to be used as end-of-message which the
125
        upper-layer will guarantee that this identifier will not appear
126
        in any message
127

128
    There are two timeouts used since we might want to wait for a long
129
    time for a response, but the connect timeout should be lower.
130

131
    If not passed, we use a default of 10 and respectively 60 seconds.
132

133
    Note that on reading data, since the timeout applies to an
134
    invidual receive, it might be that the total duration is longer
135
    than timeout value passed (we make a hard limit at twice the read
136
    timeout).
137

138
    """
139
    self.address = address
140
    if timeouts is None:
141
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
142
    else:
143
      self._ctimeout, self._rwtimeout = timeouts
144

    
145
    self.socket = None
146
    self._buffer = ""
147
    self._msgs = collections.deque()
148

    
149
    if eom is None:
150
      self.eom = '\3'
151
    else:
152
      self.eom = eom
153

    
154
    try:
155
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
156
      self.socket.settimeout(self._ctimeout)
157
      try:
158
        self.socket.connect(address)
159
      except socket.timeout, err:
160
        raise TimeoutError("Connect timed out: %s" % str(err))
161
      except socket.error, err:
162
        if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
163
          raise NoMasterError((address,))
164
        raise
165
      self.socket.settimeout(self._rwtimeout)
166
    except (socket.error, NoMasterError):
167
      if self.socket is not None:
168
        self.socket.close()
169
      self.socket = None
170
      raise
171

    
172
  def _CheckSocket(self):
173
    """Make sure we are connected.
174

175
    """
176
    if self.socket is None:
177
      raise ProtocolError("Connection is closed")
178

    
179
  def Send(self, msg):
180
    """Send a message.
181

182
    This just sends a message and doesn't wait for the response.
183

184
    """
185
    if self.eom in msg:
186
      raise EncodingError("Message terminator found in payload")
187
    self._CheckSocket()
188
    try:
189
      # TODO: sendall is not guaranteed to send everything
190
      self.socket.sendall(msg + self.eom)
191
    except socket.timeout, err:
192
      raise TimeoutError("Sending timeout: %s" % str(err))
193

    
194
  def Recv(self):
195
    """Try to receive a message from the socket.
196

197
    In case we already have messages queued, we just return from the
198
    queue. Otherwise, we try to read data with a _rwtimeout network
199
    timeout, and making sure we don't go over 2x_rwtimeout as a global
200
    limit.
201

202
    """
203
    self._CheckSocket()
204
    etime = time.time() + self._rwtimeout
205
    while not self._msgs:
206
      if time.time() > etime:
207
        raise TimeoutError("Extended receive timeout")
208
      while True:
209
        try:
210
          data = self.socket.recv(4096)
211
        except socket.error, err:
212
          if err.args and err.args[0] == errno.EAGAIN:
213
            continue
214
          raise
215
        except socket.timeout, err:
216
          raise TimeoutError("Receive timeout: %s" % str(err))
217
        break
218
      if not data:
219
        raise ConnectionClosedError("Connection closed while reading")
220
      new_msgs = (self._buffer + data).split(self.eom)
221
      self._buffer = new_msgs.pop()
222
      self._msgs.extend(new_msgs)
223
    return self._msgs.popleft()
224

    
225
  def Call(self, msg):
226
    """Send a message and wait for the response.
227

228
    This is just a wrapper over Send and Recv.
229

230
    """
231
    self.Send(msg)
232
    return self.Recv()
233

    
234
  def Close(self):
235
    """Close the socket"""
236
    if self.socket is not None:
237
      self.socket.close()
238
      self.socket = None
239

    
240

    
241
class Client(object):
242
  """High-level client implementation.
243

244
  This uses a backing Transport-like class on top of which it
245
  implements data serialization/deserialization.
246

247
  """
248
  def __init__(self, address=None, timeouts=None, transport=Transport):
249
    """Constructor for the Client class.
250

251
    Arguments:
252
      - address: a valid address the the used transport class
253
      - timeout: a list of timeouts, to be used on connect and read/write
254
      - transport: a Transport-like class
255

256

257
    If timeout is not passed, the default timeouts of the transport
258
    class are used.
259

260
    """
261
    if address is None:
262
      address = constants.MASTER_SOCKET
263
    self.address = address
264
    self.timeouts = timeouts
265
    self.transport_class = transport
266
    self.transport = None
267
    self._InitTransport()
268

    
269
  def _InitTransport(self):
270
    """(Re)initialize the transport if needed.
271

272
    """
273
    if self.transport is None:
274
      self.transport = self.transport_class(self.address,
275
                                            timeouts=self.timeouts)
276

    
277
  def _CloseTransport(self):
278
    """Close the transport, ignoring errors.
279

280
    """
281
    if self.transport is None:
282
      return
283
    try:
284
      old_transp = self.transport
285
      self.transport = None
286
      old_transp.Close()
287
    except Exception:
288
      pass
289

    
290
  def CallMethod(self, method, args):
291
    """Send a generic request and return the response.
292

293
    """
294
    # Build request
295
    request = {
296
      KEY_METHOD: method,
297
      KEY_ARGS: args,
298
      }
299

    
300
    # Serialize the request
301
    send_data = serializer.DumpJson(request, indent=False)
302

    
303
    # Send request and wait for response
304
    try:
305
      self._InitTransport()
306
      result = self.transport.Call(send_data)
307
    except Exception:
308
      self._CloseTransport()
309
      raise
310

    
311
    # Parse the result
312
    try:
313
      data = serializer.LoadJson(result)
314
    except Exception, err:
315
      raise ProtocolError("Error while deserializing response: %s" % str(err))
316

    
317
    # Validate response
318
    if (not isinstance(data, dict) or
319
        KEY_SUCCESS not in data or
320
        KEY_RESULT not in data):
321
      raise DecodingError("Invalid response from server: %s" % str(data))
322

    
323
    result = data[KEY_RESULT]
324

    
325
    if not data[KEY_SUCCESS]:
326
      # TODO: decide on a standard exception
327
      if (isinstance(result, (tuple, list)) and len(result) == 2 and
328
          isinstance(result[1], (tuple, list))):
329
        # custom ganeti errors
330
        err_class = errors.GetErrorClass(result[0])
331
        if err_class is not None:
332
          raise err_class, tuple(result[1])
333

    
334
      raise RequestError(result)
335

    
336
    return result
337

    
338
  def SetQueueDrainFlag(self, drain_flag):
339
    return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
340

    
341
  def SubmitJob(self, ops):
342
    ops_state = map(lambda op: op.__getstate__(), ops)
343
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
344

    
345
  def CancelJob(self, job_id):
346
    return self.CallMethod(REQ_CANCEL_JOB, job_id)
347

    
348
  def ArchiveJob(self, job_id):
349
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
350

    
351
  def AutoArchiveJobs(self, age):
352
    timeout = (DEF_RWTO - 1) / 2
353
    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
354

    
355
  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
356
    timeout = (DEF_RWTO - 1) / 2
357
    while True:
358
      result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
359
                               (job_id, fields, prev_job_info,
360
                                prev_log_serial, timeout))
361
      if result != constants.JOB_NOTCHANGED:
362
        break
363
    return result
364

    
365
  def QueryJobs(self, job_ids, fields):
366
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
367

    
368
  def QueryInstances(self, names, fields, use_locking):
369
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
370

    
371
  def QueryNodes(self, names, fields, use_locking):
372
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
373

    
374
  def QueryExports(self, nodes, use_locking):
375
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
376

    
377
  def QueryClusterInfo(self):
378
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
379

    
380
  def QueryConfigValues(self, fields):
381
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
382

    
383

    
384
# TODO: class Server(object)