#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module for the unix socket protocol

This module implements the local unix socket protocol. You only need
this module and the opcodes module in the client program in order to
communicate with the master.

The module is also used by the master daemon.

"""

import socket
import collections
import time
import errno

from ganeti import serializer
from ganeti import constants
from ganeti import errors
from ganeti import utils


KEY_METHOD = 'method'
KEY_ARGS = 'args'
KEY_SUCCESS = "success"
KEY_RESULT = "result"

REQ_SUBMIT_JOB = "SubmitJob"
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
REQ_QUERY_TAGS = "QueryTags"
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"

# Default connect and read/write timeouts, in seconds
DEF_CTMO = 10
DEF_RWTO = 60

# WaitForJobChange timeout
WFJC_TIMEOUT = (DEF_RWTO - 1) / 2


class ProtocolError(Exception):
  """Denotes an error in the server communication"""


class ConnectionClosedError(ProtocolError):
  """Connection closed error"""


class TimeoutError(ProtocolError):
  """Operation timeout error"""


class EncodingError(ProtocolError):
  """Encoding failure on the sending side"""


class DecodingError(ProtocolError):
  """Decoding failure on the receiving side"""


class RequestError(ProtocolError):
  """Error on request

  This signifies an error in the request format or request handling,
  but not (e.g.) an error in starting up an instance.

  Some common conditions that can trigger this exception:
    - job submission failed because the job data was wrong
    - query failed because required fields were missing

  """


class NoMasterError(ProtocolError):
  """The master cannot be reached

  This means that the master daemon is not running or the socket has
  been removed.

  """


class Transport:
  """Low-level transport class.

  This is used on the client side.

  This could be replaced by any other class that provides the same
  semantics to the Client. This means:
    - can send messages and receive messages
    - safe for multithreading

  """

  def __init__(self, address, timeouts=None, eom=None):
    """Constructor for the Transport class.

    Arguments:
      - address: a valid address for the used transport class
      - timeouts: a list of timeouts, to be used on connect and read/write
      - eom: an identifier to be used as the end-of-message marker, which
        the upper layer guarantees will not appear in any message

    Two timeouts are used because we might want to wait a long time for
    a response, while the connect timeout should be lower.

    If not passed, defaults of 10 and 60 seconds respectively are used.

    Note that on reading data, since the timeout applies to an
    individual receive, the total duration might be longer than the
    timeout value passed (we enforce a hard limit of twice the read
    timeout).

    """
    self.address = address
    if timeouts is None:
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
    else:
      self._ctimeout, self._rwtimeout = timeouts

    self.socket = None
    self._buffer = ""
    self._msgs = collections.deque()

    if eom is None:
      self.eom = '\3'
    else:
      self.eom = eom

    try:
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

      # Try to connect
      try:
        utils.Retry(self._Connect, 1.0, self._ctimeout,
                    args=(self.socket, address, self._ctimeout))
      except utils.RetryTimeout:
        raise TimeoutError("Connect timed out")

      self.socket.settimeout(self._rwtimeout)
    except (socket.error, NoMasterError):
      if self.socket is not None:
        self.socket.close()
      self.socket = None
      raise

  @staticmethod
  def _Connect(sock, address, timeout):
    sock.settimeout(timeout)
    try:
      sock.connect(address)
    except socket.timeout, err:
      raise TimeoutError("Connect timed out: %s" % str(err))
    except socket.error, err:
      if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
        raise NoMasterError(address)
      if err.args[0] == errno.EAGAIN:
        # Server's socket backlog is full at the moment
        raise utils.RetryAgain()
      raise

  def _CheckSocket(self):
    """Make sure we are connected.

    """
    if self.socket is None:
      raise ProtocolError("Connection is closed")

  def Send(self, msg):
    """Send a message.

    This just sends a message and doesn't wait for the response.

    """
    if self.eom in msg:
      raise EncodingError("Message terminator found in payload")
    self._CheckSocket()
    try:
      # TODO: sendall is not guaranteed to send everything
      self.socket.sendall(msg + self.eom)
    except socket.timeout, err:
      raise TimeoutError("Sending timeout: %s" % str(err))

  def Recv(self):
    """Try to receive a message from the socket.

    In case we already have messages queued, we just return from the
    queue. Otherwise, we try to read data with a _rwtimeout network
    timeout, making sure we don't exceed 2 x _rwtimeout as a global
    limit.

    """
    self._CheckSocket()
    etime = time.time() + self._rwtimeout
    while not self._msgs:
      if time.time() > etime:
        raise TimeoutError("Extended receive timeout")
      while True:
        try:
          data = self.socket.recv(4096)
        except socket.error, err:
          if err.args and err.args[0] == errno.EAGAIN:
            continue
          raise
        except socket.timeout, err:
          raise TimeoutError("Receive timeout: %s" % str(err))
        break
      if not data:
        raise ConnectionClosedError("Connection closed while reading")
      new_msgs = (self._buffer + data).split(self.eom)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()
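
  # Note on Recv() buffering, added as an illustration (the example stream is
  # an assumption, not taken from the original code): if recv() returned
  # "foo\3bar\3par", then "foo" and "bar" would be queued in self._msgs,
  # while "par" stays in self._buffer until its terminating '\3' arrives.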

  def Call(self, msg):
    """Send a message and wait for the response.

    This is just a wrapper over Send and Recv.

    """
    self.Send(msg)
    return self.Recv()

  def Close(self):
    """Close the socket"""
    if self.socket is not None:
      self.socket.close()
      self.socket = None
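
# A minimal Transport usage sketch (illustrative only; the (5, 30)
# connect/read-write timeouts are arbitrary example values, while
# constants.MASTER_SOCKET is the default address used by Client below):
#
#   t = Transport(constants.MASTER_SOCKET, timeouts=(5, 30))
#   request = serializer.DumpJson({KEY_METHOD: REQ_QUERY_CLUSTER_INFO,
#                                  KEY_ARGS: []}, indent=False)
#   response = t.Call(request)
#   t.Close()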


class Client(object):
  """High-level client implementation.

  This uses a backing Transport-like class on top of which it
  implements data serialization/deserialization.

  """
  def __init__(self, address=None, timeouts=None, transport=Transport):
    """Constructor for the Client class.

    Arguments:
      - address: a valid address for the used transport class
      - timeouts: a list of timeouts, to be used on connect and read/write
      - transport: a Transport-like class

    If timeouts is not passed, the default timeouts of the transport
    class are used.

    """
    if address is None:
      address = constants.MASTER_SOCKET
    self.address = address
    self.timeouts = timeouts
    self.transport_class = transport
    self.transport = None
    self._InitTransport()

  def _InitTransport(self):
    """(Re)initialize the transport if needed.

    """
    if self.transport is None:
      self.transport = self.transport_class(self.address,
                                            timeouts=self.timeouts)

  def _CloseTransport(self):
    """Close the transport, ignoring errors.

    """
    if self.transport is None:
      return
    try:
      old_transp = self.transport
      self.transport = None
      old_transp.Close()
    except Exception: # pylint: disable-msg=W0703
      pass

  def CallMethod(self, method, args):
    """Send a generic request and return the response.

    """
    # Build request
    request = {
      KEY_METHOD: method,
      KEY_ARGS: args,
      }

    # Serialize the request
    send_data = serializer.DumpJson(request, indent=False)

    # Send request and wait for response
    try:
      self._InitTransport()
      result = self.transport.Call(send_data)
    except Exception:
      self._CloseTransport()
      raise

    # Parse the result
    try:
      data = serializer.LoadJson(result)
    except Exception, err:
      raise ProtocolError("Error while deserializing response: %s" % str(err))

    # Validate response
    if (not isinstance(data, dict) or
        KEY_SUCCESS not in data or
        KEY_RESULT not in data):
      raise DecodingError("Invalid response from server: %s" % str(data))

    result = data[KEY_RESULT]

    if not data[KEY_SUCCESS]:
      errors.MaybeRaise(result)
      raise RequestError(result)

    return result
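
  # Wire-format illustration (a sketch; the concrete result value is an
  # assumption, while the envelope keys come from KEY_* above): a call such
  # as CallMethod(REQ_QUERY_CONFIG_VALUES, ["cluster_name"]) is serialized as
  #   {"method": "QueryConfigValues", "args": ["cluster_name"]}
  # and a successful reply looks like
  #   {"success": true, "result": ["example-cluster"]}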

  def SetQueueDrainFlag(self, drain_flag):
    return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)

  def SetWatcherPause(self, until):
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])

  def SubmitJob(self, ops):
    ops_state = map(lambda op: op.__getstate__(), ops)
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)

  def SubmitManyJobs(self, jobs):
    jobs_state = []
    for ops in jobs:
      jobs_state.append([op.__getstate__() for op in ops])
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)

  def CancelJob(self, job_id):
    return self.CallMethod(REQ_CANCEL_JOB, job_id)

  def ArchiveJob(self, job_id):
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)

  def AutoArchiveJobs(self, age):
    timeout = (DEF_RWTO - 1) / 2
    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial,
                           timeout=WFJC_TIMEOUT):
    """Waits for changes on a job.

    @param job_id: Job ID
    @type fields: list
    @param fields: List of field names to be observed
    @type prev_job_info: None or list
    @param prev_job_info: Previously received job information
    @type prev_log_serial: None or int/long
    @param prev_log_serial: Highest log serial number previously received
    @type timeout: int/float
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
                    be capped to that value)

    """
    assert timeout >= 0, "Timeout cannot be negative"
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
                           (job_id, fields, prev_job_info,
                            prev_log_serial,
                            min(WFJC_TIMEOUT, timeout)))

  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
    while True:
      result = self.WaitForJobChangeOnce(job_id, fields,
                                         prev_job_info, prev_log_serial)
      if result != constants.JOB_NOTCHANGED:
        break
    return result
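
  # Polling sketch (illustrative; "status" is assumed to be a valid job field
  # name, and the shape of the return value is defined by the master daemon,
  # not by this module):
  #
  #   client = Client()
  #   changes = client.WaitForJobChange(job_id, ["status"], None, None)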

  def QueryJobs(self, job_ids, fields):
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))

  def QueryInstances(self, names, fields, use_locking):
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))

  def QueryNodes(self, names, fields, use_locking):
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))

  def QueryExports(self, nodes, use_locking):
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))

  def QueryClusterInfo(self):
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())

  def QueryConfigValues(self, fields):
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)

  def QueryTags(self, kind, name):
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))
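
# End-to-end usage sketch, following the module docstring's note that a client
# needs only this module and the opcodes module. Everything below is an
# assumption for illustration (opcodes.OpTestDelay and the "status" field are
# not defined in this file):
#
#   from ganeti import opcodes
#
#   client = Client()  # connects to constants.MASTER_SOCKET by default
#   job_id = client.SubmitJob([opcodes.OpTestDelay(duration=2)])
#   print client.QueryJobs([job_id], ["status"])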