Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ ec39d63c

History | View | Annotate | Download (10.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the unix socket protocol
23

24
This module implements the local unix socket protocol. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also used by the master daemon.
29

30
"""
31

    
32
import socket
33
import collections
34
import time
35
import errno
36

    
37
from ganeti import serializer
38
from ganeti import constants
39
from ganeti import errors
40

    
41

    
42
KEY_METHOD = 'method'
43
KEY_ARGS = 'args'
44
KEY_SUCCESS = "success"
45
KEY_RESULT = "result"
46

    
47
REQ_SUBMIT_JOB = "SubmitJob"
48
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
49
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
50
REQ_CANCEL_JOB = "CancelJob"
51
REQ_ARCHIVE_JOB = "ArchiveJob"
52
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
53
REQ_QUERY_JOBS = "QueryJobs"
54
REQ_QUERY_INSTANCES = "QueryInstances"
55
REQ_QUERY_NODES = "QueryNodes"
56
REQ_QUERY_EXPORTS = "QueryExports"
57
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
58
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
59
REQ_QUERY_TAGS = "QueryTags"
60
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
61
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
62

    
63
DEF_CTMO = 10
64
DEF_RWTO = 60
65

    
66

    
67
class ProtocolError(Exception):
68
  """Denotes an error in the server communication"""
69

    
70

    
71
class ConnectionClosedError(ProtocolError):
72
  """Connection closed error"""
73

    
74

    
75
class TimeoutError(ProtocolError):
76
  """Operation timeout error"""
77

    
78

    
79
class EncodingError(ProtocolError):
80
  """Encoding failure on the sending side"""
81

    
82

    
83
class DecodingError(ProtocolError):
84
  """Decoding failure on the receiving side"""
85

    
86

    
87
class RequestError(ProtocolError):
88
  """Error on request
89

90
  This signifies an error in the request format or request handling,
91
  but not (e.g.) an error in starting up an instance.
92

93
  Some common conditions that can trigger this exception:
94
    - job submission failed because the job data was wrong
95
    - query failed because required fields were missing
96

97
  """
98

    
99

    
100
class NoMasterError(ProtocolError):
101
  """The master cannot be reached
102

103
  This means that the master daemon is not running or the socket has
104
  been removed.
105

106
  """
107

    
108

    
109
class Transport:
110
  """Low-level transport class.
111

112
  This is used on the client side.
113

114
  This could be replace by any other class that provides the same
115
  semantics to the Client. This means:
116
    - can send messages and receive messages
117
    - safe for multithreading
118

119
  """
120

    
121
  def __init__(self, address, timeouts=None, eom=None):
122
    """Constructor for the Client class.
123

124
    Arguments:
125
      - address: a valid address the the used transport class
126
      - timeout: a list of timeouts, to be used on connect and read/write
127
      - eom: an identifier to be used as end-of-message which the
128
        upper-layer will guarantee that this identifier will not appear
129
        in any message
130

131
    There are two timeouts used since we might want to wait for a long
132
    time for a response, but the connect timeout should be lower.
133

134
    If not passed, we use a default of 10 and respectively 60 seconds.
135

136
    Note that on reading data, since the timeout applies to an
137
    invidual receive, it might be that the total duration is longer
138
    than timeout value passed (we make a hard limit at twice the read
139
    timeout).
140

141
    """
142
    self.address = address
143
    if timeouts is None:
144
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
145
    else:
146
      self._ctimeout, self._rwtimeout = timeouts
147

    
148
    self.socket = None
149
    self._buffer = ""
150
    self._msgs = collections.deque()
151

    
152
    if eom is None:
153
      self.eom = '\3'
154
    else:
155
      self.eom = eom
156

    
157
    try:
158
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
159
      self.socket.settimeout(self._ctimeout)
160
      try:
161
        self.socket.connect(address)
162
      except socket.timeout, err:
163
        raise TimeoutError("Connect timed out: %s" % str(err))
164
      except socket.error, err:
165
        if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
166
          raise NoMasterError(address)
167
        raise
168
      self.socket.settimeout(self._rwtimeout)
169
    except (socket.error, NoMasterError):
170
      if self.socket is not None:
171
        self.socket.close()
172
      self.socket = None
173
      raise
174

    
175
  def _CheckSocket(self):
176
    """Make sure we are connected.
177

178
    """
179
    if self.socket is None:
180
      raise ProtocolError("Connection is closed")
181

    
182
  def Send(self, msg):
183
    """Send a message.
184

185
    This just sends a message and doesn't wait for the response.
186

187
    """
188
    if self.eom in msg:
189
      raise EncodingError("Message terminator found in payload")
190
    self._CheckSocket()
191
    try:
192
      # TODO: sendall is not guaranteed to send everything
193
      self.socket.sendall(msg + self.eom)
194
    except socket.timeout, err:
195
      raise TimeoutError("Sending timeout: %s" % str(err))
196

    
197
  def Recv(self):
198
    """Try to receive a message from the socket.
199

200
    In case we already have messages queued, we just return from the
201
    queue. Otherwise, we try to read data with a _rwtimeout network
202
    timeout, and making sure we don't go over 2x_rwtimeout as a global
203
    limit.
204

205
    """
206
    self._CheckSocket()
207
    etime = time.time() + self._rwtimeout
208
    while not self._msgs:
209
      if time.time() > etime:
210
        raise TimeoutError("Extended receive timeout")
211
      while True:
212
        try:
213
          data = self.socket.recv(4096)
214
        except socket.error, err:
215
          if err.args and err.args[0] == errno.EAGAIN:
216
            continue
217
          raise
218
        except socket.timeout, err:
219
          raise TimeoutError("Receive timeout: %s" % str(err))
220
        break
221
      if not data:
222
        raise ConnectionClosedError("Connection closed while reading")
223
      new_msgs = (self._buffer + data).split(self.eom)
224
      self._buffer = new_msgs.pop()
225
      self._msgs.extend(new_msgs)
226
    return self._msgs.popleft()
227

    
228
  def Call(self, msg):
229
    """Send a message and wait for the response.
230

231
    This is just a wrapper over Send and Recv.
232

233
    """
234
    self.Send(msg)
235
    return self.Recv()
236

    
237
  def Close(self):
238
    """Close the socket"""
239
    if self.socket is not None:
240
      self.socket.close()
241
      self.socket = None
242

    
243

    
244
class Client(object):
245
  """High-level client implementation.
246

247
  This uses a backing Transport-like class on top of which it
248
  implements data serialization/deserialization.
249

250
  """
251
  def __init__(self, address=None, timeouts=None, transport=Transport):
252
    """Constructor for the Client class.
253

254
    Arguments:
255
      - address: a valid address the the used transport class
256
      - timeout: a list of timeouts, to be used on connect and read/write
257
      - transport: a Transport-like class
258

259

260
    If timeout is not passed, the default timeouts of the transport
261
    class are used.
262

263
    """
264
    if address is None:
265
      address = constants.MASTER_SOCKET
266
    self.address = address
267
    self.timeouts = timeouts
268
    self.transport_class = transport
269
    self.transport = None
270
    self._InitTransport()
271

    
272
  def _InitTransport(self):
273
    """(Re)initialize the transport if needed.
274

275
    """
276
    if self.transport is None:
277
      self.transport = self.transport_class(self.address,
278
                                            timeouts=self.timeouts)
279

    
280
  def _CloseTransport(self):
281
    """Close the transport, ignoring errors.
282

283
    """
284
    if self.transport is None:
285
      return
286
    try:
287
      old_transp = self.transport
288
      self.transport = None
289
      old_transp.Close()
290
    except Exception: # pylint: disable-msg=W0703
291
      pass
292

    
293
  def CallMethod(self, method, args):
294
    """Send a generic request and return the response.
295

296
    """
297
    # Build request
298
    request = {
299
      KEY_METHOD: method,
300
      KEY_ARGS: args,
301
      }
302

    
303
    # Serialize the request
304
    send_data = serializer.DumpJson(request, indent=False)
305

    
306
    # Send request and wait for response
307
    try:
308
      self._InitTransport()
309
      result = self.transport.Call(send_data)
310
    except Exception:
311
      self._CloseTransport()
312
      raise
313

    
314
    # Parse the result
315
    try:
316
      data = serializer.LoadJson(result)
317
    except Exception, err:
318
      raise ProtocolError("Error while deserializing response: %s" % str(err))
319

    
320
    # Validate response
321
    if (not isinstance(data, dict) or
322
        KEY_SUCCESS not in data or
323
        KEY_RESULT not in data):
324
      raise DecodingError("Invalid response from server: %s" % str(data))
325

    
326
    result = data[KEY_RESULT]
327

    
328
    if not data[KEY_SUCCESS]:
329
      errors.MaybeRaise(result)
330
      raise RequestError(result)
331

    
332
    return result
333

    
334
  def SetQueueDrainFlag(self, drain_flag):
335
    return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
336

    
337
  def SetWatcherPause(self, until):
338
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])
339

    
340
  def SubmitJob(self, ops):
341
    ops_state = map(lambda op: op.__getstate__(), ops)
342
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
343

    
344
  def SubmitManyJobs(self, jobs):
345
    jobs_state = []
346
    for ops in jobs:
347
      jobs_state.append([op.__getstate__() for op in ops])
348
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
349

    
350
  def CancelJob(self, job_id):
351
    return self.CallMethod(REQ_CANCEL_JOB, job_id)
352

    
353
  def ArchiveJob(self, job_id):
354
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
355

    
356
  def AutoArchiveJobs(self, age):
357
    timeout = (DEF_RWTO - 1) / 2
358
    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
359

    
360
  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
361
    timeout = (DEF_RWTO - 1) / 2
362
    while True:
363
      result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
364
                               (job_id, fields, prev_job_info,
365
                                prev_log_serial, timeout))
366
      if result != constants.JOB_NOTCHANGED:
367
        break
368
    return result
369

    
370
  def QueryJobs(self, job_ids, fields):
371
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
372

    
373
  def QueryInstances(self, names, fields, use_locking):
374
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
375

    
376
  def QueryNodes(self, names, fields, use_locking):
377
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
378

    
379
  def QueryExports(self, nodes, use_locking):
380
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
381

    
382
  def QueryClusterInfo(self):
383
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
384

    
385
  def QueryConfigValues(self, fields):
386
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
387

    
388
  def QueryTags(self, kind, name):
389
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))
390

    
391

    
392
# TODO: class Server(object)