Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ fad50141

History | View | Annotate | Download (8.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the unix socket protocol
23

24
This module implements the local unix socket protocl. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also be used by the master daemon.
29

30
"""
31

    
32
import socket
33
import collections
34
import simplejson
35
import time
36
import errno
37

    
38
from ganeti import opcodes
39
from ganeti import serializer
40
from ganeti import constants
41

    
42

    
43
KEY_REQUEST = 'request'
44
KEY_DATA = 'data'
45
REQ_SUBMIT = 'submit'
46
REQ_ABORT = 'abort'
47
REQ_QUERY = 'query'
48

    
49
DEF_CTMO = 10
50
DEF_RWTO = 60
51

    
52

    
53
class ProtocolError(Exception):
54
  """Denotes an error in the server communication"""
55

    
56

    
57
class ConnectionClosedError(ProtocolError):
58
  """Connection closed error"""
59

    
60

    
61
class TimeoutError(ProtocolError):
62
  """Operation timeout error"""
63

    
64

    
65
class EncodingError(ProtocolError):
66
  """Encoding failure on the sending side"""
67

    
68

    
69
class DecodingError(ProtocolError):
70
  """Decoding failure on the receiving side"""
71

    
72

    
73
class RequestError(ProtocolError):
74
  """Error on request
75

76
  This signifies an error in the request format or request handling,
77
  but not (e.g.) an error in starting up an instance.
78

79
  Some common conditions that can trigger this exception:
80
    - job submission failed because the job data was wrong
81
    - query failed because required fields were missing
82

83
  """
84

    
85
class NoMasterError(ProtocolError):
86
  """The master cannot be reached
87

88
  This means that the master daemon is not running or the socket has
89
  been removed.
90

91
  """
92

    
93

    
94
def SerializeJob(job):
95
  """Convert a job description to a string format.
96

97
  """
98
  return simplejson.dumps(job.__getstate__())
99

    
100

    
101
def UnserializeJob(data):
102
  """Load a job from a string format"""
103
  try:
104
    new_data = simplejson.loads(data)
105
  except Exception, err:
106
    raise DecodingError("Error while unserializing: %s" % str(err))
107
  job = opcodes.Job()
108
  job.__setstate__(new_data)
109
  return job
110

    
111

    
112
class Transport:
113
  """Low-level transport class.
114

115
  This is used on the client side.
116

117
  This could be replace by any other class that provides the same
118
  semantics to the Client. This means:
119
    - can send messages and receive messages
120
    - safe for multithreading
121

122
  """
123

    
124
  def __init__(self, address, timeouts=None, eom=None):
125
    """Constructor for the Client class.
126

127
    Arguments:
128
      - address: a valid address the the used transport class
129
      - timeout: a list of timeouts, to be used on connect and read/write
130
      - eom: an identifier to be used as end-of-message which the
131
        upper-layer will guarantee that this identifier will not appear
132
        in any message
133

134
    There are two timeouts used since we might want to wait for a long
135
    time for a response, but the connect timeout should be lower.
136

137
    If not passed, we use a default of 10 and respectively 60 seconds.
138

139
    Note that on reading data, since the timeout applies to an
140
    invidual receive, it might be that the total duration is longer
141
    than timeout value passed (we make a hard limit at twice the read
142
    timeout).
143

144
    """
145
    self.address = address
146
    if timeouts is None:
147
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
148
    else:
149
      self._ctimeout, self._rwtimeout = timeouts
150

    
151
    self.socket = None
152
    self._buffer = ""
153
    self._msgs = collections.deque()
154

    
155
    if eom is None:
156
      self.eom = '\3'
157
    else:
158
      self.eom = eom
159

    
160
    try:
161
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
162
      self.socket.settimeout(self._ctimeout)
163
      try:
164
        self.socket.connect(address)
165
      except socket.timeout, err:
166
        raise TimeoutError("Connect timed out: %s" % str(err))
167
      except socket.error, err:
168
        if err.args[0] == errno.ENOENT:
169
          raise NoMasterError((address,))
170
        raise
171
      self.socket.settimeout(self._rwtimeout)
172
    except (socket.error, NoMasterError):
173
      if self.socket is not None:
174
        self.socket.close()
175
      self.socket = None
176
      raise
177

    
178
  def _CheckSocket(self):
179
    """Make sure we are connected.
180

181
    """
182
    if self.socket is None:
183
      raise ProtocolError("Connection is closed")
184

    
185
  def Send(self, msg):
186
    """Send a message.
187

188
    This just sends a message and doesn't wait for the response.
189

190
    """
191
    if self.eom in msg:
192
      raise EncodingError("Message terminator found in payload")
193
    self._CheckSocket()
194
    try:
195
      self.socket.sendall(msg + self.eom)
196
    except socket.timeout, err:
197
      raise TimeoutError("Sending timeout: %s" % str(err))
198

    
199
  def Recv(self):
200
    """Try to receive a messae from the socket.
201

202
    In case we already have messages queued, we just return from the
203
    queue. Otherwise, we try to read data with a _rwtimeout network
204
    timeout, and making sure we don't go over 2x_rwtimeout as a global
205
    limit.
206

207
    """
208
    self._CheckSocket()
209
    etime = time.time() + self._rwtimeout
210
    while not self._msgs:
211
      if time.time() > etime:
212
        raise TimeoutError("Extended receive timeout")
213
      try:
214
        data = self.socket.recv(4096)
215
      except socket.timeout, err:
216
        raise TimeoutError("Receive timeout: %s" % str(err))
217
      if not data:
218
        raise ConnectionClosedError("Connection closed while reading")
219
      new_msgs = (self._buffer + data).split(self.eom)
220
      self._buffer = new_msgs.pop()
221
      self._msgs.extend(new_msgs)
222
    return self._msgs.popleft()
223

    
224
  def Call(self, msg):
225
    """Send a message and wait for the response.
226

227
    This is just a wrapper over Send and Recv.
228

229
    """
230
    self.Send(msg)
231
    return self.Recv()
232

    
233
  def Close(self):
234
    """Close the socket"""
235
    if self.socket is not None:
236
      self.socket.close()
237
      self.socket = None
238

    
239

    
240
class Client(object):
241
  """High-level client implementation.
242

243
  This uses a backing Transport-like class on top of which it
244
  implements data serialization/deserialization.
245

246
  """
247
  def __init__(self, address=None, timeouts=None, transport=Transport):
248
    """Constructor for the Client class.
249

250
    Arguments:
251
      - address: a valid address the the used transport class
252
      - timeout: a list of timeouts, to be used on connect and read/write
253
      - transport: a Transport-like class
254

255

256
    If timeout is not passed, the default timeouts of the transport
257
    class are used.
258

259
    """
260
    if address is None:
261
      address = constants.MASTER_SOCKET
262
    self.transport = transport(address, timeouts=timeouts)
263

    
264
  def SendRequest(self, request, data):
265
    """Send a generic request and return the response.
266

267
    """
268
    msg = {KEY_REQUEST: request, KEY_DATA: data}
269
    result = self.transport.Call(serializer.DumpJson(msg, indent=False))
270
    try:
271
      data = serializer.LoadJson(result)
272
    except Exception, err:
273
      raise ProtocolError("Error while deserializing response: %s" % str(err))
274
    if (not isinstance(data, dict) or
275
        'success' not in data or
276
        'result' not in data):
277
      raise DecodingError("Invalid response from server: %s" % str(data))
278
    return data
279

    
280
  def SubmitJob(self, job):
281
    """Submit a job"""
282
    result = self.SendRequest(REQ_SUBMIT, SerializeJob(job))
283
    if not result['success']:
284
      raise RequestError(result['result'])
285
    return result['result']
286

    
287
  def Query(self, data):
288
    """Make a query"""
289
    result = self.SendRequest(REQ_QUERY, data)
290
    if not result['success']:
291
      raise RequestError(result[result])
292
    result = result['result']
293
    if data["object"] == "jobs":
294
      # custom job processing of query values
295
      for row in result:
296
        for idx, field in enumerate(data["fields"]):
297
          if field == "op_list":
298
            row[idx] = [opcodes.OpCode.LoadOpCode(i) for i in row[idx]]
299
    return result