Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ 56d8ff91

History | View | Annotate | Download (10.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the unix socket protocol
23

24
This module implements the local unix socket protocol. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also used by the master daemon.
29

30
"""
31

    
32
import socket
33
import collections
34
import time
35
import errno
36

    
37
from ganeti import serializer
38
from ganeti import constants
39
from ganeti import errors
40

    
41

    
42
KEY_METHOD = 'method'
43
KEY_ARGS = 'args'
44
KEY_SUCCESS = "success"
45
KEY_RESULT = "result"
46

    
47
REQ_SUBMIT_JOB = "SubmitJob"
48
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
49
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
50
REQ_CANCEL_JOB = "CancelJob"
51
REQ_ARCHIVE_JOB = "ArchiveJob"
52
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
53
REQ_QUERY_JOBS = "QueryJobs"
54
REQ_QUERY_INSTANCES = "QueryInstances"
55
REQ_QUERY_NODES = "QueryNodes"
56
REQ_QUERY_EXPORTS = "QueryExports"
57
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
58
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
59
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
60

    
61
DEF_CTMO = 10
62
DEF_RWTO = 60
63

    
64

    
65
class ProtocolError(Exception):
66
  """Denotes an error in the server communication"""
67

    
68

    
69
class ConnectionClosedError(ProtocolError):
70
  """Connection closed error"""
71

    
72

    
73
class TimeoutError(ProtocolError):
74
  """Operation timeout error"""
75

    
76

    
77
class EncodingError(ProtocolError):
78
  """Encoding failure on the sending side"""
79

    
80

    
81
class DecodingError(ProtocolError):
82
  """Decoding failure on the receiving side"""
83

    
84

    
85
class RequestError(ProtocolError):
86
  """Error on request
87

88
  This signifies an error in the request format or request handling,
89
  but not (e.g.) an error in starting up an instance.
90

91
  Some common conditions that can trigger this exception:
92
    - job submission failed because the job data was wrong
93
    - query failed because required fields were missing
94

95
  """
96

    
97

    
98
class NoMasterError(ProtocolError):
99
  """The master cannot be reached
100

101
  This means that the master daemon is not running or the socket has
102
  been removed.
103

104
  """
105

    
106

    
107
class Transport:
108
  """Low-level transport class.
109

110
  This is used on the client side.
111

112
  This could be replace by any other class that provides the same
113
  semantics to the Client. This means:
114
    - can send messages and receive messages
115
    - safe for multithreading
116

117
  """
118

    
119
  def __init__(self, address, timeouts=None, eom=None):
120
    """Constructor for the Client class.
121

122
    Arguments:
123
      - address: a valid address the the used transport class
124
      - timeout: a list of timeouts, to be used on connect and read/write
125
      - eom: an identifier to be used as end-of-message which the
126
        upper-layer will guarantee that this identifier will not appear
127
        in any message
128

129
    There are two timeouts used since we might want to wait for a long
130
    time for a response, but the connect timeout should be lower.
131

132
    If not passed, we use a default of 10 and respectively 60 seconds.
133

134
    Note that on reading data, since the timeout applies to an
135
    invidual receive, it might be that the total duration is longer
136
    than timeout value passed (we make a hard limit at twice the read
137
    timeout).
138

139
    """
140
    self.address = address
141
    if timeouts is None:
142
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
143
    else:
144
      self._ctimeout, self._rwtimeout = timeouts
145

    
146
    self.socket = None
147
    self._buffer = ""
148
    self._msgs = collections.deque()
149

    
150
    if eom is None:
151
      self.eom = '\3'
152
    else:
153
      self.eom = eom
154

    
155
    try:
156
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
157
      self.socket.settimeout(self._ctimeout)
158
      try:
159
        self.socket.connect(address)
160
      except socket.timeout, err:
161
        raise TimeoutError("Connect timed out: %s" % str(err))
162
      except socket.error, err:
163
        if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
164
          raise NoMasterError((address,))
165
        raise
166
      self.socket.settimeout(self._rwtimeout)
167
    except (socket.error, NoMasterError):
168
      if self.socket is not None:
169
        self.socket.close()
170
      self.socket = None
171
      raise
172

    
173
  def _CheckSocket(self):
174
    """Make sure we are connected.
175

176
    """
177
    if self.socket is None:
178
      raise ProtocolError("Connection is closed")
179

    
180
  def Send(self, msg):
181
    """Send a message.
182

183
    This just sends a message and doesn't wait for the response.
184

185
    """
186
    if self.eom in msg:
187
      raise EncodingError("Message terminator found in payload")
188
    self._CheckSocket()
189
    try:
190
      # TODO: sendall is not guaranteed to send everything
191
      self.socket.sendall(msg + self.eom)
192
    except socket.timeout, err:
193
      raise TimeoutError("Sending timeout: %s" % str(err))
194

    
195
  def Recv(self):
196
    """Try to receive a message from the socket.
197

198
    In case we already have messages queued, we just return from the
199
    queue. Otherwise, we try to read data with a _rwtimeout network
200
    timeout, and making sure we don't go over 2x_rwtimeout as a global
201
    limit.
202

203
    """
204
    self._CheckSocket()
205
    etime = time.time() + self._rwtimeout
206
    while not self._msgs:
207
      if time.time() > etime:
208
        raise TimeoutError("Extended receive timeout")
209
      while True:
210
        try:
211
          data = self.socket.recv(4096)
212
        except socket.error, err:
213
          if err.args and err.args[0] == errno.EAGAIN:
214
            continue
215
          raise
216
        except socket.timeout, err:
217
          raise TimeoutError("Receive timeout: %s" % str(err))
218
        break
219
      if not data:
220
        raise ConnectionClosedError("Connection closed while reading")
221
      new_msgs = (self._buffer + data).split(self.eom)
222
      self._buffer = new_msgs.pop()
223
      self._msgs.extend(new_msgs)
224
    return self._msgs.popleft()
225

    
226
  def Call(self, msg):
227
    """Send a message and wait for the response.
228

229
    This is just a wrapper over Send and Recv.
230

231
    """
232
    self.Send(msg)
233
    return self.Recv()
234

    
235
  def Close(self):
236
    """Close the socket"""
237
    if self.socket is not None:
238
      self.socket.close()
239
      self.socket = None
240

    
241

    
242
class Client(object):
243
  """High-level client implementation.
244

245
  This uses a backing Transport-like class on top of which it
246
  implements data serialization/deserialization.
247

248
  """
249
  def __init__(self, address=None, timeouts=None, transport=Transport):
250
    """Constructor for the Client class.
251

252
    Arguments:
253
      - address: a valid address the the used transport class
254
      - timeout: a list of timeouts, to be used on connect and read/write
255
      - transport: a Transport-like class
256

257

258
    If timeout is not passed, the default timeouts of the transport
259
    class are used.
260

261
    """
262
    if address is None:
263
      address = constants.MASTER_SOCKET
264
    self.address = address
265
    self.timeouts = timeouts
266
    self.transport_class = transport
267
    self.transport = None
268
    self._InitTransport()
269

    
270
  def _InitTransport(self):
271
    """(Re)initialize the transport if needed.
272

273
    """
274
    if self.transport is None:
275
      self.transport = self.transport_class(self.address,
276
                                            timeouts=self.timeouts)
277

    
278
  def _CloseTransport(self):
279
    """Close the transport, ignoring errors.
280

281
    """
282
    if self.transport is None:
283
      return
284
    try:
285
      old_transp = self.transport
286
      self.transport = None
287
      old_transp.Close()
288
    except Exception:
289
      pass
290

    
291
  def CallMethod(self, method, args):
292
    """Send a generic request and return the response.
293

294
    """
295
    # Build request
296
    request = {
297
      KEY_METHOD: method,
298
      KEY_ARGS: args,
299
      }
300

    
301
    # Serialize the request
302
    send_data = serializer.DumpJson(request, indent=False)
303

    
304
    # Send request and wait for response
305
    try:
306
      self._InitTransport()
307
      result = self.transport.Call(send_data)
308
    except Exception:
309
      self._CloseTransport()
310
      raise
311

    
312
    # Parse the result
313
    try:
314
      data = serializer.LoadJson(result)
315
    except Exception, err:
316
      raise ProtocolError("Error while deserializing response: %s" % str(err))
317

    
318
    # Validate response
319
    if (not isinstance(data, dict) or
320
        KEY_SUCCESS not in data or
321
        KEY_RESULT not in data):
322
      raise DecodingError("Invalid response from server: %s" % str(data))
323

    
324
    result = data[KEY_RESULT]
325

    
326
    if not data[KEY_SUCCESS]:
327
      # TODO: decide on a standard exception
328
      if (isinstance(result, (tuple, list)) and len(result) == 2 and
329
          isinstance(result[1], (tuple, list))):
330
        # custom ganeti errors
331
        err_class = errors.GetErrorClass(result[0])
332
        if err_class is not None:
333
          raise err_class, tuple(result[1])
334

    
335
      raise RequestError(result)
336

    
337
    return result
338

    
339
  def SetQueueDrainFlag(self, drain_flag):
340
    return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
341

    
342
  def SubmitJob(self, ops):
343
    ops_state = map(lambda op: op.__getstate__(), ops)
344
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
345

    
346
  def SubmitManyJobs(self, jobs):
347
    jobs_state = []
348
    for ops in jobs:
349
      jobs_state.append([op.__getstate__() for op in ops])
350
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
351

    
352
  def CancelJob(self, job_id):
353
    return self.CallMethod(REQ_CANCEL_JOB, job_id)
354

    
355
  def ArchiveJob(self, job_id):
356
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
357

    
358
  def AutoArchiveJobs(self, age):
359
    timeout = (DEF_RWTO - 1) / 2
360
    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
361

    
362
  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
363
    timeout = (DEF_RWTO - 1) / 2
364
    while True:
365
      result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
366
                               (job_id, fields, prev_job_info,
367
                                prev_log_serial, timeout))
368
      if result != constants.JOB_NOTCHANGED:
369
        break
370
    return result
371

    
372
  def QueryJobs(self, job_ids, fields):
373
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
374

    
375
  def QueryInstances(self, names, fields, use_locking):
376
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
377

    
378
  def QueryNodes(self, names, fields, use_locking):
379
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
380

    
381
  def QueryExports(self, nodes, use_locking):
382
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
383

    
384
  def QueryClusterInfo(self):
385
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
386

    
387
  def QueryConfigValues(self, fields):
388
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
389

    
390

    
391
# TODO: class Server(object)