Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ ae5849b5

History | View | Annotate | Download (8.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the unix socket protocol
23

24
This module implements the local unix socket protocl. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also be used by the master daemon.
29

30
"""
31

    
32
import socket
33
import collections
34
import time
35
import errno
36

    
37
from ganeti import serializer
38
from ganeti import constants
39

    
40

    
41
KEY_METHOD = 'method'
42
KEY_ARGS = 'args'
43
KEY_SUCCESS = "success"
44
KEY_RESULT = "result"
45

    
46
REQ_SUBMIT_JOB = "SubmitJob"
47
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
48
REQ_CANCEL_JOB = "CancelJob"
49
REQ_ARCHIVE_JOB = "ArchiveJob"
50
REQ_QUERY_JOBS = "QueryJobs"
51
REQ_QUERY_INSTANCES = "QueryInstances"
52
REQ_QUERY_NODES = "QueryNodes"
53
REQ_QUERY_EXPORTS = "QueryExports"
54
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
55

    
56
DEF_CTMO = 10
57
DEF_RWTO = 60
58

    
59

    
60
class ProtocolError(Exception):
61
  """Denotes an error in the server communication"""
62

    
63

    
64
class ConnectionClosedError(ProtocolError):
65
  """Connection closed error"""
66

    
67

    
68
class TimeoutError(ProtocolError):
69
  """Operation timeout error"""
70

    
71

    
72
class EncodingError(ProtocolError):
73
  """Encoding failure on the sending side"""
74

    
75

    
76
class DecodingError(ProtocolError):
77
  """Decoding failure on the receiving side"""
78

    
79

    
80
class RequestError(ProtocolError):
81
  """Error on request
82

83
  This signifies an error in the request format or request handling,
84
  but not (e.g.) an error in starting up an instance.
85

86
  Some common conditions that can trigger this exception:
87
    - job submission failed because the job data was wrong
88
    - query failed because required fields were missing
89

90
  """
91

    
92

    
93
class NoMasterError(ProtocolError):
94
  """The master cannot be reached
95

96
  This means that the master daemon is not running or the socket has
97
  been removed.
98

99
  """
100

    
101

    
102
class Transport:
103
  """Low-level transport class.
104

105
  This is used on the client side.
106

107
  This could be replace by any other class that provides the same
108
  semantics to the Client. This means:
109
    - can send messages and receive messages
110
    - safe for multithreading
111

112
  """
113

    
114
  def __init__(self, address, timeouts=None, eom=None):
115
    """Constructor for the Client class.
116

117
    Arguments:
118
      - address: a valid address the the used transport class
119
      - timeout: a list of timeouts, to be used on connect and read/write
120
      - eom: an identifier to be used as end-of-message which the
121
        upper-layer will guarantee that this identifier will not appear
122
        in any message
123

124
    There are two timeouts used since we might want to wait for a long
125
    time for a response, but the connect timeout should be lower.
126

127
    If not passed, we use a default of 10 and respectively 60 seconds.
128

129
    Note that on reading data, since the timeout applies to an
130
    invidual receive, it might be that the total duration is longer
131
    than timeout value passed (we make a hard limit at twice the read
132
    timeout).
133

134
    """
135
    self.address = address
136
    if timeouts is None:
137
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
138
    else:
139
      self._ctimeout, self._rwtimeout = timeouts
140

    
141
    self.socket = None
142
    self._buffer = ""
143
    self._msgs = collections.deque()
144

    
145
    if eom is None:
146
      self.eom = '\3'
147
    else:
148
      self.eom = eom
149

    
150
    try:
151
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
152
      self.socket.settimeout(self._ctimeout)
153
      try:
154
        self.socket.connect(address)
155
      except socket.timeout, err:
156
        raise TimeoutError("Connect timed out: %s" % str(err))
157
      except socket.error, err:
158
        if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
159
          raise NoMasterError((address,))
160
        raise
161
      self.socket.settimeout(self._rwtimeout)
162
    except (socket.error, NoMasterError):
163
      if self.socket is not None:
164
        self.socket.close()
165
      self.socket = None
166
      raise
167

    
168
  def _CheckSocket(self):
169
    """Make sure we are connected.
170

171
    """
172
    if self.socket is None:
173
      raise ProtocolError("Connection is closed")
174

    
175
  def Send(self, msg):
176
    """Send a message.
177

178
    This just sends a message and doesn't wait for the response.
179

180
    """
181
    if self.eom in msg:
182
      raise EncodingError("Message terminator found in payload")
183
    self._CheckSocket()
184
    try:
185
      self.socket.sendall(msg + self.eom)
186
    except socket.timeout, err:
187
      raise TimeoutError("Sending timeout: %s" % str(err))
188

    
189
  def Recv(self):
190
    """Try to receive a messae from the socket.
191

192
    In case we already have messages queued, we just return from the
193
    queue. Otherwise, we try to read data with a _rwtimeout network
194
    timeout, and making sure we don't go over 2x_rwtimeout as a global
195
    limit.
196

197
    """
198
    self._CheckSocket()
199
    etime = time.time() + self._rwtimeout
200
    while not self._msgs:
201
      if time.time() > etime:
202
        raise TimeoutError("Extended receive timeout")
203
      try:
204
        data = self.socket.recv(4096)
205
      except socket.timeout, err:
206
        raise TimeoutError("Receive timeout: %s" % str(err))
207
      if not data:
208
        raise ConnectionClosedError("Connection closed while reading")
209
      new_msgs = (self._buffer + data).split(self.eom)
210
      self._buffer = new_msgs.pop()
211
      self._msgs.extend(new_msgs)
212
    return self._msgs.popleft()
213

    
214
  def Call(self, msg):
215
    """Send a message and wait for the response.
216

217
    This is just a wrapper over Send and Recv.
218

219
    """
220
    self.Send(msg)
221
    return self.Recv()
222

    
223
  def Close(self):
224
    """Close the socket"""
225
    if self.socket is not None:
226
      self.socket.close()
227
      self.socket = None
228

    
229

    
230
class Client(object):
231
  """High-level client implementation.
232

233
  This uses a backing Transport-like class on top of which it
234
  implements data serialization/deserialization.
235

236
  """
237
  def __init__(self, address=None, timeouts=None, transport=Transport):
238
    """Constructor for the Client class.
239

240
    Arguments:
241
      - address: a valid address the the used transport class
242
      - timeout: a list of timeouts, to be used on connect and read/write
243
      - transport: a Transport-like class
244

245

246
    If timeout is not passed, the default timeouts of the transport
247
    class are used.
248

249
    """
250
    if address is None:
251
      address = constants.MASTER_SOCKET
252
    self.transport = transport(address, timeouts=timeouts)
253

    
254
  def CallMethod(self, method, args):
255
    """Send a generic request and return the response.
256

257
    """
258
    # Build request
259
    request = {
260
      KEY_METHOD: method,
261
      KEY_ARGS: args,
262
      }
263

    
264
    # Send request and wait for response
265
    result = self.transport.Call(serializer.DumpJson(request, indent=False))
266
    try:
267
      data = serializer.LoadJson(result)
268
    except Exception, err:
269
      raise ProtocolError("Error while deserializing response: %s" % str(err))
270

    
271
    # Validate response
272
    if (not isinstance(data, dict) or
273
        KEY_SUCCESS not in data or
274
        KEY_RESULT not in data):
275
      raise DecodingError("Invalid response from server: %s" % str(data))
276

    
277
    if not data[KEY_SUCCESS]:
278
      # TODO: decide on a standard exception
279
      raise RequestError(data[KEY_RESULT])
280

    
281
    return data[KEY_RESULT]
282

    
283
  def SubmitJob(self, ops):
284
    ops_state = map(lambda op: op.__getstate__(), ops)
285
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
286

    
287
  def CancelJob(self, job_id):
288
    return self.CallMethod(REQ_CANCEL_JOB, job_id)
289

    
290
  def ArchiveJob(self, job_id):
291
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
292

    
293
  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
294
    timeout = (DEF_RWTO - 1) / 2
295
    while True:
296
      result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
297
                               (job_id, fields, prev_job_info,
298
                                prev_log_serial, timeout))
299
      if result != constants.JOB_NOTCHANGED:
300
        break
301
    return result
302

    
303
  def QueryJobs(self, job_ids, fields):
304
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
305

    
306
  def QueryInstances(self, names, fields):
307
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields))
308

    
309
  def QueryNodes(self, names, fields):
310
    return self.CallMethod(REQ_QUERY_NODES, (names, fields))
311

    
312
  def QueryExports(self, nodes):
313
    return self.CallMethod(REQ_QUERY_EXPORTS, nodes)
314

    
315
  def QueryConfigValues(self, fields):
316
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
317

    
318
# TODO: class Server(object)