Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ 0fbae49a

History | View | Annotate | Download (11.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the unix socket protocol
23

24
This module implements the local unix socket protocol. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also used by the master daemon.
29

30
"""
31

    
32
import socket
33
import collections
34
import time
35
import errno
36

    
37
from ganeti import serializer
38
from ganeti import constants
39
from ganeti import errors
40
from ganeti import utils
41

    
42

    
43
KEY_METHOD = 'method'
44
KEY_ARGS = 'args'
45
KEY_SUCCESS = "success"
46
KEY_RESULT = "result"
47

    
48
REQ_SUBMIT_JOB = "SubmitJob"
49
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
50
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
51
REQ_CANCEL_JOB = "CancelJob"
52
REQ_ARCHIVE_JOB = "ArchiveJob"
53
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
54
REQ_QUERY_JOBS = "QueryJobs"
55
REQ_QUERY_INSTANCES = "QueryInstances"
56
REQ_QUERY_NODES = "QueryNodes"
57
REQ_QUERY_EXPORTS = "QueryExports"
58
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
59
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
60
REQ_QUERY_TAGS = "QueryTags"
61
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
62
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
63

    
64
DEF_CTMO = 10
65
DEF_RWTO = 60
66

    
67

    
68
class ProtocolError(Exception):
  """Signals a failure in the communication with the master daemon"""
70

    
71

    
72
class ConnectionClosedError(ProtocolError):
  """Signals that the connection to the server was closed"""
74

    
75

    
76
class TimeoutError(ProtocolError):
  """Signals that an operation exceeded its time limit"""
78

    
79

    
80
class EncodingError(ProtocolError):
  """Signals that a message could not be encoded on the sending side"""
82

    
83

    
84
class DecodingError(ProtocolError):
  """Signals that a message could not be decoded on the receiving side"""
86

    
87

    
88
class RequestError(ProtocolError):
  """The server rejected or failed to handle a request

  This covers problems with the request itself — its format or its
  handling — but not (e.g.) an error in starting up an instance.

  Typical situations raising this exception:
    - a job submission whose job data was wrong
    - a query that was missing required fields

  """
99

    
100

    
101
class NoMasterError(ProtocolError):
  """The master daemon cannot be contacted

  Raised when the master daemon is not running or its listening
  socket has been removed.

  """
108

    
109

    
110
class Transport:
111
  """Low-level transport class.
112

113
  This is used on the client side.
114

115
  This could be replace by any other class that provides the same
116
  semantics to the Client. This means:
117
    - can send messages and receive messages
118
    - safe for multithreading
119

120
  """
121

    
122
  def __init__(self, address, timeouts=None, eom=None):
123
    """Constructor for the Client class.
124

125
    Arguments:
126
      - address: a valid address the the used transport class
127
      - timeout: a list of timeouts, to be used on connect and read/write
128
      - eom: an identifier to be used as end-of-message which the
129
        upper-layer will guarantee that this identifier will not appear
130
        in any message
131

132
    There are two timeouts used since we might want to wait for a long
133
    time for a response, but the connect timeout should be lower.
134

135
    If not passed, we use a default of 10 and respectively 60 seconds.
136

137
    Note that on reading data, since the timeout applies to an
138
    invidual receive, it might be that the total duration is longer
139
    than timeout value passed (we make a hard limit at twice the read
140
    timeout).
141

142
    """
143
    self.address = address
144
    if timeouts is None:
145
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
146
    else:
147
      self._ctimeout, self._rwtimeout = timeouts
148

    
149
    self.socket = None
150
    self._buffer = ""
151
    self._msgs = collections.deque()
152

    
153
    if eom is None:
154
      self.eom = '\3'
155
    else:
156
      self.eom = eom
157

    
158
    try:
159
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
160

    
161
      # Try to connect
162
      try:
163
        utils.Retry(self._Connect, 1.0, self._ctimeout,
164
                    args=(self.socket, address, self._ctimeout))
165
      except utils.RetryTimeout:
166
        raise TimeoutError("Connect timed out")
167

    
168
      self.socket.settimeout(self._rwtimeout)
169
    except (socket.error, NoMasterError):
170
      if self.socket is not None:
171
        self.socket.close()
172
      self.socket = None
173
      raise
174

    
175
  @staticmethod
176
  def _Connect(sock, address, timeout):
177
    sock.settimeout(timeout)
178
    try:
179
      sock.connect(address)
180
    except socket.timeout, err:
181
      raise TimeoutError("Connect timed out: %s" % str(err))
182
    except socket.error, err:
183
      if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
184
        raise NoMasterError(address)
185
      if err.args[0] == errno.EAGAIN:
186
        # Server's socket backlog is full at the moment
187
        raise utils.RetryAgain()
188
      raise
189

    
190
  def _CheckSocket(self):
191
    """Make sure we are connected.
192

193
    """
194
    if self.socket is None:
195
      raise ProtocolError("Connection is closed")
196

    
197
  def Send(self, msg):
198
    """Send a message.
199

200
    This just sends a message and doesn't wait for the response.
201

202
    """
203
    if self.eom in msg:
204
      raise EncodingError("Message terminator found in payload")
205
    self._CheckSocket()
206
    try:
207
      # TODO: sendall is not guaranteed to send everything
208
      self.socket.sendall(msg + self.eom)
209
    except socket.timeout, err:
210
      raise TimeoutError("Sending timeout: %s" % str(err))
211

    
212
  def Recv(self):
213
    """Try to receive a message from the socket.
214

215
    In case we already have messages queued, we just return from the
216
    queue. Otherwise, we try to read data with a _rwtimeout network
217
    timeout, and making sure we don't go over 2x_rwtimeout as a global
218
    limit.
219

220
    """
221
    self._CheckSocket()
222
    etime = time.time() + self._rwtimeout
223
    while not self._msgs:
224
      if time.time() > etime:
225
        raise TimeoutError("Extended receive timeout")
226
      while True:
227
        try:
228
          data = self.socket.recv(4096)
229
        except socket.error, err:
230
          if err.args and err.args[0] == errno.EAGAIN:
231
            continue
232
          raise
233
        except socket.timeout, err:
234
          raise TimeoutError("Receive timeout: %s" % str(err))
235
        break
236
      if not data:
237
        raise ConnectionClosedError("Connection closed while reading")
238
      new_msgs = (self._buffer + data).split(self.eom)
239
      self._buffer = new_msgs.pop()
240
      self._msgs.extend(new_msgs)
241
    return self._msgs.popleft()
242

    
243
  def Call(self, msg):
244
    """Send a message and wait for the response.
245

246
    This is just a wrapper over Send and Recv.
247

248
    """
249
    self.Send(msg)
250
    return self.Recv()
251

    
252
  def Close(self):
253
    """Close the socket"""
254
    if self.socket is not None:
255
      self.socket.close()
256
      self.socket = None
257

    
258

    
259
class Client(object):
260
  """High-level client implementation.
261

262
  This uses a backing Transport-like class on top of which it
263
  implements data serialization/deserialization.
264

265
  """
266
  def __init__(self, address=None, timeouts=None, transport=Transport):
267
    """Constructor for the Client class.
268

269
    Arguments:
270
      - address: a valid address the the used transport class
271
      - timeout: a list of timeouts, to be used on connect and read/write
272
      - transport: a Transport-like class
273

274

275
    If timeout is not passed, the default timeouts of the transport
276
    class are used.
277

278
    """
279
    if address is None:
280
      address = constants.MASTER_SOCKET
281
    self.address = address
282
    self.timeouts = timeouts
283
    self.transport_class = transport
284
    self.transport = None
285
    self._InitTransport()
286

    
287
  def _InitTransport(self):
288
    """(Re)initialize the transport if needed.
289

290
    """
291
    if self.transport is None:
292
      self.transport = self.transport_class(self.address,
293
                                            timeouts=self.timeouts)
294

    
295
  def _CloseTransport(self):
296
    """Close the transport, ignoring errors.
297

298
    """
299
    if self.transport is None:
300
      return
301
    try:
302
      old_transp = self.transport
303
      self.transport = None
304
      old_transp.Close()
305
    except Exception: # pylint: disable-msg=W0703
306
      pass
307

    
308
  def CallMethod(self, method, args):
309
    """Send a generic request and return the response.
310

311
    """
312
    # Build request
313
    request = {
314
      KEY_METHOD: method,
315
      KEY_ARGS: args,
316
      }
317

    
318
    # Serialize the request
319
    send_data = serializer.DumpJson(request, indent=False)
320

    
321
    # Send request and wait for response
322
    try:
323
      self._InitTransport()
324
      result = self.transport.Call(send_data)
325
    except Exception:
326
      self._CloseTransport()
327
      raise
328

    
329
    # Parse the result
330
    try:
331
      data = serializer.LoadJson(result)
332
    except Exception, err:
333
      raise ProtocolError("Error while deserializing response: %s" % str(err))
334

    
335
    # Validate response
336
    if (not isinstance(data, dict) or
337
        KEY_SUCCESS not in data or
338
        KEY_RESULT not in data):
339
      raise DecodingError("Invalid response from server: %s" % str(data))
340

    
341
    result = data[KEY_RESULT]
342

    
343
    if not data[KEY_SUCCESS]:
344
      errors.MaybeRaise(result)
345
      raise RequestError(result)
346

    
347
    return result
348

    
349
  def SetQueueDrainFlag(self, drain_flag):
350
    return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
351

    
352
  def SetWatcherPause(self, until):
353
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])
354

    
355
  def SubmitJob(self, ops):
356
    ops_state = map(lambda op: op.__getstate__(), ops)
357
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
358

    
359
  def SubmitManyJobs(self, jobs):
360
    jobs_state = []
361
    for ops in jobs:
362
      jobs_state.append([op.__getstate__() for op in ops])
363
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
364

    
365
  def CancelJob(self, job_id):
366
    return self.CallMethod(REQ_CANCEL_JOB, job_id)
367

    
368
  def ArchiveJob(self, job_id):
369
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
370

    
371
  def AutoArchiveJobs(self, age):
372
    timeout = (DEF_RWTO - 1) / 2
373
    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
374

    
375
  def WaitForJobChangeOnce(self, job_id, fields,
376
                           prev_job_info, prev_log_serial):
377
    timeout = (DEF_RWTO - 1) / 2
378
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
379
                           (job_id, fields, prev_job_info,
380
                            prev_log_serial, timeout))
381

    
382
  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
383
    while True:
384
      result = self.WaitForJobChangeOnce(job_id, fields,
385
                                         prev_job_info, prev_log_serial)
386
      if result != constants.JOB_NOTCHANGED:
387
        break
388
    return result
389

    
390
  def QueryJobs(self, job_ids, fields):
391
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
392

    
393
  def QueryInstances(self, names, fields, use_locking):
394
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
395

    
396
  def QueryNodes(self, names, fields, use_locking):
397
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
398

    
399
  def QueryExports(self, nodes, use_locking):
400
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
401

    
402
  def QueryClusterInfo(self):
403
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
404

    
405
  def QueryConfigValues(self, fields):
406
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
407

    
408
  def QueryTags(self, kind, name):
409
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))