Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ 196ec587

History | View | Annotate | Download (11.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the unix socket protocol
23

24
This module implements the local unix socket protocol. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also used by the master daemon.
29

30
"""
31

    
32
import socket
33
import collections
34
import time
35
import errno
36

    
37
from ganeti import serializer
38
from ganeti import constants
39
from ganeti import errors
40
from ganeti import utils
41

    
42

    
43
KEY_METHOD = 'method'
44
KEY_ARGS = 'args'
45
KEY_SUCCESS = "success"
46
KEY_RESULT = "result"
47

    
48
REQ_SUBMIT_JOB = "SubmitJob"
49
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
50
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
51
REQ_CANCEL_JOB = "CancelJob"
52
REQ_ARCHIVE_JOB = "ArchiveJob"
53
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
54
REQ_QUERY_JOBS = "QueryJobs"
55
REQ_QUERY_INSTANCES = "QueryInstances"
56
REQ_QUERY_NODES = "QueryNodes"
57
REQ_QUERY_EXPORTS = "QueryExports"
58
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
59
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
60
REQ_QUERY_TAGS = "QueryTags"
61
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
62
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
63

    
64
DEF_CTMO = 10
65
DEF_RWTO = 60
66

    
67
# WaitForJobChange timeout
68
WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
69

    
70

    
71
class ProtocolError(Exception):
  """Denotes an error in the server communication

  Base class for all protocol-level errors raised by this module.

  """
73

    
74

    
75
class ConnectionClosedError(ProtocolError):
  """Connection closed error

  Raised when the peer closes the connection while we are reading.

  """
77

    
78

    
79
class TimeoutError(ProtocolError):
  """Operation timeout error

  Raised when a connect, send or receive exceeds its timeout.

  """
81

    
82

    
83
class EncodingError(ProtocolError):
  """Encoding failure on the sending side

  Raised when an outgoing message contains the message terminator.

  """
85

    
86

    
87
class DecodingError(ProtocolError):
  """Decoding failure on the receiving side

  Raised when a response is not a dictionary with the expected keys.

  """
89

    
90

    
91
class RequestError(ProtocolError):
  """Error on request

  This signifies an error in the request format or request handling,
  but not (e.g.) an error in starting up an instance.

  Some common conditions that can trigger this exception:
    - job submission failed because the job data was wrong
    - query failed because required fields were missing

  Raised by L{Client.CallMethod} when the server reports failure and
  the result does not encode another exception.

  """
102

    
103

    
104
class NoMasterError(ProtocolError):
  """The master cannot be reached

  This means that the master daemon is not running or the socket has
  been removed (the connect attempt fails with ENOENT or
  ECONNREFUSED).

  """
111

    
112

    
113
class Transport:
114
  """Low-level transport class.
115

116
  This is used on the client side.
117

118
  This could be replace by any other class that provides the same
119
  semantics to the Client. This means:
120
    - can send messages and receive messages
121
    - safe for multithreading
122

123
  """
124

    
125
  def __init__(self, address, timeouts=None):
126
    """Constructor for the Client class.
127

128
    Arguments:
129
      - address: a valid address the the used transport class
130
      - timeout: a list of timeouts, to be used on connect and read/write
131

132
    There are two timeouts used since we might want to wait for a long
133
    time for a response, but the connect timeout should be lower.
134

135
    If not passed, we use a default of 10 and respectively 60 seconds.
136

137
    Note that on reading data, since the timeout applies to an
138
    invidual receive, it might be that the total duration is longer
139
    than timeout value passed (we make a hard limit at twice the read
140
    timeout).
141

142
    """
143
    self.address = address
144
    if timeouts is None:
145
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
146
    else:
147
      self._ctimeout, self._rwtimeout = timeouts
148

    
149
    self.socket = None
150
    self._buffer = ""
151
    self._msgs = collections.deque()
152

    
153
    try:
154
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
155

    
156
      # Try to connect
157
      try:
158
        utils.Retry(self._Connect, 1.0, self._ctimeout,
159
                    args=(self.socket, address, self._ctimeout))
160
      except utils.RetryTimeout:
161
        raise TimeoutError("Connect timed out")
162

    
163
      self.socket.settimeout(self._rwtimeout)
164
    except (socket.error, NoMasterError):
165
      if self.socket is not None:
166
        self.socket.close()
167
      self.socket = None
168
      raise
169

    
170
  @staticmethod
171
  def _Connect(sock, address, timeout):
172
    sock.settimeout(timeout)
173
    try:
174
      sock.connect(address)
175
    except socket.timeout, err:
176
      raise TimeoutError("Connect timed out: %s" % str(err))
177
    except socket.error, err:
178
      if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
179
        raise NoMasterError(address)
180
      if err.args[0] == errno.EAGAIN:
181
        # Server's socket backlog is full at the moment
182
        raise utils.RetryAgain()
183
      raise
184

    
185
  def _CheckSocket(self):
186
    """Make sure we are connected.
187

188
    """
189
    if self.socket is None:
190
      raise ProtocolError("Connection is closed")
191

    
192
  def Send(self, msg):
193
    """Send a message.
194

195
    This just sends a message and doesn't wait for the response.
196

197
    """
198
    if constants.LUXI_EOM in msg:
199
      raise EncodingError("Message terminator found in payload")
200
    self._CheckSocket()
201
    try:
202
      # TODO: sendall is not guaranteed to send everything
203
      self.socket.sendall(msg + constants.LUXI_EOM)
204
    except socket.timeout, err:
205
      raise TimeoutError("Sending timeout: %s" % str(err))
206

    
207
  def Recv(self):
208
    """Try to receive a message from the socket.
209

210
    In case we already have messages queued, we just return from the
211
    queue. Otherwise, we try to read data with a _rwtimeout network
212
    timeout, and making sure we don't go over 2x_rwtimeout as a global
213
    limit.
214

215
    """
216
    self._CheckSocket()
217
    etime = time.time() + self._rwtimeout
218
    while not self._msgs:
219
      if time.time() > etime:
220
        raise TimeoutError("Extended receive timeout")
221
      while True:
222
        try:
223
          data = self.socket.recv(4096)
224
        except socket.error, err:
225
          if err.args and err.args[0] == errno.EAGAIN:
226
            continue
227
          raise
228
        except socket.timeout, err:
229
          raise TimeoutError("Receive timeout: %s" % str(err))
230
        break
231
      if not data:
232
        raise ConnectionClosedError("Connection closed while reading")
233
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
234
      self._buffer = new_msgs.pop()
235
      self._msgs.extend(new_msgs)
236
    return self._msgs.popleft()
237

    
238
  def Call(self, msg):
239
    """Send a message and wait for the response.
240

241
    This is just a wrapper over Send and Recv.
242

243
    """
244
    self.Send(msg)
245
    return self.Recv()
246

    
247
  def Close(self):
248
    """Close the socket"""
249
    if self.socket is not None:
250
      self.socket.close()
251
      self.socket = None
252

    
253

    
254
class Client(object):
255
  """High-level client implementation.
256

257
  This uses a backing Transport-like class on top of which it
258
  implements data serialization/deserialization.
259

260
  """
261
  def __init__(self, address=None, timeouts=None, transport=Transport):
262
    """Constructor for the Client class.
263

264
    Arguments:
265
      - address: a valid address the the used transport class
266
      - timeout: a list of timeouts, to be used on connect and read/write
267
      - transport: a Transport-like class
268

269

270
    If timeout is not passed, the default timeouts of the transport
271
    class are used.
272

273
    """
274
    if address is None:
275
      address = constants.MASTER_SOCKET
276
    self.address = address
277
    self.timeouts = timeouts
278
    self.transport_class = transport
279
    self.transport = None
280
    self._InitTransport()
281

    
282
  def _InitTransport(self):
283
    """(Re)initialize the transport if needed.
284

285
    """
286
    if self.transport is None:
287
      self.transport = self.transport_class(self.address,
288
                                            timeouts=self.timeouts)
289

    
290
  def _CloseTransport(self):
291
    """Close the transport, ignoring errors.
292

293
    """
294
    if self.transport is None:
295
      return
296
    try:
297
      old_transp = self.transport
298
      self.transport = None
299
      old_transp.Close()
300
    except Exception: # pylint: disable-msg=W0703
301
      pass
302

    
303
  def CallMethod(self, method, args):
304
    """Send a generic request and return the response.
305

306
    """
307
    # Build request
308
    request = {
309
      KEY_METHOD: method,
310
      KEY_ARGS: args,
311
      }
312

    
313
    # Serialize the request
314
    send_data = serializer.DumpJson(request, indent=False)
315

    
316
    # Send request and wait for response
317
    try:
318
      self._InitTransport()
319
      result = self.transport.Call(send_data)
320
    except Exception:
321
      self._CloseTransport()
322
      raise
323

    
324
    # Parse the result
325
    try:
326
      data = serializer.LoadJson(result)
327
    except Exception, err:
328
      raise ProtocolError("Error while deserializing response: %s" % str(err))
329

    
330
    # Validate response
331
    if (not isinstance(data, dict) or
332
        KEY_SUCCESS not in data or
333
        KEY_RESULT not in data):
334
      raise DecodingError("Invalid response from server: %s" % str(data))
335

    
336
    result = data[KEY_RESULT]
337

    
338
    if not data[KEY_SUCCESS]:
339
      errors.MaybeRaise(result)
340
      raise RequestError(result)
341

    
342
    return result
343

    
344
  def SetQueueDrainFlag(self, drain_flag):
345
    return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
346

    
347
  def SetWatcherPause(self, until):
348
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])
349

    
350
  def SubmitJob(self, ops):
351
    ops_state = map(lambda op: op.__getstate__(), ops)
352
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
353

    
354
  def SubmitManyJobs(self, jobs):
355
    jobs_state = []
356
    for ops in jobs:
357
      jobs_state.append([op.__getstate__() for op in ops])
358
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
359

    
360
  def CancelJob(self, job_id):
361
    return self.CallMethod(REQ_CANCEL_JOB, job_id)
362

    
363
  def ArchiveJob(self, job_id):
364
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
365

    
366
  def AutoArchiveJobs(self, age):
367
    timeout = (DEF_RWTO - 1) / 2
368
    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
369

    
370
  def WaitForJobChangeOnce(self, job_id, fields,
371
                           prev_job_info, prev_log_serial,
372
                           timeout=WFJC_TIMEOUT):
373
    """Waits for changes on a job.
374

375
    @param job_id: Job ID
376
    @type fields: list
377
    @param fields: List of field names to be observed
378
    @type prev_job_info: None or list
379
    @param prev_job_info: Previously received job information
380
    @type prev_log_serial: None or int/long
381
    @param prev_log_serial: Highest log serial number previously received
382
    @type timeout: int/float
383
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
384
                    be capped to that value)
385

386
    """
387
    assert timeout >= 0, "Timeout can not be negative"
388
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
389
                           (job_id, fields, prev_job_info,
390
                            prev_log_serial,
391
                            min(WFJC_TIMEOUT, timeout)))
392

    
393
  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
394
    while True:
395
      result = self.WaitForJobChangeOnce(job_id, fields,
396
                                         prev_job_info, prev_log_serial)
397
      if result != constants.JOB_NOTCHANGED:
398
        break
399
    return result
400

    
401
  def QueryJobs(self, job_ids, fields):
402
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
403

    
404
  def QueryInstances(self, names, fields, use_locking):
405
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
406

    
407
  def QueryNodes(self, names, fields, use_locking):
408
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
409

    
410
  def QueryExports(self, nodes, use_locking):
411
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
412

    
413
  def QueryClusterInfo(self):
414
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
415

    
416
  def QueryConfigValues(self, fields):
417
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
418

    
419
  def QueryTags(self, kind, name):
420
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))