lib/luxi.py @ 797506fc

#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module for the unix socket protocol

This module implements the local unix socket protocol. You only need
this module and the opcodes module in the client program in order to
communicate with the master.

The module is also used by the master daemon.

"""

import socket
import collections
import time
import errno

from ganeti import serializer
from ganeti import constants
from ganeti import errors


KEY_METHOD = 'method'
KEY_ARGS = 'args'
KEY_SUCCESS = "success"
KEY_RESULT = "result"
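
# Wire-format illustration (added comment, not in the original file): the
# Client class below serializes each request with serializer.DumpJson and
# the Transport class appends the end-of-message character, so an exchange
# looks roughly like this on the socket:
#
#   {"method": "QueryClusterInfo", "args": []}<EOM>
#   {"success": true, "result": ...}<EOM>
#
# where <EOM> stands for the '\3' terminator used by Transport.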

REQ_SUBMIT_JOB = "SubmitJob"
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"

DEF_CTMO = 10  # default connect timeout, in seconds
DEF_RWTO = 60  # default read/write timeout, in seconds


class ProtocolError(errors.GenericError):
  """Denotes an error in the LUXI protocol"""


class ConnectionClosedError(ProtocolError):
  """Connection closed error"""


class TimeoutError(ProtocolError):
  """Operation timeout error"""


class RequestError(ProtocolError):
  """Error on request

  This signifies an error in the request format or request handling,
  but not (e.g.) an error in starting up an instance.

  Some common conditions that can trigger this exception:
    - job submission failed because the job data was wrong
    - query failed because required fields were missing

  """


class NoMasterError(ProtocolError):
  """The master cannot be reached

  This means that the master daemon is not running or the socket has
  been removed.

  """


class Transport:
  """Low-level transport class.

  This is used on the client side.

  This could be replaced by any other class that provides the same
  semantics to the Client. This means:
    - can send messages and receive messages
    - safe for multithreading

  """

  def __init__(self, address, timeouts=None, eom=None):
    """Constructor for the Transport class.

    Arguments:
      - address: a valid address for the used transport class
      - timeouts: a list of timeouts, to be used on connect and read/write
      - eom: an identifier to be used as the end-of-message marker; the
        upper layer must guarantee that it does not appear in any message

    There are two timeouts used since we might want to wait for a long
    time for a response, but the connect timeout should be lower.

    If not passed, we use defaults of 10 and 60 seconds respectively.

    Note that on reading data, since the timeout applies to an
    individual receive, the total duration might be longer than the
    timeout value passed (we make a hard limit at twice the read
    timeout).

    """
    self.address = address
    if timeouts is None:
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
    else:
      self._ctimeout, self._rwtimeout = timeouts

    self.socket = None
    self._buffer = ""
    self._msgs = collections.deque()

    if eom is None:
      self.eom = '\3'
    else:
      self.eom = eom

    try:
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
      self.socket.settimeout(self._ctimeout)
      try:
        self.socket.connect(address)
      except socket.timeout, err:
        raise TimeoutError("Connect timed out: %s" % str(err))
      except socket.error, err:
        if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
          raise NoMasterError(address)
        raise
      self.socket.settimeout(self._rwtimeout)
    except (socket.error, NoMasterError):
      if self.socket is not None:
        self.socket.close()
      self.socket = None
      raise

  def _CheckSocket(self):
    """Make sure we are connected.

    """
    if self.socket is None:
      raise ProtocolError("Connection is closed")

  def Send(self, msg):
    """Send a message.

    This just sends a message and doesn't wait for the response.

    """
    if self.eom in msg:
      raise ProtocolError("Message terminator found in payload")

    self._CheckSocket()
    try:
      # TODO: sendall is not guaranteed to send everything
      self.socket.sendall(msg + self.eom)
    except socket.timeout, err:
      raise TimeoutError("Sending timeout: %s" % str(err))

  def Recv(self):
    """Try to receive a message from the socket.

    In case we already have messages queued, we just return from the
    queue. Otherwise, we try to read data with a _rwtimeout network
    timeout, making sure we don't go over twice _rwtimeout as a global
    limit.

    """
    self._CheckSocket()
    etime = time.time() + self._rwtimeout
    while not self._msgs:
      if time.time() > etime:
        raise TimeoutError("Extended receive timeout")
      while True:
        try:
          data = self.socket.recv(4096)
        except socket.error, err:
          if err.args and err.args[0] == errno.EAGAIN:
            continue
          raise
        except socket.timeout, err:
          raise TimeoutError("Receive timeout: %s" % str(err))
        break
      if not data:
        raise ConnectionClosedError("Connection closed while reading")
      new_msgs = (self._buffer + data).split(self.eom)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()
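
  # Framing illustration (added comment, not in the original source): if the
  # peer sends "abc\3def\3gh" in one chunk, Recv above yields "abc" and then
  # "def", while "gh" stays in self._buffer until its terminator arrives.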

  def Call(self, msg):
    """Send a message and wait for the response.

    This is just a wrapper over Send and Recv.

    """
    self.Send(msg)
    return self.Recv()

  def Close(self):
    """Close the socket"""
    if self.socket is not None:
      self.socket.close()
      self.socket = None
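
# Illustrative comment (not in the original file): a raw exchange through
# Transport alone would look roughly like
#
#   t = Transport(constants.MASTER_SOCKET)
#   reply = t.Call('{"method": "QueryClusterInfo", "args": []}')
#   t.Close()
#
# The Client class below wraps this pattern, adding JSON
# (de)serialization and response validation.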


class Client(object):
  """High-level client implementation.

  This uses a backing Transport-like class on top of which it
  implements data serialization/deserialization.

  """
  def __init__(self, address=None, timeouts=None, transport=Transport):
    """Constructor for the Client class.

    Arguments:
      - address: a valid address for the used transport class
      - timeouts: a list of timeouts, to be used on connect and read/write
      - transport: a Transport-like class

    If timeouts is not passed, the default timeouts of the transport
    class are used.

    """
    if address is None:
      address = constants.MASTER_SOCKET
    self.address = address
    self.timeouts = timeouts
    self.transport_class = transport
    self.transport = None
    self._InitTransport()

  def _InitTransport(self):
    """(Re)initialize the transport if needed.

    """
    if self.transport is None:
      self.transport = self.transport_class(self.address,
                                            timeouts=self.timeouts)

  def _CloseTransport(self):
    """Close the transport, ignoring errors.

    """
    if self.transport is None:
      return
    try:
      old_transp = self.transport
      self.transport = None
      old_transp.Close()
    except Exception: # pylint: disable-msg=W0703
      pass

  def CallMethod(self, method, args):
    """Send a generic request and return the response.

    """
    # Build request
    request = {
      KEY_METHOD: method,
      KEY_ARGS: args,
      }

    # Serialize the request
    send_data = serializer.DumpJson(request, indent=False)

    # Send request and wait for response
    try:
      self._InitTransport()
      result = self.transport.Call(send_data)
    except Exception:
      self._CloseTransport()
      raise

    # Parse the result
    try:
      data = serializer.LoadJson(result)
    except Exception, err:
      raise ProtocolError("Error while deserializing response: %s" % str(err))

    # Validate response
    if (not isinstance(data, dict) or
        KEY_SUCCESS not in data or
        KEY_RESULT not in data):
      raise ProtocolError("Invalid response from server: %s" % str(data))

    result = data[KEY_RESULT]

    if not data[KEY_SUCCESS]:
      errors.MaybeRaise(result)
      raise RequestError(result)

    return result

  def SetQueueDrainFlag(self, drain_flag):
    return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)

  def SetWatcherPause(self, until):
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])

  def SubmitJob(self, ops):
    ops_state = map(lambda op: op.__getstate__(), ops)
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)

  def SubmitManyJobs(self, jobs):
    jobs_state = []
    for ops in jobs:
      jobs_state.append([op.__getstate__() for op in ops])
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)

  def CancelJob(self, job_id):
    return self.CallMethod(REQ_CANCEL_JOB, job_id)

  def ArchiveJob(self, job_id):
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)

  def AutoArchiveJobs(self, age):
    timeout = (DEF_RWTO - 1) / 2
    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
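
  # Note (added comment, not in the original): AutoArchiveJobs above and
  # WaitForJobChange below pass a server-side timeout of roughly half the
  # default read/write timeout (DEF_RWTO), presumably so the blocking call
  # on the master returns before the Transport receive deadline expires.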

  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
    timeout = (DEF_RWTO - 1) / 2
    while True:
      result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
                               (job_id, fields, prev_job_info,
                                prev_log_serial, timeout))
      if result != constants.JOB_NOTCHANGED:
        break
    return result

  def QueryJobs(self, job_ids, fields):
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))

  def QueryInstances(self, names, fields, use_locking):
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))

  def QueryNodes(self, names, fields, use_locking):
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))

  def QueryExports(self, nodes, use_locking):
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))

  def QueryClusterInfo(self):
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())

  def QueryConfigValues(self, fields):
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)


# TODO: class Server(object)