Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ 3d8548c4

History | View | Annotate | Download (8.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the unix socket protocol
23

24
This module implements the local unix socket protocl. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also be used by the master daemon.
29

30
"""
31

    
32
import socket
33
import collections
34
import simplejson
35
import time
36
import errno
37

    
38
from ganeti import opcodes
39
from ganeti import serializer
40
from ganeti import constants
41

    
42

    
43
KEY_METHOD = 'method'
44
KEY_ARGS = 'args'
45
KEY_SUCCESS = "success"
46
KEY_RESULT = "result"
47

    
48
REQ_SUBMIT = 'submit'
49
REQ_ABORT = 'abort'
50
REQ_QUERY = 'query'
51

    
52
DEF_CTMO = 10
53
DEF_RWTO = 60
54

    
55

    
56
class ProtocolError(Exception):
57
  """Denotes an error in the server communication"""
58

    
59

    
60
class ConnectionClosedError(ProtocolError):
61
  """Connection closed error"""
62

    
63

    
64
class TimeoutError(ProtocolError):
65
  """Operation timeout error"""
66

    
67

    
68
class EncodingError(ProtocolError):
69
  """Encoding failure on the sending side"""
70

    
71

    
72
class DecodingError(ProtocolError):
73
  """Decoding failure on the receiving side"""
74

    
75

    
76
class RequestError(ProtocolError):
77
  """Error on request
78

79
  This signifies an error in the request format or request handling,
80
  but not (e.g.) an error in starting up an instance.
81

82
  Some common conditions that can trigger this exception:
83
    - job submission failed because the job data was wrong
84
    - query failed because required fields were missing
85

86
  """
87

    
88

    
89
class NoMasterError(ProtocolError):
90
  """The master cannot be reached
91

92
  This means that the master daemon is not running or the socket has
93
  been removed.
94

95
  """
96

    
97

    
98
def SerializeJob(job):
99
  """Convert a job description to a string format.
100

101
  """
102
  return simplejson.dumps(job.__getstate__())
103

    
104

    
105
def UnserializeJob(data):
106
  """Load a job from a string format"""
107
  try:
108
    new_data = simplejson.loads(data)
109
  except Exception, err:
110
    raise DecodingError("Error while unserializing: %s" % str(err))
111
  job = opcodes.Job()
112
  job.__setstate__(new_data)
113
  return job
114

    
115

    
116
class Transport:
117
  """Low-level transport class.
118

119
  This is used on the client side.
120

121
  This could be replace by any other class that provides the same
122
  semantics to the Client. This means:
123
    - can send messages and receive messages
124
    - safe for multithreading
125

126
  """
127

    
128
  def __init__(self, address, timeouts=None, eom=None):
129
    """Constructor for the Client class.
130

131
    Arguments:
132
      - address: a valid address the the used transport class
133
      - timeout: a list of timeouts, to be used on connect and read/write
134
      - eom: an identifier to be used as end-of-message which the
135
        upper-layer will guarantee that this identifier will not appear
136
        in any message
137

138
    There are two timeouts used since we might want to wait for a long
139
    time for a response, but the connect timeout should be lower.
140

141
    If not passed, we use a default of 10 and respectively 60 seconds.
142

143
    Note that on reading data, since the timeout applies to an
144
    invidual receive, it might be that the total duration is longer
145
    than timeout value passed (we make a hard limit at twice the read
146
    timeout).
147

148
    """
149
    self.address = address
150
    if timeouts is None:
151
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
152
    else:
153
      self._ctimeout, self._rwtimeout = timeouts
154

    
155
    self.socket = None
156
    self._buffer = ""
157
    self._msgs = collections.deque()
158

    
159
    if eom is None:
160
      self.eom = '\3'
161
    else:
162
      self.eom = eom
163

    
164
    try:
165
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
166
      self.socket.settimeout(self._ctimeout)
167
      try:
168
        self.socket.connect(address)
169
      except socket.timeout, err:
170
        raise TimeoutError("Connect timed out: %s" % str(err))
171
      except socket.error, err:
172
        if err.args[0] == errno.ENOENT:
173
          raise NoMasterError((address,))
174
        raise
175
      self.socket.settimeout(self._rwtimeout)
176
    except (socket.error, NoMasterError):
177
      if self.socket is not None:
178
        self.socket.close()
179
      self.socket = None
180
      raise
181

    
182
  def _CheckSocket(self):
183
    """Make sure we are connected.
184

185
    """
186
    if self.socket is None:
187
      raise ProtocolError("Connection is closed")
188

    
189
  def Send(self, msg):
190
    """Send a message.
191

192
    This just sends a message and doesn't wait for the response.
193

194
    """
195
    if self.eom in msg:
196
      raise EncodingError("Message terminator found in payload")
197
    self._CheckSocket()
198
    try:
199
      self.socket.sendall(msg + self.eom)
200
    except socket.timeout, err:
201
      raise TimeoutError("Sending timeout: %s" % str(err))
202

    
203
  def Recv(self):
204
    """Try to receive a messae from the socket.
205

206
    In case we already have messages queued, we just return from the
207
    queue. Otherwise, we try to read data with a _rwtimeout network
208
    timeout, and making sure we don't go over 2x_rwtimeout as a global
209
    limit.
210

211
    """
212
    self._CheckSocket()
213
    etime = time.time() + self._rwtimeout
214
    while not self._msgs:
215
      if time.time() > etime:
216
        raise TimeoutError("Extended receive timeout")
217
      try:
218
        data = self.socket.recv(4096)
219
      except socket.timeout, err:
220
        raise TimeoutError("Receive timeout: %s" % str(err))
221
      if not data:
222
        raise ConnectionClosedError("Connection closed while reading")
223
      new_msgs = (self._buffer + data).split(self.eom)
224
      self._buffer = new_msgs.pop()
225
      self._msgs.extend(new_msgs)
226
    return self._msgs.popleft()
227

    
228
  def Call(self, msg):
229
    """Send a message and wait for the response.
230

231
    This is just a wrapper over Send and Recv.
232

233
    """
234
    self.Send(msg)
235
    return self.Recv()
236

    
237
  def Close(self):
238
    """Close the socket"""
239
    if self.socket is not None:
240
      self.socket.close()
241
      self.socket = None
242

    
243

    
244
class Client(object):
245
  """High-level client implementation.
246

247
  This uses a backing Transport-like class on top of which it
248
  implements data serialization/deserialization.
249

250
  """
251
  def __init__(self, address=None, timeouts=None, transport=Transport):
252
    """Constructor for the Client class.
253

254
    Arguments:
255
      - address: a valid address the the used transport class
256
      - timeout: a list of timeouts, to be used on connect and read/write
257
      - transport: a Transport-like class
258

259

260
    If timeout is not passed, the default timeouts of the transport
261
    class are used.
262

263
    """
264
    if address is None:
265
      address = constants.MASTER_SOCKET
266
    self.transport = transport(address, timeouts=timeouts)
267

    
268
  def CallMethod(self, method, args):
269
    """Send a generic request and return the response.
270

271
    """
272
    # Build request
273
    request = {
274
      KEY_METHOD: method,
275
      KEY_ARGS: args,
276
      }
277

    
278
    # Send request and wait for response
279
    result = self.transport.Call(serializer.DumpJson(request, indent=False))
280
    try:
281
      data = serializer.LoadJson(result)
282
    except Exception, err:
283
      raise ProtocolError("Error while deserializing response: %s" % str(err))
284

    
285
    # Validate response
286
    if (not isinstance(data, dict) or
287
        KEY_SUCCESS not in data or
288
        KEY_RESULT not in data):
289
      raise DecodingError("Invalid response from server: %s" % str(data))
290

    
291
    if not data[KEY_SUCCESS]:
292
      # TODO: decide on a standard exception
293
      raise RequestError(data[KEY_RESULT])
294

    
295
    return data[KEY_RESULT]
296

    
297
  def SubmitJob(self, job):
298
    """Submit a job"""
299
    return self.CallMethod(REQ_SUBMIT, SerializeJob(job))
300

    
301
  def Query(self, data):
302
    """Make a query"""
303
    result = self.CallMethod(REQ_QUERY, data)
304
    if data["object"] == "jobs":
305
      # custom job processing of query values
306
      for row in result:
307
        for idx, field in enumerate(data["fields"]):
308
          if field == "op_list":
309
            row[idx] = [opcodes.OpCode.LoadOpCode(i) for i in row[idx]]
310
    return result
311

    
312
# TODO: class Server(object)