Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ d04aaa2f

History | View | Annotate | Download (10.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the unix socket protocol
23

24
This module implements the local unix socket protocol. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also used by the master daemon.
29

30
"""
31

    
32
import socket
33
import collections
34
import time
35
import errno
36

    
37
from ganeti import serializer
38
from ganeti import constants
39
from ganeti import errors
40

    
41

    
42
KEY_METHOD = 'method'
43
KEY_ARGS = 'args'
44
KEY_SUCCESS = "success"
45
KEY_RESULT = "result"
46

    
47
REQ_SUBMIT_JOB = "SubmitJob"
48
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
49
REQ_CANCEL_JOB = "CancelJob"
50
REQ_ARCHIVE_JOB = "ArchiveJob"
51
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
52
REQ_QUERY_JOBS = "QueryJobs"
53
REQ_QUERY_INSTANCES = "QueryInstances"
54
REQ_QUERY_NODES = "QueryNodes"
55
REQ_QUERY_EXPORTS = "QueryExports"
56
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
57
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
58
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
59

    
60
DEF_CTMO = 10
61
DEF_RWTO = 60
62

    
63

    
64
class ProtocolError(Exception):
65
  """Denotes an error in the server communication"""
66

    
67

    
68
class ConnectionClosedError(ProtocolError):
69
  """Connection closed error"""
70

    
71

    
72
class TimeoutError(ProtocolError):
73
  """Operation timeout error"""
74

    
75

    
76
class EncodingError(ProtocolError):
77
  """Encoding failure on the sending side"""
78

    
79

    
80
class DecodingError(ProtocolError):
81
  """Decoding failure on the receiving side"""
82

    
83

    
84
class RequestError(ProtocolError):
85
  """Error on request
86

87
  This signifies an error in the request format or request handling,
88
  but not (e.g.) an error in starting up an instance.
89

90
  Some common conditions that can trigger this exception:
91
    - job submission failed because the job data was wrong
92
    - query failed because required fields were missing
93

94
  """
95

    
96

    
97
class NoMasterError(ProtocolError):
98
  """The master cannot be reached
99

100
  This means that the master daemon is not running or the socket has
101
  been removed.
102

103
  """
104

    
105

    
106
class Transport:
107
  """Low-level transport class.
108

109
  This is used on the client side.
110

111
  This could be replace by any other class that provides the same
112
  semantics to the Client. This means:
113
    - can send messages and receive messages
114
    - safe for multithreading
115

116
  """
117

    
118
  def __init__(self, address, timeouts=None, eom=None):
119
    """Constructor for the Client class.
120

121
    Arguments:
122
      - address: a valid address the the used transport class
123
      - timeout: a list of timeouts, to be used on connect and read/write
124
      - eom: an identifier to be used as end-of-message which the
125
        upper-layer will guarantee that this identifier will not appear
126
        in any message
127

128
    There are two timeouts used since we might want to wait for a long
129
    time for a response, but the connect timeout should be lower.
130

131
    If not passed, we use a default of 10 and respectively 60 seconds.
132

133
    Note that on reading data, since the timeout applies to an
134
    invidual receive, it might be that the total duration is longer
135
    than timeout value passed (we make a hard limit at twice the read
136
    timeout).
137

138
    """
139
    self.address = address
140
    if timeouts is None:
141
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
142
    else:
143
      self._ctimeout, self._rwtimeout = timeouts
144

    
145
    self.socket = None
146
    self._buffer = ""
147
    self._msgs = collections.deque()
148

    
149
    if eom is None:
150
      self.eom = '\3'
151
    else:
152
      self.eom = eom
153

    
154
    try:
155
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
156
      self.socket.settimeout(self._ctimeout)
157
      try:
158
        self.socket.connect(address)
159
      except socket.timeout, err:
160
        raise TimeoutError("Connect timed out: %s" % str(err))
161
      except socket.error, err:
162
        if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
163
          raise NoMasterError((address,))
164
        raise
165
      self.socket.settimeout(self._rwtimeout)
166
    except (socket.error, NoMasterError):
167
      if self.socket is not None:
168
        self.socket.close()
169
      self.socket = None
170
      raise
171

    
172
  def _CheckSocket(self):
173
    """Make sure we are connected.
174

175
    """
176
    if self.socket is None:
177
      raise ProtocolError("Connection is closed")
178

    
179
  def Send(self, msg):
180
    """Send a message.
181

182
    This just sends a message and doesn't wait for the response.
183

184
    """
185
    if self.eom in msg:
186
      raise EncodingError("Message terminator found in payload")
187
    self._CheckSocket()
188
    try:
189
      self.socket.sendall(msg + self.eom)
190
    except socket.timeout, err:
191
      raise TimeoutError("Sending timeout: %s" % str(err))
192

    
193
  def Recv(self):
194
    """Try to receive a messae from the socket.
195

196
    In case we already have messages queued, we just return from the
197
    queue. Otherwise, we try to read data with a _rwtimeout network
198
    timeout, and making sure we don't go over 2x_rwtimeout as a global
199
    limit.
200

201
    """
202
    self._CheckSocket()
203
    etime = time.time() + self._rwtimeout
204
    while not self._msgs:
205
      if time.time() > etime:
206
        raise TimeoutError("Extended receive timeout")
207
      try:
208
        data = self.socket.recv(4096)
209
      except socket.timeout, err:
210
        raise TimeoutError("Receive timeout: %s" % str(err))
211
      if not data:
212
        raise ConnectionClosedError("Connection closed while reading")
213
      new_msgs = (self._buffer + data).split(self.eom)
214
      self._buffer = new_msgs.pop()
215
      self._msgs.extend(new_msgs)
216
    return self._msgs.popleft()
217

    
218
  def Call(self, msg):
219
    """Send a message and wait for the response.
220

221
    This is just a wrapper over Send and Recv.
222

223
    """
224
    self.Send(msg)
225
    return self.Recv()
226

    
227
  def Close(self):
228
    """Close the socket"""
229
    if self.socket is not None:
230
      self.socket.close()
231
      self.socket = None
232

    
233

    
234
class Client(object):
235
  """High-level client implementation.
236

237
  This uses a backing Transport-like class on top of which it
238
  implements data serialization/deserialization.
239

240
  """
241
  def __init__(self, address=None, timeouts=None, transport=Transport):
242
    """Constructor for the Client class.
243

244
    Arguments:
245
      - address: a valid address the the used transport class
246
      - timeout: a list of timeouts, to be used on connect and read/write
247
      - transport: a Transport-like class
248

249

250
    If timeout is not passed, the default timeouts of the transport
251
    class are used.
252

253
    """
254
    if address is None:
255
      address = constants.MASTER_SOCKET
256
    self.address = address
257
    self.timeouts = timeouts
258
    self.transport_class = transport
259
    self.transport = None
260
    self._InitTransport()
261

    
262
  def _InitTransport(self):
263
    """(Re)initialize the transport if needed.
264

265
    """
266
    if self.transport is None:
267
      self.transport = self.transport_class(self.address,
268
                                            timeouts=self.timeouts)
269

    
270
  def _CloseTransport(self):
271
    """Close the transport, ignoring errors.
272

273
    """
274
    if self.transport is None:
275
      return
276
    try:
277
      old_transp = self.transport
278
      self.transport = None
279
      old_transp.Close()
280
    except Exception, err:
281
      pass
282

    
283
  def CallMethod(self, method, args):
284
    """Send a generic request and return the response.
285

286
    """
287
    # Build request
288
    request = {
289
      KEY_METHOD: method,
290
      KEY_ARGS: args,
291
      }
292

    
293
    # Serialize the request
294
    send_data = serializer.DumpJson(request, indent=False)
295

    
296
    # Send request and wait for response
297
    try:
298
      self._InitTransport()
299
      result = self.transport.Call(send_data)
300
    except Exception:
301
      self._CloseTransport()
302
      raise
303

    
304
    # Parse the result
305
    try:
306
      data = serializer.LoadJson(result)
307
    except Exception, err:
308
      raise ProtocolError("Error while deserializing response: %s" % str(err))
309

    
310
    # Validate response
311
    if (not isinstance(data, dict) or
312
        KEY_SUCCESS not in data or
313
        KEY_RESULT not in data):
314
      raise DecodingError("Invalid response from server: %s" % str(data))
315

    
316
    result = data[KEY_RESULT]
317

    
318
    if not data[KEY_SUCCESS]:
319
      # TODO: decide on a standard exception
320
      if (isinstance(result, (tuple, list)) and len(result) == 2 and
321
          isinstance(result[1], (tuple, list))):
322
        # custom ganeti errors
323
        err_class = errors.GetErrorClass(result[0])
324
        if err_class is not None:
325
          raise err_class, tuple(result[1])
326

    
327
      raise RequestError(result)
328

    
329
    return result
330

    
331
  def SetQueueDrainFlag(self, drain_flag):
332
    return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
333

    
334
  def SubmitJob(self, ops):
335
    ops_state = map(lambda op: op.__getstate__(), ops)
336
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
337

    
338
  def CancelJob(self, job_id):
339
    return self.CallMethod(REQ_CANCEL_JOB, job_id)
340

    
341
  def ArchiveJob(self, job_id):
342
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
343

    
344
  def AutoArchiveJobs(self, age):
345
    timeout = (DEF_RWTO - 1) / 2
346
    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
347

    
348
  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
349
    timeout = (DEF_RWTO - 1) / 2
350
    while True:
351
      result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
352
                               (job_id, fields, prev_job_info,
353
                                prev_log_serial, timeout))
354
      if result != constants.JOB_NOTCHANGED:
355
        break
356
    return result
357

    
358
  def QueryJobs(self, job_ids, fields):
359
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
360

    
361
  def QueryInstances(self, names, fields, use_locking):
362
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
363

    
364
  def QueryNodes(self, names, fields, use_locking):
365
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
366

    
367
  def QueryExports(self, nodes, use_locking):
368
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
369

    
370
  def QueryClusterInfo(self):
371
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
372

    
373
  def QueryConfigValues(self, fields):
374
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
375

    
376

    
377
# TODO: class Server(object)