Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ ee6c7b94

History | View | Annotate | Download (8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the unix socket protocol
23

24
This module implements the local unix socket protocl. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also be used by the master daemon.
29

30
"""
31

    
32
import socket
33
import collections
34
import time
35
import errno
36

    
37
from ganeti import serializer
38
from ganeti import constants
39

    
40

    
41
KEY_METHOD = 'method'
42
KEY_ARGS = 'args'
43
KEY_SUCCESS = "success"
44
KEY_RESULT = "result"
45

    
46
REQ_SUBMIT_JOB = "SubmitJob"
47
REQ_CANCEL_JOB = "CancelJob"
48
REQ_ARCHIVE_JOB = "ArchiveJob"
49
REQ_QUERY_JOBS = "QueryJobs"
50
REQ_QUERY_INSTANCES = "QueryInstances"
51

    
52
DEF_CTMO = 10
53
DEF_RWTO = 60
54

    
55

    
56
class ProtocolError(Exception):
57
  """Denotes an error in the server communication"""
58

    
59

    
60
class ConnectionClosedError(ProtocolError):
61
  """Connection closed error"""
62

    
63

    
64
class TimeoutError(ProtocolError):
65
  """Operation timeout error"""
66

    
67

    
68
class EncodingError(ProtocolError):
69
  """Encoding failure on the sending side"""
70

    
71

    
72
class DecodingError(ProtocolError):
73
  """Decoding failure on the receiving side"""
74

    
75

    
76
class RequestError(ProtocolError):
77
  """Error on request
78

79
  This signifies an error in the request format or request handling,
80
  but not (e.g.) an error in starting up an instance.
81

82
  Some common conditions that can trigger this exception:
83
    - job submission failed because the job data was wrong
84
    - query failed because required fields were missing
85

86
  """
87

    
88

    
89
class NoMasterError(ProtocolError):
90
  """The master cannot be reached
91

92
  This means that the master daemon is not running or the socket has
93
  been removed.
94

95
  """
96

    
97

    
98
class Transport:
99
  """Low-level transport class.
100

101
  This is used on the client side.
102

103
  This could be replace by any other class that provides the same
104
  semantics to the Client. This means:
105
    - can send messages and receive messages
106
    - safe for multithreading
107

108
  """
109

    
110
  def __init__(self, address, timeouts=None, eom=None):
111
    """Constructor for the Client class.
112

113
    Arguments:
114
      - address: a valid address the the used transport class
115
      - timeout: a list of timeouts, to be used on connect and read/write
116
      - eom: an identifier to be used as end-of-message which the
117
        upper-layer will guarantee that this identifier will not appear
118
        in any message
119

120
    There are two timeouts used since we might want to wait for a long
121
    time for a response, but the connect timeout should be lower.
122

123
    If not passed, we use a default of 10 and respectively 60 seconds.
124

125
    Note that on reading data, since the timeout applies to an
126
    invidual receive, it might be that the total duration is longer
127
    than timeout value passed (we make a hard limit at twice the read
128
    timeout).
129

130
    """
131
    self.address = address
132
    if timeouts is None:
133
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
134
    else:
135
      self._ctimeout, self._rwtimeout = timeouts
136

    
137
    self.socket = None
138
    self._buffer = ""
139
    self._msgs = collections.deque()
140

    
141
    if eom is None:
142
      self.eom = '\3'
143
    else:
144
      self.eom = eom
145

    
146
    try:
147
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
148
      self.socket.settimeout(self._ctimeout)
149
      try:
150
        self.socket.connect(address)
151
      except socket.timeout, err:
152
        raise TimeoutError("Connect timed out: %s" % str(err))
153
      except socket.error, err:
154
        if err.args[0] == errno.ENOENT:
155
          raise NoMasterError((address,))
156
        raise
157
      self.socket.settimeout(self._rwtimeout)
158
    except (socket.error, NoMasterError):
159
      if self.socket is not None:
160
        self.socket.close()
161
      self.socket = None
162
      raise
163

    
164
  def _CheckSocket(self):
165
    """Make sure we are connected.
166

167
    """
168
    if self.socket is None:
169
      raise ProtocolError("Connection is closed")
170

    
171
  def Send(self, msg):
172
    """Send a message.
173

174
    This just sends a message and doesn't wait for the response.
175

176
    """
177
    if self.eom in msg:
178
      raise EncodingError("Message terminator found in payload")
179
    self._CheckSocket()
180
    try:
181
      self.socket.sendall(msg + self.eom)
182
    except socket.timeout, err:
183
      raise TimeoutError("Sending timeout: %s" % str(err))
184

    
185
  def Recv(self):
186
    """Try to receive a messae from the socket.
187

188
    In case we already have messages queued, we just return from the
189
    queue. Otherwise, we try to read data with a _rwtimeout network
190
    timeout, and making sure we don't go over 2x_rwtimeout as a global
191
    limit.
192

193
    """
194
    self._CheckSocket()
195
    etime = time.time() + self._rwtimeout
196
    while not self._msgs:
197
      if time.time() > etime:
198
        raise TimeoutError("Extended receive timeout")
199
      try:
200
        data = self.socket.recv(4096)
201
      except socket.timeout, err:
202
        raise TimeoutError("Receive timeout: %s" % str(err))
203
      if not data:
204
        raise ConnectionClosedError("Connection closed while reading")
205
      new_msgs = (self._buffer + data).split(self.eom)
206
      self._buffer = new_msgs.pop()
207
      self._msgs.extend(new_msgs)
208
    return self._msgs.popleft()
209

    
210
  def Call(self, msg):
211
    """Send a message and wait for the response.
212

213
    This is just a wrapper over Send and Recv.
214

215
    """
216
    self.Send(msg)
217
    return self.Recv()
218

    
219
  def Close(self):
220
    """Close the socket"""
221
    if self.socket is not None:
222
      self.socket.close()
223
      self.socket = None
224

    
225

    
226
class Client(object):
227
  """High-level client implementation.
228

229
  This uses a backing Transport-like class on top of which it
230
  implements data serialization/deserialization.
231

232
  """
233
  def __init__(self, address=None, timeouts=None, transport=Transport):
234
    """Constructor for the Client class.
235

236
    Arguments:
237
      - address: a valid address the the used transport class
238
      - timeout: a list of timeouts, to be used on connect and read/write
239
      - transport: a Transport-like class
240

241

242
    If timeout is not passed, the default timeouts of the transport
243
    class are used.
244

245
    """
246
    if address is None:
247
      address = constants.MASTER_SOCKET
248
    self.transport = transport(address, timeouts=timeouts)
249

    
250
  def CallMethod(self, method, args):
251
    """Send a generic request and return the response.
252

253
    """
254
    # Build request
255
    request = {
256
      KEY_METHOD: method,
257
      KEY_ARGS: args,
258
      }
259

    
260
    # Send request and wait for response
261
    result = self.transport.Call(serializer.DumpJson(request, indent=False))
262
    try:
263
      data = serializer.LoadJson(result)
264
    except Exception, err:
265
      raise ProtocolError("Error while deserializing response: %s" % str(err))
266

    
267
    # Validate response
268
    if (not isinstance(data, dict) or
269
        KEY_SUCCESS not in data or
270
        KEY_RESULT not in data):
271
      raise DecodingError("Invalid response from server: %s" % str(data))
272

    
273
    if not data[KEY_SUCCESS]:
274
      # TODO: decide on a standard exception
275
      raise RequestError(data[KEY_RESULT])
276

    
277
    return data[KEY_RESULT]
278

    
279
  def SubmitJob(self, ops):
280
    ops_state = map(lambda op: op.__getstate__(), ops)
281
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
282

    
283
  def CancelJob(self, job_id):
284
    return self.CallMethod(REQ_CANCEL_JOB, job_id)
285

    
286
  def ArchiveJob(self, job_id):
287
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
288

    
289
  def QueryJobs(self, job_ids, fields):
290
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
291

    
292
  def QueryInstances(self, names, fields):
293
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields))
294

    
295
# TODO: class Server(object)