
root / lib / luxi.py @ 7260cfbe


#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module for the unix socket protocol

This module implements the local unix socket protocol. You only need
this module and the opcodes module in the client program in order to
communicate with the master.

The module is also used by the master daemon.

"""

import socket
import collections
import time
import errno

from ganeti import serializer
from ganeti import constants
from ganeti import errors


KEY_METHOD = 'method'
KEY_ARGS = 'args'
KEY_SUCCESS = "success"
KEY_RESULT = "result"

REQ_SUBMIT_JOB = "SubmitJob"
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"

DEF_CTMO = 10
DEF_RWTO = 60


class ProtocolError(Exception):
  """Denotes an error in the server communication"""


class ConnectionClosedError(ProtocolError):
  """Connection closed error"""


class TimeoutError(ProtocolError):
  """Operation timeout error"""


class EncodingError(ProtocolError):
  """Encoding failure on the sending side"""


class DecodingError(ProtocolError):
  """Decoding failure on the receiving side"""


class RequestError(ProtocolError):
  """Error on request

  This signifies an error in the request format or request handling,
  but not (e.g.) an error in starting up an instance.

  Some common conditions that can trigger this exception:
    - job submission failed because the job data was wrong
    - query failed because required fields were missing

  """


class NoMasterError(ProtocolError):
  """The master cannot be reached

  This means that the master daemon is not running or the socket has
  been removed.

  """


class Transport:
  """Low-level transport class.

  This is used on the client side.

  This could be replaced by any other class that provides the same
  semantics to the Client. This means:
    - can send messages and receive messages
    - safe for multithreading

  """

  def __init__(self, address, timeouts=None, eom=None):
    """Constructor for the Transport class.

    Arguments:
      - address: a valid address for the used transport class
      - timeouts: a list of timeouts, to be used on connect and read/write
      - eom: an identifier to be used as end-of-message, which the
        upper layer guarantees will not appear in any message

    There are two timeouts used since we might want to wait for a long
    time for a response, but the connect timeout should be lower.

    If not passed, we use defaults of 10 and 60 seconds respectively.

    Note that on reading data, since the timeout applies to an
    individual receive, it might be that the total duration is longer
    than the timeout value passed (we make a hard limit at twice the read
    timeout).

    """
    self.address = address
    if timeouts is None:
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
    else:
      self._ctimeout, self._rwtimeout = timeouts

    self.socket = None
    self._buffer = ""
    self._msgs = collections.deque()

    if eom is None:
      self.eom = '\3'
    else:
      self.eom = eom

    try:
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
      self.socket.settimeout(self._ctimeout)
      try:
        self.socket.connect(address)
      except socket.timeout, err:
        raise TimeoutError("Connect timed out: %s" % str(err))
      except socket.error, err:
        if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
          raise NoMasterError(address)
        raise
      self.socket.settimeout(self._rwtimeout)
    except (socket.error, NoMasterError):
      if self.socket is not None:
        self.socket.close()
      self.socket = None
      raise

  def _CheckSocket(self):
    """Make sure we are connected.

    """
    if self.socket is None:
      raise ProtocolError("Connection is closed")

  def Send(self, msg):
    """Send a message.

    This just sends a message and doesn't wait for the response.

    """
    if self.eom in msg:
      raise EncodingError("Message terminator found in payload")
    self._CheckSocket()
    try:
      # TODO: sendall is not guaranteed to send everything
      self.socket.sendall(msg + self.eom)
    except socket.timeout, err:
      raise TimeoutError("Sending timeout: %s" % str(err))

  def Recv(self):
    """Try to receive a message from the socket.

    In case we already have messages queued, we just return from the
    queue. Otherwise, we try to read data with a _rwtimeout network
    timeout, making sure we don't go over 2 x _rwtimeout as a global
    limit.

    """
    self._CheckSocket()
    etime = time.time() + self._rwtimeout
    while not self._msgs:
      if time.time() > etime:
        raise TimeoutError("Extended receive timeout")
      while True:
        try:
          data = self.socket.recv(4096)
        except socket.error, err:
          if err.args and err.args[0] == errno.EAGAIN:
            continue
          raise
        except socket.timeout, err:
          raise TimeoutError("Receive timeout: %s" % str(err))
        break
      if not data:
        raise ConnectionClosedError("Connection closed while reading")
      new_msgs = (self._buffer + data).split(self.eom)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()
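
  # Editor's illustration (not in the original file): with the default
  # end-of-message marker '\3', receiving the bytes "a\3b\3partial" leaves
  # "a" and "b" queued in self._msgs and keeps "partial" in self._buffer
  # until the rest of that message arrives, e.g.:
  #
  #   new_msgs = "a\3b\3partial".split('\3')   # ['a', 'b', 'partial']
  #   buffer = new_msgs.pop()                  # 'partial' (incomplete tail)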

  def Call(self, msg):
    """Send a message and wait for the response.

    This is just a wrapper over Send and Recv.

    """
    self.Send(msg)
    return self.Recv()

  def Close(self):
    """Close the socket"""
    if self.socket is not None:
      self.socket.close()
      self.socket = None


class Client(object):
  """High-level client implementation.

  This uses a backing Transport-like class on top of which it
  implements data serialization/deserialization.

  """
  def __init__(self, address=None, timeouts=None, transport=Transport):
    """Constructor for the Client class.

    Arguments:
      - address: a valid address for the used transport class
      - timeouts: a list of timeouts, to be used on connect and read/write
      - transport: a Transport-like class

    If timeouts is not passed, the default timeouts of the transport
    class are used.

    """
    if address is None:
      address = constants.MASTER_SOCKET
    self.address = address
    self.timeouts = timeouts
    self.transport_class = transport
    self.transport = None
    self._InitTransport()

  def _InitTransport(self):
    """(Re)initialize the transport if needed.

    """
    if self.transport is None:
      self.transport = self.transport_class(self.address,
                                            timeouts=self.timeouts)

  def _CloseTransport(self):
    """Close the transport, ignoring errors.

    """
    if self.transport is None:
      return
    try:
      old_transp = self.transport
      self.transport = None
      old_transp.Close()
    except Exception: # pylint: disable-msg=W0703
      pass

  def CallMethod(self, method, args):
    """Send a generic request and return the response.

    """
    # Build request
    request = {
      KEY_METHOD: method,
      KEY_ARGS: args,
      }

    # Serialize the request
    send_data = serializer.DumpJson(request, indent=False)

    # Send request and wait for response
    try:
      self._InitTransport()
      result = self.transport.Call(send_data)
    except Exception:
      self._CloseTransport()
      raise

    # Parse the result
    try:
      data = serializer.LoadJson(result)
    except Exception, err:
      raise ProtocolError("Error while deserializing response: %s" % str(err))

    # Validate response
    if (not isinstance(data, dict) or
        KEY_SUCCESS not in data or
        KEY_RESULT not in data):
      raise DecodingError("Invalid response from server: %s" % str(data))

    result = data[KEY_RESULT]

    if not data[KEY_SUCCESS]:
      errors.MaybeRaise(result)
      raise RequestError(result)

    return result
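
  # Editor's illustration (not in the original file): a call such as
  # CallMethod(REQ_QUERY_JOBS, ([1], ["status"])) is serialized roughly as
  #   {"method": "QueryJobs", "args": [[1], ["status"]]}
  # and a successful reply from the master has the shape
  #   {"success": true, "result": [["running"]]}
  # (the exact formatting and the result payload depend on the master and
  # on serializer.DumpJson; the job id and "status" field are assumptions).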

  def SetQueueDrainFlag(self, drain_flag):
    return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)

  def SetWatcherPause(self, until):
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])

  def SubmitJob(self, ops):
    ops_state = map(lambda op: op.__getstate__(), ops)
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)

  def SubmitManyJobs(self, jobs):
    jobs_state = []
    for ops in jobs:
      jobs_state.append([op.__getstate__() for op in ops])
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)

  def CancelJob(self, job_id):
    return self.CallMethod(REQ_CANCEL_JOB, job_id)

  def ArchiveJob(self, job_id):
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)

  def AutoArchiveJobs(self, age):
    timeout = (DEF_RWTO - 1) / 2
    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))

  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
    timeout = (DEF_RWTO - 1) / 2
    while True:
      result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
                               (job_id, fields, prev_job_info,
                                prev_log_serial, timeout))
      if result != constants.JOB_NOTCHANGED:
        break
    return result

  def QueryJobs(self, job_ids, fields):
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))

  def QueryInstances(self, names, fields, use_locking):
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))

  def QueryNodes(self, names, fields, use_locking):
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))

  def QueryExports(self, nodes, use_locking):
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))

  def QueryClusterInfo(self):
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())

  def QueryConfigValues(self, fields):
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)


# TODO: class Server(object)