Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ 0a1e74d9

History | View | Annotate | Download (7.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the unix socket protocol
23

24
This module implements the local unix socket protocl. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also be used by the master daemon.
29

30
"""
31

    
32
import socket
33
import collections
34
import time
35
import errno
36

    
37
from ganeti import serializer
38
from ganeti import constants
39

    
40

    
41
KEY_METHOD = 'method'
42
KEY_ARGS = 'args'
43
KEY_SUCCESS = "success"
44
KEY_RESULT = "result"
45

    
46
REQ_SUBMIT_JOB = "SubmitJob"
47
REQ_CANCEL_JOB = "CancelJob"
48
REQ_ARCHIVE_JOB = "ArchiveJob"
49
REQ_QUERY_JOBS = "QueryJobs"
50

    
51
DEF_CTMO = 10
52
DEF_RWTO = 60
53

    
54

    
55
class ProtocolError(Exception):
56
  """Denotes an error in the server communication"""
57

    
58

    
59
class ConnectionClosedError(ProtocolError):
60
  """Connection closed error"""
61

    
62

    
63
class TimeoutError(ProtocolError):
64
  """Operation timeout error"""
65

    
66

    
67
class EncodingError(ProtocolError):
68
  """Encoding failure on the sending side"""
69

    
70

    
71
class DecodingError(ProtocolError):
72
  """Decoding failure on the receiving side"""
73

    
74

    
75
class RequestError(ProtocolError):
76
  """Error on request
77

78
  This signifies an error in the request format or request handling,
79
  but not (e.g.) an error in starting up an instance.
80

81
  Some common conditions that can trigger this exception:
82
    - job submission failed because the job data was wrong
83
    - query failed because required fields were missing
84

85
  """
86

    
87

    
88
class NoMasterError(ProtocolError):
89
  """The master cannot be reached
90

91
  This means that the master daemon is not running or the socket has
92
  been removed.
93

94
  """
95

    
96

    
97
class Transport:
98
  """Low-level transport class.
99

100
  This is used on the client side.
101

102
  This could be replace by any other class that provides the same
103
  semantics to the Client. This means:
104
    - can send messages and receive messages
105
    - safe for multithreading
106

107
  """
108

    
109
  def __init__(self, address, timeouts=None, eom=None):
110
    """Constructor for the Client class.
111

112
    Arguments:
113
      - address: a valid address the the used transport class
114
      - timeout: a list of timeouts, to be used on connect and read/write
115
      - eom: an identifier to be used as end-of-message which the
116
        upper-layer will guarantee that this identifier will not appear
117
        in any message
118

119
    There are two timeouts used since we might want to wait for a long
120
    time for a response, but the connect timeout should be lower.
121

122
    If not passed, we use a default of 10 and respectively 60 seconds.
123

124
    Note that on reading data, since the timeout applies to an
125
    invidual receive, it might be that the total duration is longer
126
    than timeout value passed (we make a hard limit at twice the read
127
    timeout).
128

129
    """
130
    self.address = address
131
    if timeouts is None:
132
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
133
    else:
134
      self._ctimeout, self._rwtimeout = timeouts
135

    
136
    self.socket = None
137
    self._buffer = ""
138
    self._msgs = collections.deque()
139

    
140
    if eom is None:
141
      self.eom = '\3'
142
    else:
143
      self.eom = eom
144

    
145
    try:
146
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
147
      self.socket.settimeout(self._ctimeout)
148
      try:
149
        self.socket.connect(address)
150
      except socket.timeout, err:
151
        raise TimeoutError("Connect timed out: %s" % str(err))
152
      except socket.error, err:
153
        if err.args[0] == errno.ENOENT:
154
          raise NoMasterError((address,))
155
        raise
156
      self.socket.settimeout(self._rwtimeout)
157
    except (socket.error, NoMasterError):
158
      if self.socket is not None:
159
        self.socket.close()
160
      self.socket = None
161
      raise
162

    
163
  def _CheckSocket(self):
164
    """Make sure we are connected.
165

166
    """
167
    if self.socket is None:
168
      raise ProtocolError("Connection is closed")
169

    
170
  def Send(self, msg):
171
    """Send a message.
172

173
    This just sends a message and doesn't wait for the response.
174

175
    """
176
    if self.eom in msg:
177
      raise EncodingError("Message terminator found in payload")
178
    self._CheckSocket()
179
    try:
180
      self.socket.sendall(msg + self.eom)
181
    except socket.timeout, err:
182
      raise TimeoutError("Sending timeout: %s" % str(err))
183

    
184
  def Recv(self):
185
    """Try to receive a messae from the socket.
186

187
    In case we already have messages queued, we just return from the
188
    queue. Otherwise, we try to read data with a _rwtimeout network
189
    timeout, and making sure we don't go over 2x_rwtimeout as a global
190
    limit.
191

192
    """
193
    self._CheckSocket()
194
    etime = time.time() + self._rwtimeout
195
    while not self._msgs:
196
      if time.time() > etime:
197
        raise TimeoutError("Extended receive timeout")
198
      try:
199
        data = self.socket.recv(4096)
200
      except socket.timeout, err:
201
        raise TimeoutError("Receive timeout: %s" % str(err))
202
      if not data:
203
        raise ConnectionClosedError("Connection closed while reading")
204
      new_msgs = (self._buffer + data).split(self.eom)
205
      self._buffer = new_msgs.pop()
206
      self._msgs.extend(new_msgs)
207
    return self._msgs.popleft()
208

    
209
  def Call(self, msg):
210
    """Send a message and wait for the response.
211

212
    This is just a wrapper over Send and Recv.
213

214
    """
215
    self.Send(msg)
216
    return self.Recv()
217

    
218
  def Close(self):
219
    """Close the socket"""
220
    if self.socket is not None:
221
      self.socket.close()
222
      self.socket = None
223

    
224

    
225
class Client(object):
226
  """High-level client implementation.
227

228
  This uses a backing Transport-like class on top of which it
229
  implements data serialization/deserialization.
230

231
  """
232
  def __init__(self, address=None, timeouts=None, transport=Transport):
233
    """Constructor for the Client class.
234

235
    Arguments:
236
      - address: a valid address the the used transport class
237
      - timeout: a list of timeouts, to be used on connect and read/write
238
      - transport: a Transport-like class
239

240

241
    If timeout is not passed, the default timeouts of the transport
242
    class are used.
243

244
    """
245
    if address is None:
246
      address = constants.MASTER_SOCKET
247
    self.transport = transport(address, timeouts=timeouts)
248

    
249
  def CallMethod(self, method, args):
250
    """Send a generic request and return the response.
251

252
    """
253
    # Build request
254
    request = {
255
      KEY_METHOD: method,
256
      KEY_ARGS: args,
257
      }
258

    
259
    # Send request and wait for response
260
    result = self.transport.Call(serializer.DumpJson(request, indent=False))
261
    try:
262
      data = serializer.LoadJson(result)
263
    except Exception, err:
264
      raise ProtocolError("Error while deserializing response: %s" % str(err))
265

    
266
    # Validate response
267
    if (not isinstance(data, dict) or
268
        KEY_SUCCESS not in data or
269
        KEY_RESULT not in data):
270
      raise DecodingError("Invalid response from server: %s" % str(data))
271

    
272
    if not data[KEY_SUCCESS]:
273
      # TODO: decide on a standard exception
274
      raise RequestError(data[KEY_RESULT])
275

    
276
    return data[KEY_RESULT]
277

    
278
  def SubmitJob(self, ops):
279
    ops_state = map(lambda op: op.__getstate__(), ops)
280
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
281

    
282
  def CancelJob(self, job_id):
283
    return self.CallMethod(REQ_CANCEL_JOB, job_id)
284

    
285
  def ArchiveJob(self, job_id):
286
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
287

    
288
  def QueryJobs(self, job_ids, fields):
289
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
290

    
291
# TODO: class Server(object)