Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ 8b3fd458

History | View | Annotate | Download (8.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the unix socket protocol
23

24
This module implements the local unix socket protocl. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also be used by the master daemon.
29

30
"""
31

    
32
import socket
33
import collections
34
import time
35
import errno
36

    
37
from ganeti import serializer
38
from ganeti import constants
39

    
40

    
41
KEY_METHOD = 'method'
42
KEY_ARGS = 'args'
43
KEY_SUCCESS = "success"
44
KEY_RESULT = "result"
45

    
46
REQ_SUBMIT_JOB = "SubmitJob"
47
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
48
REQ_CANCEL_JOB = "CancelJob"
49
REQ_ARCHIVE_JOB = "ArchiveJob"
50
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
51
REQ_QUERY_JOBS = "QueryJobs"
52
REQ_QUERY_INSTANCES = "QueryInstances"
53
REQ_QUERY_NODES = "QueryNodes"
54
REQ_QUERY_EXPORTS = "QueryExports"
55
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
56

    
57
DEF_CTMO = 10
58
DEF_RWTO = 60
59

    
60

    
61
class ProtocolError(Exception):
62
  """Denotes an error in the server communication"""
63

    
64

    
65
class ConnectionClosedError(ProtocolError):
66
  """Connection closed error"""
67

    
68

    
69
class TimeoutError(ProtocolError):
70
  """Operation timeout error"""
71

    
72

    
73
class EncodingError(ProtocolError):
74
  """Encoding failure on the sending side"""
75

    
76

    
77
class DecodingError(ProtocolError):
78
  """Decoding failure on the receiving side"""
79

    
80

    
81
class RequestError(ProtocolError):
82
  """Error on request
83

84
  This signifies an error in the request format or request handling,
85
  but not (e.g.) an error in starting up an instance.
86

87
  Some common conditions that can trigger this exception:
88
    - job submission failed because the job data was wrong
89
    - query failed because required fields were missing
90

91
  """
92

    
93

    
94
class NoMasterError(ProtocolError):
95
  """The master cannot be reached
96

97
  This means that the master daemon is not running or the socket has
98
  been removed.
99

100
  """
101

    
102

    
103
class Transport:
104
  """Low-level transport class.
105

106
  This is used on the client side.
107

108
  This could be replace by any other class that provides the same
109
  semantics to the Client. This means:
110
    - can send messages and receive messages
111
    - safe for multithreading
112

113
  """
114

    
115
  def __init__(self, address, timeouts=None, eom=None):
116
    """Constructor for the Client class.
117

118
    Arguments:
119
      - address: a valid address the the used transport class
120
      - timeout: a list of timeouts, to be used on connect and read/write
121
      - eom: an identifier to be used as end-of-message which the
122
        upper-layer will guarantee that this identifier will not appear
123
        in any message
124

125
    There are two timeouts used since we might want to wait for a long
126
    time for a response, but the connect timeout should be lower.
127

128
    If not passed, we use a default of 10 and respectively 60 seconds.
129

130
    Note that on reading data, since the timeout applies to an
131
    invidual receive, it might be that the total duration is longer
132
    than timeout value passed (we make a hard limit at twice the read
133
    timeout).
134

135
    """
136
    self.address = address
137
    if timeouts is None:
138
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
139
    else:
140
      self._ctimeout, self._rwtimeout = timeouts
141

    
142
    self.socket = None
143
    self._buffer = ""
144
    self._msgs = collections.deque()
145

    
146
    if eom is None:
147
      self.eom = '\3'
148
    else:
149
      self.eom = eom
150

    
151
    try:
152
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
153
      self.socket.settimeout(self._ctimeout)
154
      try:
155
        self.socket.connect(address)
156
      except socket.timeout, err:
157
        raise TimeoutError("Connect timed out: %s" % str(err))
158
      except socket.error, err:
159
        if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
160
          raise NoMasterError((address,))
161
        raise
162
      self.socket.settimeout(self._rwtimeout)
163
    except (socket.error, NoMasterError):
164
      if self.socket is not None:
165
        self.socket.close()
166
      self.socket = None
167
      raise
168

    
169
  def _CheckSocket(self):
170
    """Make sure we are connected.
171

172
    """
173
    if self.socket is None:
174
      raise ProtocolError("Connection is closed")
175

    
176
  def Send(self, msg):
177
    """Send a message.
178

179
    This just sends a message and doesn't wait for the response.
180

181
    """
182
    if self.eom in msg:
183
      raise EncodingError("Message terminator found in payload")
184
    self._CheckSocket()
185
    try:
186
      self.socket.sendall(msg + self.eom)
187
    except socket.timeout, err:
188
      raise TimeoutError("Sending timeout: %s" % str(err))
189

    
190
  def Recv(self):
191
    """Try to receive a messae from the socket.
192

193
    In case we already have messages queued, we just return from the
194
    queue. Otherwise, we try to read data with a _rwtimeout network
195
    timeout, and making sure we don't go over 2x_rwtimeout as a global
196
    limit.
197

198
    """
199
    self._CheckSocket()
200
    etime = time.time() + self._rwtimeout
201
    while not self._msgs:
202
      if time.time() > etime:
203
        raise TimeoutError("Extended receive timeout")
204
      try:
205
        data = self.socket.recv(4096)
206
      except socket.timeout, err:
207
        raise TimeoutError("Receive timeout: %s" % str(err))
208
      if not data:
209
        raise ConnectionClosedError("Connection closed while reading")
210
      new_msgs = (self._buffer + data).split(self.eom)
211
      self._buffer = new_msgs.pop()
212
      self._msgs.extend(new_msgs)
213
    return self._msgs.popleft()
214

    
215
  def Call(self, msg):
216
    """Send a message and wait for the response.
217

218
    This is just a wrapper over Send and Recv.
219

220
    """
221
    self.Send(msg)
222
    return self.Recv()
223

    
224
  def Close(self):
225
    """Close the socket"""
226
    if self.socket is not None:
227
      self.socket.close()
228
      self.socket = None
229

    
230

    
231
class Client(object):
232
  """High-level client implementation.
233

234
  This uses a backing Transport-like class on top of which it
235
  implements data serialization/deserialization.
236

237
  """
238
  def __init__(self, address=None, timeouts=None, transport=Transport):
239
    """Constructor for the Client class.
240

241
    Arguments:
242
      - address: a valid address the the used transport class
243
      - timeout: a list of timeouts, to be used on connect and read/write
244
      - transport: a Transport-like class
245

246

247
    If timeout is not passed, the default timeouts of the transport
248
    class are used.
249

250
    """
251
    if address is None:
252
      address = constants.MASTER_SOCKET
253
    self.transport = transport(address, timeouts=timeouts)
254

    
255
  def CallMethod(self, method, args):
256
    """Send a generic request and return the response.
257

258
    """
259
    # Build request
260
    request = {
261
      KEY_METHOD: method,
262
      KEY_ARGS: args,
263
      }
264

    
265
    # Send request and wait for response
266
    result = self.transport.Call(serializer.DumpJson(request, indent=False))
267
    try:
268
      data = serializer.LoadJson(result)
269
    except Exception, err:
270
      raise ProtocolError("Error while deserializing response: %s" % str(err))
271

    
272
    # Validate response
273
    if (not isinstance(data, dict) or
274
        KEY_SUCCESS not in data or
275
        KEY_RESULT not in data):
276
      raise DecodingError("Invalid response from server: %s" % str(data))
277

    
278
    if not data[KEY_SUCCESS]:
279
      # TODO: decide on a standard exception
280
      raise RequestError(data[KEY_RESULT])
281

    
282
    return data[KEY_RESULT]
283

    
284
  def SubmitJob(self, ops):
285
    ops_state = map(lambda op: op.__getstate__(), ops)
286
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
287

    
288
  def CancelJob(self, job_id):
289
    return self.CallMethod(REQ_CANCEL_JOB, job_id)
290

    
291
  def ArchiveJob(self, job_id):
292
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
293

    
294
  def AutoArchiveJobs(self, age):
295
    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, age)
296

    
297
  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
298
    timeout = (DEF_RWTO - 1) / 2
299
    while True:
300
      result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
301
                               (job_id, fields, prev_job_info,
302
                                prev_log_serial, timeout))
303
      if result != constants.JOB_NOTCHANGED:
304
        break
305
    return result
306

    
307
  def QueryJobs(self, job_ids, fields):
308
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
309

    
310
  def QueryInstances(self, names, fields):
311
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields))
312

    
313
  def QueryNodes(self, names, fields):
314
    return self.CallMethod(REQ_QUERY_NODES, (names, fields))
315

    
316
  def QueryExports(self, nodes):
317
    return self.CallMethod(REQ_QUERY_EXPORTS, nodes)
318

    
319
  def QueryConfigValues(self, fields):
320
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
321

    
322
# TODO: class Server(object)