Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ 6c00d19a

History | View | Annotate | Download (10.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the unix socket protocol
23

24
This module implements the local unix socket protocol. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also used by the master daemon.
29

30
"""
31

    
32
import socket
33
import collections
34
import time
35
import errno
36

    
37
from ganeti import serializer
38
from ganeti import constants
39
from ganeti import errors
40

    
41

    
42
KEY_METHOD = 'method'
43
KEY_ARGS = 'args'
44
KEY_SUCCESS = "success"
45
KEY_RESULT = "result"
46

    
47
REQ_SUBMIT_JOB = "SubmitJob"
48
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
49
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
50
REQ_CANCEL_JOB = "CancelJob"
51
REQ_ARCHIVE_JOB = "ArchiveJob"
52
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
53
REQ_QUERY_JOBS = "QueryJobs"
54
REQ_QUERY_INSTANCES = "QueryInstances"
55
REQ_QUERY_NODES = "QueryNodes"
56
REQ_QUERY_EXPORTS = "QueryExports"
57
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
58
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
59
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
60

    
61
DEF_CTMO = 10
62
DEF_RWTO = 60
63

    
64

    
65
class ProtocolError(Exception):
  """Denotes an error in the server communication.

  Base class for all protocol-level errors raised by this module.

  """
67

    
68

    
69
class ConnectionClosedError(ProtocolError):
  """Connection closed error.

  Raised when the peer closes the connection while we are reading.

  """
71

    
72

    
73
class TimeoutError(ProtocolError):
  """Operation timeout error.

  Raised when a connect, send or receive operation times out.

  """
75

    
76

    
77
class EncodingError(ProtocolError):
  """Encoding failure on the sending side.

  Raised when the payload contains the end-of-message terminator and
  therefore cannot be framed safely.

  """
79

    
80

    
81
class DecodingError(ProtocolError):
  """Decoding failure on the receiving side.

  Raised when the server's response does not have the expected
  dictionary structure.

  """
83

    
84

    
85
class RequestError(ProtocolError):
  """Error on request.

  This signifies an error in the request format or request handling,
  but not (e.g.) an error in starting up an instance.

  Some common conditions that can trigger this exception:
    - job submission failed because the job data was wrong
    - query failed because required fields were missing

  """
96

    
97

    
98
class NoMasterError(ProtocolError):
  """The master cannot be reached.

  This means that the master daemon is not running or the socket has
  been removed.

  """
105

    
106

    
107
class Transport:
  """Low-level transport class.

  This is used on the client side.

  This could be replaced by any other class that provides the same
  semantics to the Client. This means:
    - can send messages and receive messages
    - safe for multithreading

  """

  def __init__(self, address, timeouts=None, eom=None):
    """Constructor for the Transport class.

    Arguments:
      - address: a valid address for the used transport class
      - timeouts: a list of timeouts, to be used on connect and read/write
      - eom: an identifier to be used as end-of-message which the
        upper-layer will guarantee that this identifier will not appear
        in any message

    There are two timeouts used since we might want to wait for a long
    time for a response, but the connect timeout should be lower.

    If not passed, we use a default of 10 and respectively 60 seconds.

    Note that on reading data, since the timeout applies to an
    individual receive, it might be that the total duration is longer
    than timeout value passed (we make a hard limit at twice the read
    timeout).

    """
    self.address = address
    if timeouts is None:
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
    else:
      self._ctimeout, self._rwtimeout = timeouts

    self.socket = None
    self._buffer = ""
    self._msgs = collections.deque()

    if eom is None:
      self.eom = '\3'
    else:
      self.eom = eom

    try:
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
      # Use the (shorter) connect timeout for connecting, then switch
      # to the read/write timeout for the established connection
      self.socket.settimeout(self._ctimeout)
      try:
        self.socket.connect(address)
      except socket.timeout as err:
        raise TimeoutError("Connect timed out: %s" % str(err))
      except socket.error as err:
        if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
          # Socket file missing or nobody listening on it: the master
          # daemon is not available
          raise NoMasterError((address,))
        raise
      self.socket.settimeout(self._rwtimeout)
    except (socket.error, NoMasterError):
      # Don't leak a half-initialized socket on failure
      if self.socket is not None:
        self.socket.close()
      self.socket = None
      raise

  def _CheckSocket(self):
    """Make sure we are connected.

    Raises ProtocolError if the socket has been closed.

    """
    if self.socket is None:
      raise ProtocolError("Connection is closed")

  def Send(self, msg):
    """Send a message.

    This just sends a message and doesn't wait for the response.

    Raises EncodingError if the message contains the end-of-message
    terminator, and TimeoutError if the send times out.

    """
    if self.eom in msg:
      raise EncodingError("Message terminator found in payload")
    self._CheckSocket()
    try:
      self.socket.sendall(msg + self.eom)
    except socket.timeout as err:
      raise TimeoutError("Sending timeout: %s" % str(err))

  def Recv(self):
    """Try to receive a message from the socket.

    In case we already have messages queued, we just return from the
    queue. Otherwise, we try to read data with a _rwtimeout network
    timeout, and making sure we don't go over 2x_rwtimeout as a global
    limit.

    """
    self._CheckSocket()
    etime = time.time() + self._rwtimeout
    while not self._msgs:
      if time.time() > etime:
        raise TimeoutError("Extended receive timeout")
      try:
        data = self.socket.recv(4096)
      except socket.timeout as err:
        raise TimeoutError("Receive timeout: %s" % str(err))
      if not data:
        raise ConnectionClosedError("Connection closed while reading")
      # Split on the end-of-message terminator; the last element is an
      # incomplete message (possibly empty) and stays in the buffer
      new_msgs = (self._buffer + data).split(self.eom)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def Call(self, msg):
    """Send a message and wait for the response.

    This is just a wrapper over Send and Recv.

    """
    self.Send(msg)
    return self.Recv()

  def Close(self):
    """Close the socket"""
    if self.socket is not None:
      self.socket.close()
      self.socket = None
233

    
234

    
235
class Client(object):
  """High-level client implementation.

  This uses a backing Transport-like class on top of which it
  implements data serialization/deserialization.

  """
  def __init__(self, address=None, timeouts=None, transport=Transport):
    """Constructor for the Client class.

    Arguments:
      - address: a valid address for the used transport class
      - timeouts: a list of timeouts, to be used on connect and read/write
      - transport: a Transport-like class


    If timeouts is not passed, the default timeouts of the transport
    class are used.

    """
    if address is None:
      address = constants.MASTER_SOCKET
    self.address = address
    self.timeouts = timeouts
    self.transport_class = transport
    self.transport = None
    self._InitTransport()

  def _InitTransport(self):
    """(Re)initialize the transport if needed.

    """
    if self.transport is None:
      self.transport = self.transport_class(self.address,
                                            timeouts=self.timeouts)

  def _CloseTransport(self):
    """Close the transport, ignoring errors.

    """
    if self.transport is None:
      return
    try:
      old_transp = self.transport
      self.transport = None
      old_transp.Close()
    except Exception:
      # Best-effort close: a new transport will be created on the
      # next request anyway
      pass

  def CallMethod(self, method, args):
    """Send a generic request and return the response.

    Arguments:
      - method: one of the REQ_* method names
      - args: the JSON-serializable arguments for the method

    Raises ProtocolError/DecodingError on a malformed response, a
    ganeti error class or RequestError if the server reported failure.

    """
    # Build request
    request = {
      KEY_METHOD: method,
      KEY_ARGS: args,
      }

    # Serialize the request
    send_data = serializer.DumpJson(request, indent=False)

    # Send request and wait for response; on any failure drop the
    # transport so the next call starts from a clean connection
    try:
      self._InitTransport()
      result = self.transport.Call(send_data)
    except Exception:
      self._CloseTransport()
      raise

    # Parse the result
    try:
      data = serializer.LoadJson(result)
    except Exception as err:
      raise ProtocolError("Error while deserializing response: %s" % str(err))

    # Validate response
    if (not isinstance(data, dict) or
        KEY_SUCCESS not in data or
        KEY_RESULT not in data):
      raise DecodingError("Invalid response from server: %s" % str(data))

    result = data[KEY_RESULT]

    if not data[KEY_SUCCESS]:
      # TODO: decide on a standard exception
      if (isinstance(result, (tuple, list)) and len(result) == 2 and
          isinstance(result[1], (tuple, list))):
        # custom ganeti errors
        err_class = errors.GetErrorClass(result[0])
        if err_class is not None:
          raise err_class(*tuple(result[1]))

      raise RequestError(result)

    return result

  def SetQueueDrainFlag(self, drain_flag):
    """Set or clear the drain flag on the job queue."""
    return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)

  def SubmitJob(self, ops):
    """Submit a job made up of the given opcodes' state."""
    ops_state = [op.__getstate__() for op in ops]
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)

  def SubmitManyJobs(self, jobs):
    """Submit multiple jobs, each a list of opcodes, in one request."""
    jobs_state = [[op.__getstate__() for op in ops] for ops in jobs]
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)

  def CancelJob(self, job_id):
    """Request cancellation of the given job."""
    return self.CallMethod(REQ_CANCEL_JOB, job_id)

  def ArchiveJob(self, job_id):
    """Request archival of the given job."""
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)

  def AutoArchiveJobs(self, age):
    """Request archival of all jobs older than the given age."""
    # Integer division keeps the timeout an int under Python 3 as
    # well; kept below the transport read timeout so the server can
    # answer before the socket read times out
    timeout = (DEF_RWTO - 1) // 2
    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))

  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
    """Wait until the given job changes, polling in a loop.

    The server-side wait uses a timeout below the transport read
    timeout, so each call returns (possibly with JOB_NOTCHANGED)
    before the socket read times out; we keep re-issuing the request
    until a real change is reported.

    """
    timeout = (DEF_RWTO - 1) // 2
    while True:
      result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
                               (job_id, fields, prev_job_info,
                                prev_log_serial, timeout))
      if result != constants.JOB_NOTCHANGED:
        break
    return result

  def QueryJobs(self, job_ids, fields):
    """Query the given fields for the given job IDs."""
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))

  def QueryInstances(self, names, fields, use_locking):
    """Query the given fields for the named instances."""
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))

  def QueryNodes(self, names, fields, use_locking):
    """Query the given fields for the named nodes."""
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))

  def QueryExports(self, nodes, use_locking):
    """Query the exports on the given nodes."""
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))

  def QueryClusterInfo(self):
    """Query the cluster information."""
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())

  def QueryConfigValues(self, fields):
    """Query the given cluster configuration values."""
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
382

    
383

    
384
# TODO: class Server(object)