Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ 04864530

History | View | Annotate | Download (8.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the unix socket protocol
23

24
This module implements the local unix socket protocl. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also be used by the master daemon.
29

30
"""
31

    
32
import socket
33
import collections
34
import simplejson
35
import time
36
import errno
37

    
38
from ganeti import opcodes
39
from ganeti import constants
40

    
41

    
42
KEY_REQUEST = 'request'
43
KEY_DATA = 'data'
44
REQ_SUBMIT = 'submit'
45
REQ_ABORT = 'abort'
46
REQ_QUERY = 'query'
47

    
48
DEF_CTMO = 10
49
DEF_RWTO = 60
50

    
51

    
52
class ProtocolError(Exception):
53
  """Denotes an error in the server communication"""
54

    
55

    
56
class ConnectionClosedError(ProtocolError):
57
  """Connection closed error"""
58

    
59

    
60
class TimeoutError(ProtocolError):
61
  """Operation timeout error"""
62

    
63

    
64
class EncodingError(ProtocolError):
65
  """Encoding failure on the sending side"""
66

    
67

    
68
class DecodingError(ProtocolError):
69
  """Decoding failure on the receiving side"""
70

    
71

    
72
class RequestError(ProtocolError):
73
  """Error on request
74

75
  This signifies an error in the request format or request handling,
76
  but not (e.g.) an error in starting up an instance.
77

78
  Some common conditions that can trigger this exception:
79
    - job submission failed because the job data was wrong
80
    - query failed because required fields were missing
81

82
  """
83

    
84
class NoMasterError(ProtocolError):
85
  """The master cannot be reached
86

87
  This means that the master daemon is not running or the socket has
88
  been removed.
89

90
  """
91

    
92

    
93
def SerializeJob(job):
94
  """Convert a job description to a string format.
95

96
  """
97
  return simplejson.dumps(job.__getstate__())
98

    
99

    
100
def UnserializeJob(data):
101
  """Load a job from a string format"""
102
  try:
103
    new_data = simplejson.loads(data)
104
  except Exception, err:
105
    raise DecodingError("Error while unserializing: %s" % str(err))
106
  job = opcodes.Job()
107
  job.__setstate__(new_data)
108
  return job
109

    
110

    
111
class Transport:
112
  """Low-level transport class.
113

114
  This is used on the client side.
115

116
  This could be replace by any other class that provides the same
117
  semantics to the Client. This means:
118
    - can send messages and receive messages
119
    - safe for multithreading
120

121
  """
122

    
123
  def __init__(self, address, timeouts=None, eom=None):
124
    """Constructor for the Client class.
125

126
    Arguments:
127
      - address: a valid address the the used transport class
128
      - timeout: a list of timeouts, to be used on connect and read/write
129
      - eom: an identifier to be used as end-of-message which the
130
        upper-layer will guarantee that this identifier will not appear
131
        in any message
132

133
    There are two timeouts used since we might want to wait for a long
134
    time for a response, but the connect timeout should be lower.
135

136
    If not passed, we use a default of 10 and respectively 60 seconds.
137

138
    Note that on reading data, since the timeout applies to an
139
    invidual receive, it might be that the total duration is longer
140
    than timeout value passed (we make a hard limit at twice the read
141
    timeout).
142

143
    """
144
    self.address = address
145
    if timeouts is None:
146
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
147
    else:
148
      self._ctimeout, self._rwtimeout = timeouts
149

    
150
    self.socket = None
151
    self._buffer = ""
152
    self._msgs = collections.deque()
153

    
154
    if eom is None:
155
      self.eom = '\3'
156
    else:
157
      self.eom = eom
158

    
159
    try:
160
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
161
      self.socket.settimeout(self._ctimeout)
162
      try:
163
        self.socket.connect(address)
164
      except socket.timeout, err:
165
        raise TimeoutError("Connect timed out: %s" % str(err))
166
      except socket.error, err:
167
        if err.args[0] == errno.ENOENT:
168
          raise NoMasterError((address,))
169
        raise
170
      self.socket.settimeout(self._rwtimeout)
171
    except (socket.error, NoMasterError):
172
      if self.socket is not None:
173
        self.socket.close()
174
      self.socket = None
175
      raise
176

    
177
  def _CheckSocket(self):
178
    """Make sure we are connected.
179

180
    """
181
    if self.socket is None:
182
      raise ProtocolError("Connection is closed")
183

    
184
  def Send(self, msg):
185
    """Send a message.
186

187
    This just sends a message and doesn't wait for the response.
188

189
    """
190
    if self.eom in msg:
191
      raise EncodingError("Message terminator found in payload")
192
    self._CheckSocket()
193
    try:
194
      self.socket.sendall(msg + self.eom)
195
    except socket.timeout, err:
196
      raise TimeoutError("Sending timeout: %s" % str(err))
197

    
198
  def Recv(self):
199
    """Try to receive a messae from the socket.
200

201
    In case we already have messages queued, we just return from the
202
    queue. Otherwise, we try to read data with a _rwtimeout network
203
    timeout, and making sure we don't go over 2x_rwtimeout as a global
204
    limit.
205

206
    """
207
    self._CheckSocket()
208
    etime = time.time() + self._rwtimeout
209
    while not self._msgs:
210
      if time.time() > etime:
211
        raise TimeoutError("Extended receive timeout")
212
      try:
213
        data = self.socket.recv(4096)
214
      except socket.timeout, err:
215
        raise TimeoutError("Receive timeout: %s" % str(err))
216
      if not data:
217
        raise ConnectionClosedError("Connection closed while reading")
218
      new_msgs = (self._buffer + data).split(self.eom)
219
      self._buffer = new_msgs.pop()
220
      self._msgs.extend(new_msgs)
221
    return self._msgs.popleft()
222

    
223
  def Call(self, msg):
224
    """Send a message and wait for the response.
225

226
    This is just a wrapper over Send and Recv.
227

228
    """
229
    self.Send(msg)
230
    return self.Recv()
231

    
232
  def Close(self):
233
    """Close the socket"""
234
    if self.socket is not None:
235
      self.socket.close()
236
      self.socket = None
237

    
238

    
239
class Client(object):
240
  """High-level client implementation.
241

242
  This uses a backing Transport-like class on top of which it
243
  implements data serialization/deserialization.
244

245
  """
246
  def __init__(self, address=None, timeouts=None, transport=Transport):
247
    """Constructor for the Client class.
248

249
    Arguments:
250
      - address: a valid address the the used transport class
251
      - timeout: a list of timeouts, to be used on connect and read/write
252
      - transport: a Transport-like class
253

254

255
    If timeout is not passed, the default timeouts of the transport
256
    class are used.
257

258
    """
259
    if address is None:
260
      address = constants.MASTER_SOCKET
261
    self.transport = transport(address, timeouts=timeouts)
262

    
263
  def SendRequest(self, request, data):
264
    """Send a generic request and return the response.
265

266
    """
267
    msg = {KEY_REQUEST: request, KEY_DATA: data}
268
    result = self.transport.Call(simplejson.dumps(msg))
269
    try:
270
      data = simplejson.loads(result)
271
    except Exception, err:
272
      raise ProtocolError("Error while deserializing response: %s" % str(err))
273
    if (not isinstance(data, dict) or
274
        'success' not in data or
275
        'result' not in data):
276
      raise DecodingError("Invalid response from server: %s" % str(data))
277
    return data
278

    
279
  def SubmitJob(self, job):
280
    """Submit a job"""
281
    result = self.SendRequest(REQ_SUBMIT, SerializeJob(job))
282
    if not result['success']:
283
      raise RequestError(result['result'])
284
    return result['result']
285

    
286
  def Query(self, data):
287
    """Make a query"""
288
    result = self.SendRequest(REQ_QUERY, data)
289
    if not result['success']:
290
      raise RequestError(result[result])
291
    result = result['result']
292
    if data["object"] == "jobs":
293
      # custom job processing of query values
294
      for row in result:
295
        for idx, field in enumerate(data["fields"]):
296
          if field == "op_list":
297
            row[idx] = [opcodes.OpCode.LoadOpCode(i) for i in row[idx]]
298
    return result