Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ 2467e0d3

History | View | Annotate | Download (7.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the unix socket protocol
23

24
This module implements the local unix socket protocl. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also be used by the master daemon.
29

30
"""
31

    
32
import socket
33
import collections
34
import simplejson
35
import time
36
import errno
37

    
38
from ganeti import opcodes
39
from ganeti import serializer
40
from ganeti import constants
41

    
42

    
43
KEY_METHOD = 'method'
44
KEY_ARGS = 'args'
45
KEY_SUCCESS = "success"
46
KEY_RESULT = "result"
47

    
48
REQ_SUBMIT_JOB = "SubmitJob"
49
REQ_CANCEL_JOB = "CancelJob"
50
REQ_ARCHIVE_JOB = "ArchiveJob"
51
REQ_QUERY_JOBS = "QueryJobs"
52

    
53
DEF_CTMO = 10
54
DEF_RWTO = 60
55

    
56

    
57
class ProtocolError(Exception):
58
  """Denotes an error in the server communication"""
59

    
60

    
61
class ConnectionClosedError(ProtocolError):
62
  """Connection closed error"""
63

    
64

    
65
class TimeoutError(ProtocolError):
66
  """Operation timeout error"""
67

    
68

    
69
class EncodingError(ProtocolError):
70
  """Encoding failure on the sending side"""
71

    
72

    
73
class DecodingError(ProtocolError):
74
  """Decoding failure on the receiving side"""
75

    
76

    
77
class RequestError(ProtocolError):
78
  """Error on request
79

80
  This signifies an error in the request format or request handling,
81
  but not (e.g.) an error in starting up an instance.
82

83
  Some common conditions that can trigger this exception:
84
    - job submission failed because the job data was wrong
85
    - query failed because required fields were missing
86

87
  """
88

    
89

    
90
class NoMasterError(ProtocolError):
91
  """The master cannot be reached
92

93
  This means that the master daemon is not running or the socket has
94
  been removed.
95

96
  """
97

    
98

    
99
class Transport:
100
  """Low-level transport class.
101

102
  This is used on the client side.
103

104
  This could be replace by any other class that provides the same
105
  semantics to the Client. This means:
106
    - can send messages and receive messages
107
    - safe for multithreading
108

109
  """
110

    
111
  def __init__(self, address, timeouts=None, eom=None):
112
    """Constructor for the Client class.
113

114
    Arguments:
115
      - address: a valid address the the used transport class
116
      - timeout: a list of timeouts, to be used on connect and read/write
117
      - eom: an identifier to be used as end-of-message which the
118
        upper-layer will guarantee that this identifier will not appear
119
        in any message
120

121
    There are two timeouts used since we might want to wait for a long
122
    time for a response, but the connect timeout should be lower.
123

124
    If not passed, we use a default of 10 and respectively 60 seconds.
125

126
    Note that on reading data, since the timeout applies to an
127
    invidual receive, it might be that the total duration is longer
128
    than timeout value passed (we make a hard limit at twice the read
129
    timeout).
130

131
    """
132
    self.address = address
133
    if timeouts is None:
134
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
135
    else:
136
      self._ctimeout, self._rwtimeout = timeouts
137

    
138
    self.socket = None
139
    self._buffer = ""
140
    self._msgs = collections.deque()
141

    
142
    if eom is None:
143
      self.eom = '\3'
144
    else:
145
      self.eom = eom
146

    
147
    try:
148
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
149
      self.socket.settimeout(self._ctimeout)
150
      try:
151
        self.socket.connect(address)
152
      except socket.timeout, err:
153
        raise TimeoutError("Connect timed out: %s" % str(err))
154
      except socket.error, err:
155
        if err.args[0] == errno.ENOENT:
156
          raise NoMasterError((address,))
157
        raise
158
      self.socket.settimeout(self._rwtimeout)
159
    except (socket.error, NoMasterError):
160
      if self.socket is not None:
161
        self.socket.close()
162
      self.socket = None
163
      raise
164

    
165
  def _CheckSocket(self):
166
    """Make sure we are connected.
167

168
    """
169
    if self.socket is None:
170
      raise ProtocolError("Connection is closed")
171

    
172
  def Send(self, msg):
173
    """Send a message.
174

175
    This just sends a message and doesn't wait for the response.
176

177
    """
178
    if self.eom in msg:
179
      raise EncodingError("Message terminator found in payload")
180
    self._CheckSocket()
181
    try:
182
      self.socket.sendall(msg + self.eom)
183
    except socket.timeout, err:
184
      raise TimeoutError("Sending timeout: %s" % str(err))
185

    
186
  def Recv(self):
187
    """Try to receive a messae from the socket.
188

189
    In case we already have messages queued, we just return from the
190
    queue. Otherwise, we try to read data with a _rwtimeout network
191
    timeout, and making sure we don't go over 2x_rwtimeout as a global
192
    limit.
193

194
    """
195
    self._CheckSocket()
196
    etime = time.time() + self._rwtimeout
197
    while not self._msgs:
198
      if time.time() > etime:
199
        raise TimeoutError("Extended receive timeout")
200
      try:
201
        data = self.socket.recv(4096)
202
      except socket.timeout, err:
203
        raise TimeoutError("Receive timeout: %s" % str(err))
204
      if not data:
205
        raise ConnectionClosedError("Connection closed while reading")
206
      new_msgs = (self._buffer + data).split(self.eom)
207
      self._buffer = new_msgs.pop()
208
      self._msgs.extend(new_msgs)
209
    return self._msgs.popleft()
210

    
211
  def Call(self, msg):
212
    """Send a message and wait for the response.
213

214
    This is just a wrapper over Send and Recv.
215

216
    """
217
    self.Send(msg)
218
    return self.Recv()
219

    
220
  def Close(self):
221
    """Close the socket"""
222
    if self.socket is not None:
223
      self.socket.close()
224
      self.socket = None
225

    
226

    
227
class Client(object):
228
  """High-level client implementation.
229

230
  This uses a backing Transport-like class on top of which it
231
  implements data serialization/deserialization.
232

233
  """
234
  def __init__(self, address=None, timeouts=None, transport=Transport):
235
    """Constructor for the Client class.
236

237
    Arguments:
238
      - address: a valid address the the used transport class
239
      - timeout: a list of timeouts, to be used on connect and read/write
240
      - transport: a Transport-like class
241

242

243
    If timeout is not passed, the default timeouts of the transport
244
    class are used.
245

246
    """
247
    if address is None:
248
      address = constants.MASTER_SOCKET
249
    self.transport = transport(address, timeouts=timeouts)
250

    
251
  def CallMethod(self, method, args):
252
    """Send a generic request and return the response.
253

254
    """
255
    # Build request
256
    request = {
257
      KEY_METHOD: method,
258
      KEY_ARGS: args,
259
      }
260

    
261
    # Send request and wait for response
262
    result = self.transport.Call(serializer.DumpJson(request, indent=False))
263
    try:
264
      data = serializer.LoadJson(result)
265
    except Exception, err:
266
      raise ProtocolError("Error while deserializing response: %s" % str(err))
267

    
268
    # Validate response
269
    if (not isinstance(data, dict) or
270
        KEY_SUCCESS not in data or
271
        KEY_RESULT not in data):
272
      raise DecodingError("Invalid response from server: %s" % str(data))
273

    
274
    if not data[KEY_SUCCESS]:
275
      # TODO: decide on a standard exception
276
      raise RequestError(data[KEY_RESULT])
277

    
278
    return data[KEY_RESULT]
279

    
280
  def SubmitJob(self, ops):
281
    ops_state = map(lambda op: op.__getstate__(), ops)
282
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
283

    
284
  def CancelJob(self, job_id):
285
    return self.CallMethod(REQ_CANCEL_JOB, job_id)
286

    
287
  def ArchiveJob(self, job_id):
288
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
289

    
290
  def QueryJobs(self, job_ids, fields):
291
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
292

    
293
# TODO: class Server(object)