Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ 0bbe448c

History | View | Annotate | Download (8.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the unix socket protocol
23

24
This module implements the local unix socket protocl. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also be used by the master daemon.
29

30
"""
31

    
32
import socket
33
import collections
34
import simplejson
35
import time
36
import errno
37

    
38
from ganeti import opcodes
39
from ganeti import serializer
40
from ganeti import constants
41

    
42

    
43
KEY_METHOD = 'method'
44
KEY_ARGS = 'args'
45
KEY_SUCCESS = "success"
46
KEY_RESULT = "result"
47

    
48
REQ_SUBMIT_JOB = "SubmitJob"
49
REQ_CANCEL_JOB = "CancelJob"
50
REQ_ARCHIVE_JOB = "ArchiveJob"
51
REQ_QUERY_JOBS = "QueryJobs"
52

    
53
DEF_CTMO = 10
54
DEF_RWTO = 60
55

    
56

    
57
class ProtocolError(Exception):
58
  """Denotes an error in the server communication"""
59

    
60

    
61
class ConnectionClosedError(ProtocolError):
62
  """Connection closed error"""
63

    
64

    
65
class TimeoutError(ProtocolError):
66
  """Operation timeout error"""
67

    
68

    
69
class EncodingError(ProtocolError):
70
  """Encoding failure on the sending side"""
71

    
72

    
73
class DecodingError(ProtocolError):
74
  """Decoding failure on the receiving side"""
75

    
76

    
77
class RequestError(ProtocolError):
78
  """Error on request
79

80
  This signifies an error in the request format or request handling,
81
  but not (e.g.) an error in starting up an instance.
82

83
  Some common conditions that can trigger this exception:
84
    - job submission failed because the job data was wrong
85
    - query failed because required fields were missing
86

87
  """
88

    
89

    
90
class NoMasterError(ProtocolError):
91
  """The master cannot be reached
92

93
  This means that the master daemon is not running or the socket has
94
  been removed.
95

96
  """
97

    
98

    
99
def SerializeJob(job):
100
  """Convert a job description to a string format.
101

102
  """
103
  return simplejson.dumps(job.__getstate__())
104

    
105

    
106
def UnserializeJob(data):
107
  """Load a job from a string format"""
108
  try:
109
    new_data = simplejson.loads(data)
110
  except Exception, err:
111
    raise DecodingError("Error while unserializing: %s" % str(err))
112
  job = opcodes.Job()
113
  job.__setstate__(new_data)
114
  return job
115

    
116

    
117
class Transport:
118
  """Low-level transport class.
119

120
  This is used on the client side.
121

122
  This could be replace by any other class that provides the same
123
  semantics to the Client. This means:
124
    - can send messages and receive messages
125
    - safe for multithreading
126

127
  """
128

    
129
  def __init__(self, address, timeouts=None, eom=None):
130
    """Constructor for the Client class.
131

132
    Arguments:
133
      - address: a valid address the the used transport class
134
      - timeout: a list of timeouts, to be used on connect and read/write
135
      - eom: an identifier to be used as end-of-message which the
136
        upper-layer will guarantee that this identifier will not appear
137
        in any message
138

139
    There are two timeouts used since we might want to wait for a long
140
    time for a response, but the connect timeout should be lower.
141

142
    If not passed, we use a default of 10 and respectively 60 seconds.
143

144
    Note that on reading data, since the timeout applies to an
145
    invidual receive, it might be that the total duration is longer
146
    than timeout value passed (we make a hard limit at twice the read
147
    timeout).
148

149
    """
150
    self.address = address
151
    if timeouts is None:
152
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
153
    else:
154
      self._ctimeout, self._rwtimeout = timeouts
155

    
156
    self.socket = None
157
    self._buffer = ""
158
    self._msgs = collections.deque()
159

    
160
    if eom is None:
161
      self.eom = '\3'
162
    else:
163
      self.eom = eom
164

    
165
    try:
166
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
167
      self.socket.settimeout(self._ctimeout)
168
      try:
169
        self.socket.connect(address)
170
      except socket.timeout, err:
171
        raise TimeoutError("Connect timed out: %s" % str(err))
172
      except socket.error, err:
173
        if err.args[0] == errno.ENOENT:
174
          raise NoMasterError((address,))
175
        raise
176
      self.socket.settimeout(self._rwtimeout)
177
    except (socket.error, NoMasterError):
178
      if self.socket is not None:
179
        self.socket.close()
180
      self.socket = None
181
      raise
182

    
183
  def _CheckSocket(self):
184
    """Make sure we are connected.
185

186
    """
187
    if self.socket is None:
188
      raise ProtocolError("Connection is closed")
189

    
190
  def Send(self, msg):
191
    """Send a message.
192

193
    This just sends a message and doesn't wait for the response.
194

195
    """
196
    if self.eom in msg:
197
      raise EncodingError("Message terminator found in payload")
198
    self._CheckSocket()
199
    try:
200
      self.socket.sendall(msg + self.eom)
201
    except socket.timeout, err:
202
      raise TimeoutError("Sending timeout: %s" % str(err))
203

    
204
  def Recv(self):
205
    """Try to receive a messae from the socket.
206

207
    In case we already have messages queued, we just return from the
208
    queue. Otherwise, we try to read data with a _rwtimeout network
209
    timeout, and making sure we don't go over 2x_rwtimeout as a global
210
    limit.
211

212
    """
213
    self._CheckSocket()
214
    etime = time.time() + self._rwtimeout
215
    while not self._msgs:
216
      if time.time() > etime:
217
        raise TimeoutError("Extended receive timeout")
218
      try:
219
        data = self.socket.recv(4096)
220
      except socket.timeout, err:
221
        raise TimeoutError("Receive timeout: %s" % str(err))
222
      if not data:
223
        raise ConnectionClosedError("Connection closed while reading")
224
      new_msgs = (self._buffer + data).split(self.eom)
225
      self._buffer = new_msgs.pop()
226
      self._msgs.extend(new_msgs)
227
    return self._msgs.popleft()
228

    
229
  def Call(self, msg):
230
    """Send a message and wait for the response.
231

232
    This is just a wrapper over Send and Recv.
233

234
    """
235
    self.Send(msg)
236
    return self.Recv()
237

    
238
  def Close(self):
239
    """Close the socket"""
240
    if self.socket is not None:
241
      self.socket.close()
242
      self.socket = None
243

    
244

    
245
class Client(object):
246
  """High-level client implementation.
247

248
  This uses a backing Transport-like class on top of which it
249
  implements data serialization/deserialization.
250

251
  """
252
  def __init__(self, address=None, timeouts=None, transport=Transport):
253
    """Constructor for the Client class.
254

255
    Arguments:
256
      - address: a valid address the the used transport class
257
      - timeout: a list of timeouts, to be used on connect and read/write
258
      - transport: a Transport-like class
259

260

261
    If timeout is not passed, the default timeouts of the transport
262
    class are used.
263

264
    """
265
    if address is None:
266
      address = constants.MASTER_SOCKET
267
    self.transport = transport(address, timeouts=timeouts)
268

    
269
  def CallMethod(self, method, args):
270
    """Send a generic request and return the response.
271

272
    """
273
    # Build request
274
    request = {
275
      KEY_METHOD: method,
276
      KEY_ARGS: args,
277
      }
278

    
279
    # Send request and wait for response
280
    result = self.transport.Call(serializer.DumpJson(request, indent=False))
281
    try:
282
      data = serializer.LoadJson(result)
283
    except Exception, err:
284
      raise ProtocolError("Error while deserializing response: %s" % str(err))
285

    
286
    # Validate response
287
    if (not isinstance(data, dict) or
288
        KEY_SUCCESS not in data or
289
        KEY_RESULT not in data):
290
      raise DecodingError("Invalid response from server: %s" % str(data))
291

    
292
    if not data[KEY_SUCCESS]:
293
      # TODO: decide on a standard exception
294
      raise RequestError(data[KEY_RESULT])
295

    
296
    return data[KEY_RESULT]
297

    
298
  def SubmitJob(self, ops):
299
    ops_state = map(lambda op: op.__getstate__(), ops)
300
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
301

    
302
  def CancelJob(self, job_id):
303
    return self.CallMethod(REQ_CANCEL_JOB, job_id)
304

    
305
  def ArchiveJob(self, job_id):
306
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
307

    
308
  def QueryJobs(self, job_ids, fields):
309
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
310

    
311
# TODO: class Server(object)