Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ 6797ec29

History | View | Annotate | Download (9.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the unix socket protocol
23

24
This module implements the local unix socket protocol. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also used by the master daemon.
29

30
"""
31

    
32
import socket
33
import collections
34
import time
35
import errno
36

    
37
from ganeti import serializer
38
from ganeti import constants
39
from ganeti import errors
40

    
41

    
42
# Keys of the request dictionary built by Client.CallMethod
KEY_METHOD = "method"
KEY_ARGS = "args"

# Keys that must be present in every response dictionary
KEY_SUCCESS = "success"
KEY_RESULT = "result"

# Method names understood by the server: job handling
REQ_SUBMIT_JOB = "SubmitJob"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"

# Method names understood by the server: queries
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"

# Default timeouts in seconds: connect and read/write, respectively
DEF_CTMO = 10
DEF_RWTO = 60
60

    
61

    
62
class ProtocolError(Exception):
  """Base class for errors in the communication with the server.

  All other exceptions defined in this module derive from it.

  """
64

    
65

    
66
class ConnectionClosedError(ProtocolError):
  """The connection was closed while it was still in use."""
68

    
69

    
70
class TimeoutError(ProtocolError):
  """An operation did not finish within its timeout."""
72

    
73

    
74
class EncodingError(ProtocolError):
  """A message could not be encoded on the sending side."""
76

    
77

    
78
class DecodingError(ProtocolError):
  """A message could not be decoded on the receiving side."""
80

    
81

    
82
class RequestError(ProtocolError):
  """The request itself was rejected or could not be handled.

  This covers problems with the request format or with the handling of
  the request; it does not cover failures of the requested operation
  (e.g. an instance that fails to start).

  Typical triggers:
    - a job submission carrying invalid job data
    - a query that lacks required fields

  """
93

    
94

    
95
class NoMasterError(ProtocolError):
  """The master daemon is unreachable.

  Raised when the master daemon is not running or when its socket has
  been removed.

  """
102

    
103

    
104
class Transport:
  """Low-level transport class.

  This is used on the client side.

  This could be replaced by any other class that provides the same
  semantics to the Client. This means:
    - can send messages and receive messages
    - safe for multithreading

  """

  def __init__(self, address, timeouts=None, eom=None):
    """Constructor for the Transport class.

    Arguments:
      - address: a valid address for the used transport class (it is
        passed to connect() on an AF_UNIX stream socket)
      - timeouts: a (connect, read/write) pair of timeouts
      - eom: an identifier to be used as end-of-message which the
        upper-layer will guarantee that this identifier will not appear
        in any message; defaults to '\\3'

    There are two timeouts used since we might want to wait for a long
    time for a response, but the connect timeout should be lower.

    If not passed, we use a default of 10 and respectively 60 seconds.

    Note that on reading data, since the timeout applies to an
    individual receive, it might be that the total duration is longer
    than timeout value passed (we make a hard limit at twice the read
    timeout).

    Raises:
      - TimeoutError: the connect attempt timed out
      - NoMasterError: the socket does not exist or refused the connection
      - socket.error: any other socket-level failure

    In all failure cases the socket is closed before the exception is
    propagated.

    """
    self.address = address
    if timeouts is None:
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
    else:
      self._ctimeout, self._rwtimeout = timeouts

    self.socket = None
    # Data received after the last complete message
    self._buffer = ""
    # Complete messages received but not yet handed out by Recv
    self._msgs = collections.deque()

    if eom is None:
      self.eom = '\3'
    else:
      self.eom = eom

    try:
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
      self.socket.settimeout(self._ctimeout)
      try:
        self.socket.connect(address)
      except socket.timeout as err:
        raise TimeoutError("Connect timed out: %s" % str(err))
      except socket.error as err:
        if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
          raise NoMasterError((address,))
        raise
      # Connected; from now on the (longer) read/write timeout applies
      self.socket.settimeout(self._rwtimeout)
    except (socket.error, NoMasterError, TimeoutError):
      # TimeoutError must be listed here as well: the inner handler
      # converts socket.timeout into it, and without this clause a
      # connect timeout would leave the socket open
      if self.socket is not None:
        self.socket.close()
      self.socket = None
      raise

  def _CheckSocket(self):
    """Make sure we are connected.

    Raises ProtocolError if the socket has been closed.

    """
    if self.socket is None:
      raise ProtocolError("Connection is closed")

  def Send(self, msg):
    """Send a message.

    This just sends a message and doesn't wait for the response.

    Raises:
      - EncodingError: the end-of-message marker occurs inside msg
      - ProtocolError: the connection is closed
      - TimeoutError: the send timed out

    """
    if self.eom in msg:
      raise EncodingError("Message terminator found in payload")
    self._CheckSocket()
    try:
      self.socket.sendall(msg + self.eom)
    except socket.timeout as err:
      raise TimeoutError("Sending timeout: %s" % str(err))

  def Recv(self):
    """Try to receive a message from the socket.

    In case we already have messages queued, we just return from the
    queue. Otherwise, we try to read data with a _rwtimeout network
    timeout, and making sure we don't go over 2x_rwtimeout as a global
    limit.

    Raises:
      - ProtocolError: the connection is closed
      - TimeoutError: a single receive or the global limit timed out
      - ConnectionClosedError: the peer closed the connection

    """
    self._CheckSocket()
    # The deadline is checked before each receive and each receive can
    # itself block for up to _rwtimeout, hence the 2x overall bound
    etime = time.time() + self._rwtimeout
    while not self._msgs:
      if time.time() > etime:
        raise TimeoutError("Extended receive timeout")
      try:
        data = self.socket.recv(4096)
      except socket.timeout as err:
        raise TimeoutError("Receive timeout: %s" % str(err))
      if not data:
        raise ConnectionClosedError("Connection closed while reading")
      new_msgs = (self._buffer + data).split(self.eom)
      # The last element is the (possibly empty) start of an incomplete
      # message; keep it for the next round
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def Call(self, msg):
    """Send a message and wait for the response.

    This is just a wrapper over Send and Recv.

    """
    self.Send(msg)
    return self.Recv()

  def Close(self):
    """Close the socket, if it is open."""
    if self.socket is not None:
      self.socket.close()
      self.socket = None
230

    
231

    
232
class Client(object):
  """High-level client implementation.

  This uses a backing Transport-like class on top of which it
  implements data serialization/deserialization.

  """
  def __init__(self, address=None, timeouts=None, transport=Transport):
    """Constructor for the Client class.

    Arguments:
      - address: a valid address for the used transport class; if not
        passed, constants.MASTER_SOCKET is used
      - timeouts: a list of timeouts, to be used on connect and read/write
      - transport: a Transport-like class

    If timeouts is not passed, the default timeouts of the transport
    class are used.

    """
    if address is None:
      address = constants.MASTER_SOCKET
    self.transport = transport(address, timeouts=timeouts)

  def CallMethod(self, method, args):
    """Send a generic request and return the response.

    Arguments:
      - method: the name of the method to call (one of the REQ_* values)
      - args: the arguments of the method

    Returns the value stored under KEY_RESULT in the response.

    Raises:
      - ProtocolError: the response could not be deserialized
      - DecodingError: the response is not a dictionary holding both
        KEY_SUCCESS and KEY_RESULT
      - a class returned by errors.GetErrorClass: the server reported a
        failure as an (error name, arguments) pair naming a known class
      - RequestError: the server reported any other failure

    """
    # Build request
    request = {
      KEY_METHOD: method,
      KEY_ARGS: args,
      }

    # Send request and wait for response
    result = self.transport.Call(serializer.DumpJson(request, indent=False))
    try:
      data = serializer.LoadJson(result)
    except Exception as err:
      raise ProtocolError("Error while deserializing response: %s" % str(err))

    # Validate response
    if (not isinstance(data, dict) or
        KEY_SUCCESS not in data or
        KEY_RESULT not in data):
      raise DecodingError("Invalid response from server: %s" % str(data))

    result = data[KEY_RESULT]

    if not data[KEY_SUCCESS]:
      # TODO: decide on a standard exception
      if (isinstance(result, (tuple, list)) and len(result) == 2 and
          isinstance(result[1], (tuple, list))):
        # custom ganeti errors
        err_class = errors.GetErrorClass(result[0])
        if err_class is not None:
          raise err_class(*tuple(result[1]))

      raise RequestError(result)

    return result

  def SubmitJob(self, ops):
    """Submit a job made of the given opcodes.

    Each element of ops is converted via its __getstate__ method before
    being sent.

    """
    ops_state = [op.__getstate__() for op in ops]
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)

  def CancelJob(self, job_id):
    """Send a CancelJob request for job_id and return the result."""
    return self.CallMethod(REQ_CANCEL_JOB, job_id)

  def ArchiveJob(self, job_id):
    """Send an ArchiveJob request for job_id and return the result."""
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)

  def AutoArchiveJobs(self, age):
    """Send an AutoArchiveJobs request with age and return the result."""
    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, age)

  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
    """Wait until the server reports a change for the given job.

    The request is repeated for as long as the server answers with
    constants.JOB_NOTCHANGED; the first other result is returned.

    """
    # Integer division on purpose. NOTE(review): presumably chosen so a
    # single server-side wait stays below the default read/write
    # timeout of the transport - confirm against the server
    timeout = (DEF_RWTO - 1) // 2
    while True:
      result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
                               (job_id, fields, prev_job_info,
                                prev_log_serial, timeout))
      if result != constants.JOB_NOTCHANGED:
        break
    return result

  def QueryJobs(self, job_ids, fields):
    """Send a QueryJobs request for (job_ids, fields); return the result."""
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))

  def QueryInstances(self, names, fields):
    """Send a QueryInstances request for (names, fields); return the result."""
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields))

  def QueryNodes(self, names, fields):
    """Send a QueryNodes request for (names, fields); return the result."""
    return self.CallMethod(REQ_QUERY_NODES, (names, fields))

  def QueryExports(self, nodes):
    """Send a QueryExports request for nodes and return the result."""
    return self.CallMethod(REQ_QUERY_EXPORTS, nodes)

  def QueryConfigValues(self, fields):
    """Send a QueryConfigValues request for fields and return the result."""
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
331

    
332
# TODO: class Server(object)