Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ 23828f1c

History | View | Annotate | Download (9.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the unix socket protocol
23

24
This module implements the local unix socket protocol. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also used by the master daemon.
29

30
"""
31

    
32
import socket
33
import collections
34
import time
35
import errno
36

    
37
from ganeti import serializer
38
from ganeti import constants
39
from ganeti import errors
40

    
41

    
42
# Keys of the request dictionary sent to the server
KEY_METHOD = "method"
KEY_ARGS = "args"

# Keys every server response dictionary is required to contain
KEY_SUCCESS = "success"
KEY_RESULT = "result"

# Names of the request methods, used as the KEY_METHOD value
REQ_SUBMIT_JOB = "SubmitJob"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"

# Default timeouts, in seconds: connect and read/write respectively
DEF_CTMO = 10
DEF_RWTO = 60
61

    
62

    
63
class ProtocolError(Exception):
  """Denotes an error in the server communication.

  This is the base class of the other exceptions defined in this
  module, so catching it covers all of them.

  """
65

    
66

    
67
class ConnectionClosedError(ProtocolError):
  """Connection closed error.

  Raised when the peer closes the connection while we are reading.

  """
69

    
70

    
71
class TimeoutError(ProtocolError):
  """Operation timeout error.

  Raised when connecting, sending or receiving exceeds its timeout.

  """
73

    
74

    
75
class EncodingError(ProtocolError):
  """Encoding failure on the sending side.

  Raised when an outgoing message contains the end-of-message marker.

  """
77

    
78

    
79
class DecodingError(ProtocolError):
  """Decoding failure on the receiving side.

  Raised when a response does not have the expected structure.

  """
81

    
82

    
83
class RequestError(ProtocolError):
  """Error on request.

  This signifies an error in the request format or request handling,
  but not (e.g.) an error in starting up an instance.

  Some common conditions that can trigger this exception:
    - job submission failed because the job data was wrong
    - query failed because required fields were missing

  """
94

    
95

    
96
class NoMasterError(ProtocolError):
  """The master cannot be reached.

  This means that the master daemon is not running or the socket has
  been removed.

  """
103

    
104

    
105
class Transport(object):
  """Low-level transport class.

  This is used on the client side.

  This could be replaced by any other class that provides the same
  semantics to the Client. This means:
    - can send messages and receive messages
    - safe for multithreading

  """

  def __init__(self, address, timeouts=None, eom=None):
    """Constructor for the Transport class.

    Arguments:
      - address: a valid address for the used transport class (here, the
        path of the unix socket to connect to)
      - timeouts: a pair of timeouts (in seconds), to be used on connect
        and on read/write respectively
      - eom: an identifier to be used as end-of-message which the
        upper-layer will guarantee that this identifier will not appear
        in any message

    There are two timeouts used since we might want to wait for a long
    time for a response, but the connect timeout should be lower.

    If not passed, we use a default of 10 and respectively 60 seconds.

    Note that on reading data, since the timeout applies to an
    individual receive, it might be that the total duration is longer
    than timeout value passed (we make a hard limit at twice the read
    timeout).

    Raises:
      - TimeoutError: the connect attempt timed out
      - NoMasterError: the socket does not exist or refused the connection
      - socket.error: any other connection failure

    """
    self.address = address
    if timeouts is None:
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
    else:
      self._ctimeout, self._rwtimeout = timeouts

    self.socket = None
    # Partial (not yet terminated) message data and fully received messages
    self._buffer = ""
    self._msgs = collections.deque()

    if eom is None:
      self.eom = '\3'
    else:
      self.eom = eom

    try:
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
      self.socket.settimeout(self._ctimeout)
      try:
        self.socket.connect(address)
      except socket.timeout as err:
        raise TimeoutError("Connect timed out: %s" % str(err))
      except socket.error as err:
        if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
          raise NoMasterError((address,))
        raise
      self.socket.settimeout(self._rwtimeout)
    except (socket.error, NoMasterError, TimeoutError):
      # TimeoutError must be listed too: it is raised above for a connect
      # timeout and would otherwise leave the socket open
      if self.socket is not None:
        self.socket.close()
      self.socket = None
      raise

  def _CheckSocket(self):
    """Make sure we are connected.

    Raises ProtocolError if the socket has been closed.

    """
    if self.socket is None:
      raise ProtocolError("Connection is closed")

  def Send(self, msg):
    """Send a message.

    This just sends a message and doesn't wait for the response.

    Raises EncodingError if the message contains the end-of-message
    marker, ProtocolError if the connection is closed and TimeoutError
    if the send times out.

    """
    if self.eom in msg:
      raise EncodingError("Message terminator found in payload")
    self._CheckSocket()
    try:
      self.socket.sendall(msg + self.eom)
    except socket.timeout as err:
      raise TimeoutError("Sending timeout: %s" % str(err))

  def Recv(self):
    """Try to receive a message from the socket.

    In case we already have messages queued, we just return from the
    queue. Otherwise, we try to read data with a _rwtimeout network
    timeout, and making sure we don't go over 2x_rwtimeout as a global
    limit.

    Raises ProtocolError if the connection is closed, TimeoutError on
    timeout and ConnectionClosedError if the peer closes the connection.

    """
    self._CheckSocket()
    # The deadline is only checked before each receive, and a single
    # receive can itself block for up to _rwtimeout; hence the 2x limit
    etime = time.time() + self._rwtimeout
    while not self._msgs:
      if time.time() > etime:
        raise TimeoutError("Extended receive timeout")
      try:
        data = self.socket.recv(4096)
      except socket.timeout as err:
        raise TimeoutError("Receive timeout: %s" % str(err))
      if not data:
        raise ConnectionClosedError("Connection closed while reading")
      new_msgs = (self._buffer + data).split(self.eom)
      # The last element is the (possibly empty) unterminated remainder
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def Call(self, msg):
    """Send a message and wait for the response.

    This is just a wrapper over Send and Recv.

    """
    self.Send(msg)
    return self.Recv()

  def Close(self):
    """Close the socket; does nothing if it is already closed."""
    if self.socket is not None:
      self.socket.close()
      self.socket = None
231

    
232

    
233
class Client(object):
  """High-level client implementation.

  This uses a backing Transport-like class on top of which it
  implements data serialization/deserialization.

  """
  def __init__(self, address=None, timeouts=None, transport=Transport):
    """Constructor for the Client class.

    Arguments:
      - address: a valid address for the used transport class; if None,
        constants.MASTER_SOCKET is used
      - timeouts: a list of timeouts, to be used on connect and read/write
      - transport: a Transport-like class

    If timeouts is not passed, the default timeouts of the transport
    class are used.

    """
    if address is None:
      address = constants.MASTER_SOCKET
    self.transport = transport(address, timeouts=timeouts)

  def CallMethod(self, method, args):
    """Send a generic request and return the response.

    Arguments:
      - method: the name of the request (one of the REQ_* values)
      - args: the arguments of the request

    Returns the value stored under KEY_RESULT in the response.

    Raises:
      - ProtocolError: the response could not be deserialized
      - DecodingError: the response is not a dictionary holding both
        KEY_SUCCESS and KEY_RESULT
      - RequestError: the server reported a failure that could not be
        mapped to an error class by errors.GetErrorClass
      - the class returned by errors.GetErrorClass, when the failure
        result is a (name, arguments) pair it knows about

    """
    # Build request
    request = {
      KEY_METHOD: method,
      KEY_ARGS: args,
      }

    # Send request and wait for response
    result = self.transport.Call(serializer.DumpJson(request, indent=False))
    try:
      data = serializer.LoadJson(result)
    except Exception as err:
      raise ProtocolError("Error while deserializing response: %s" % str(err))

    # Validate response
    if (not isinstance(data, dict) or
        KEY_SUCCESS not in data or
        KEY_RESULT not in data):
      raise DecodingError("Invalid response from server: %s" % str(data))

    result = data[KEY_RESULT]

    if not data[KEY_SUCCESS]:
      # TODO: decide on a standard exception
      if (isinstance(result, (tuple, list)) and len(result) == 2 and
          isinstance(result[1], (tuple, list))):
        # custom ganeti errors: result is (error name, constructor args)
        err_class = errors.GetErrorClass(result[0])
        if err_class is not None:
          # Same as the old "raise cls, args_tuple" statement form
          raise err_class(*result[1])

      raise RequestError(result)

    return result

  def SetQueueDrainFlag(self, drain_flag):
    """Send a REQ_QUEUE_SET_DRAIN_FLAG request with the given flag."""
    return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)

  def SubmitJob(self, ops):
    """Send a REQ_SUBMIT_JOB request with the state of each opcode.

    Each element of ops must provide a __getstate__ method.

    """
    # Build a real list so that the states can be serialized
    ops_state = [op.__getstate__() for op in ops]
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)

  def CancelJob(self, job_id):
    """Send a REQ_CANCEL_JOB request for the given job."""
    return self.CallMethod(REQ_CANCEL_JOB, job_id)

  def ArchiveJob(self, job_id):
    """Send a REQ_ARCHIVE_JOB request for the given job."""
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)

  def AutoArchiveJobs(self, age):
    """Send a REQ_AUTOARCHIVE_JOBS request with the given age."""
    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, age)

  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
    """Repeat REQ_WAIT_FOR_JOB_CHANGE until the job has changed.

    The request is re-sent for as long as the server answers with
    constants.JOB_NOTCHANGED; the first other result is returned.

    """
    # Explicit floor division keeps this an integer everywhere.
    # NOTE(review): this is derived from DEF_RWTO, not from the timeouts
    # given to the constructor; presumably it must stay below the
    # transport's read timeout - confirm for custom timeouts.
    timeout = (DEF_RWTO - 1) // 2
    while True:
      result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
                               (job_id, fields, prev_job_info,
                                prev_log_serial, timeout))
      if result != constants.JOB_NOTCHANGED:
        break
    return result

  def QueryJobs(self, job_ids, fields):
    """Send a REQ_QUERY_JOBS request for the given jobs and fields."""
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))

  def QueryInstances(self, names, fields):
    """Send a REQ_QUERY_INSTANCES request for the given names and fields."""
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields))

  def QueryNodes(self, names, fields):
    """Send a REQ_QUERY_NODES request for the given names and fields."""
    return self.CallMethod(REQ_QUERY_NODES, (names, fields))

  def QueryExports(self, nodes):
    """Send a REQ_QUERY_EXPORTS request for the given nodes."""
    return self.CallMethod(REQ_QUERY_EXPORTS, nodes)

  def QueryConfigValues(self, fields):
    """Send a REQ_QUERY_CONFIG_VALUES request for the given fields."""
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
335

    
336

    
337
# TODO: class Server(object)