Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ dfe57c22

History | View | Annotate | Download (8.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the unix socket protocol
23

24
This module implements the local unix socket protocl. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also be used by the master daemon.
29

30
"""
31

    
32
import socket
33
import collections
34
import time
35
import errno
36

    
37
from ganeti import serializer
38
from ganeti import constants
39

    
40

    
41
KEY_METHOD = 'method'
42
KEY_ARGS = 'args'
43
KEY_SUCCESS = "success"
44
KEY_RESULT = "result"
45

    
46
REQ_SUBMIT_JOB = "SubmitJob"
47
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
48
REQ_CANCEL_JOB = "CancelJob"
49
REQ_ARCHIVE_JOB = "ArchiveJob"
50
REQ_QUERY_JOBS = "QueryJobs"
51
REQ_QUERY_INSTANCES = "QueryInstances"
52
REQ_QUERY_NODES = "QueryNodes"
53
REQ_QUERY_EXPORTS = "QueryExports"
54

    
55
DEF_CTMO = 10
56
DEF_RWTO = 60
57

    
58

    
59
class ProtocolError(Exception):
60
  """Denotes an error in the server communication"""
61

    
62

    
63
class ConnectionClosedError(ProtocolError):
64
  """Connection closed error"""
65

    
66

    
67
class TimeoutError(ProtocolError):
68
  """Operation timeout error"""
69

    
70

    
71
class EncodingError(ProtocolError):
72
  """Encoding failure on the sending side"""
73

    
74

    
75
class DecodingError(ProtocolError):
76
  """Decoding failure on the receiving side"""
77

    
78

    
79
class RequestError(ProtocolError):
80
  """Error on request
81

82
  This signifies an error in the request format or request handling,
83
  but not (e.g.) an error in starting up an instance.
84

85
  Some common conditions that can trigger this exception:
86
    - job submission failed because the job data was wrong
87
    - query failed because required fields were missing
88

89
  """
90

    
91

    
92
class NoMasterError(ProtocolError):
93
  """The master cannot be reached
94

95
  This means that the master daemon is not running or the socket has
96
  been removed.
97

98
  """
99

    
100

    
101
class Transport:
102
  """Low-level transport class.
103

104
  This is used on the client side.
105

106
  This could be replace by any other class that provides the same
107
  semantics to the Client. This means:
108
    - can send messages and receive messages
109
    - safe for multithreading
110

111
  """
112

    
113
  def __init__(self, address, timeouts=None, eom=None):
114
    """Constructor for the Client class.
115

116
    Arguments:
117
      - address: a valid address the the used transport class
118
      - timeout: a list of timeouts, to be used on connect and read/write
119
      - eom: an identifier to be used as end-of-message which the
120
        upper-layer will guarantee that this identifier will not appear
121
        in any message
122

123
    There are two timeouts used since we might want to wait for a long
124
    time for a response, but the connect timeout should be lower.
125

126
    If not passed, we use a default of 10 and respectively 60 seconds.
127

128
    Note that on reading data, since the timeout applies to an
129
    invidual receive, it might be that the total duration is longer
130
    than timeout value passed (we make a hard limit at twice the read
131
    timeout).
132

133
    """
134
    self.address = address
135
    if timeouts is None:
136
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
137
    else:
138
      self._ctimeout, self._rwtimeout = timeouts
139

    
140
    self.socket = None
141
    self._buffer = ""
142
    self._msgs = collections.deque()
143

    
144
    if eom is None:
145
      self.eom = '\3'
146
    else:
147
      self.eom = eom
148

    
149
    try:
150
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
151
      self.socket.settimeout(self._ctimeout)
152
      try:
153
        self.socket.connect(address)
154
      except socket.timeout, err:
155
        raise TimeoutError("Connect timed out: %s" % str(err))
156
      except socket.error, err:
157
        if err.args[0] == errno.ENOENT:
158
          raise NoMasterError((address,))
159
        raise
160
      self.socket.settimeout(self._rwtimeout)
161
    except (socket.error, NoMasterError):
162
      if self.socket is not None:
163
        self.socket.close()
164
      self.socket = None
165
      raise
166

    
167
  def _CheckSocket(self):
168
    """Make sure we are connected.
169

170
    """
171
    if self.socket is None:
172
      raise ProtocolError("Connection is closed")
173

    
174
  def Send(self, msg):
175
    """Send a message.
176

177
    This just sends a message and doesn't wait for the response.
178

179
    """
180
    if self.eom in msg:
181
      raise EncodingError("Message terminator found in payload")
182
    self._CheckSocket()
183
    try:
184
      self.socket.sendall(msg + self.eom)
185
    except socket.timeout, err:
186
      raise TimeoutError("Sending timeout: %s" % str(err))
187

    
188
  def Recv(self):
189
    """Try to receive a messae from the socket.
190

191
    In case we already have messages queued, we just return from the
192
    queue. Otherwise, we try to read data with a _rwtimeout network
193
    timeout, and making sure we don't go over 2x_rwtimeout as a global
194
    limit.
195

196
    """
197
    self._CheckSocket()
198
    etime = time.time() + self._rwtimeout
199
    while not self._msgs:
200
      if time.time() > etime:
201
        raise TimeoutError("Extended receive timeout")
202
      try:
203
        data = self.socket.recv(4096)
204
      except socket.timeout, err:
205
        raise TimeoutError("Receive timeout: %s" % str(err))
206
      if not data:
207
        raise ConnectionClosedError("Connection closed while reading")
208
      new_msgs = (self._buffer + data).split(self.eom)
209
      self._buffer = new_msgs.pop()
210
      self._msgs.extend(new_msgs)
211
    return self._msgs.popleft()
212

    
213
  def Call(self, msg):
214
    """Send a message and wait for the response.
215

216
    This is just a wrapper over Send and Recv.
217

218
    """
219
    self.Send(msg)
220
    return self.Recv()
221

    
222
  def Close(self):
223
    """Close the socket"""
224
    if self.socket is not None:
225
      self.socket.close()
226
      self.socket = None
227

    
228

    
229
class Client(object):
230
  """High-level client implementation.
231

232
  This uses a backing Transport-like class on top of which it
233
  implements data serialization/deserialization.
234

235
  """
236
  def __init__(self, address=None, timeouts=None, transport=Transport):
237
    """Constructor for the Client class.
238

239
    Arguments:
240
      - address: a valid address the the used transport class
241
      - timeout: a list of timeouts, to be used on connect and read/write
242
      - transport: a Transport-like class
243

244

245
    If timeout is not passed, the default timeouts of the transport
246
    class are used.
247

248
    """
249
    if address is None:
250
      address = constants.MASTER_SOCKET
251
    self.transport = transport(address, timeouts=timeouts)
252

    
253
  def CallMethod(self, method, args):
254
    """Send a generic request and return the response.
255

256
    """
257
    # Build request
258
    request = {
259
      KEY_METHOD: method,
260
      KEY_ARGS: args,
261
      }
262

    
263
    # Send request and wait for response
264
    result = self.transport.Call(serializer.DumpJson(request, indent=False))
265
    try:
266
      data = serializer.LoadJson(result)
267
    except Exception, err:
268
      raise ProtocolError("Error while deserializing response: %s" % str(err))
269

    
270
    # Validate response
271
    if (not isinstance(data, dict) or
272
        KEY_SUCCESS not in data or
273
        KEY_RESULT not in data):
274
      raise DecodingError("Invalid response from server: %s" % str(data))
275

    
276
    if not data[KEY_SUCCESS]:
277
      # TODO: decide on a standard exception
278
      raise RequestError(data[KEY_RESULT])
279

    
280
    return data[KEY_RESULT]
281

    
282
  def SubmitJob(self, ops):
283
    ops_state = map(lambda op: op.__getstate__(), ops)
284
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
285

    
286
  def CancelJob(self, job_id):
287
    return self.CallMethod(REQ_CANCEL_JOB, job_id)
288

    
289
  def ArchiveJob(self, job_id):
290
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
291

    
292
  def WaitForJobChange(self, job_id, fields, previous):
293
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
294
                           (job_id, fields, previous))
295

    
296
  def QueryJobs(self, job_ids, fields):
297
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
298

    
299
  def QueryInstances(self, names, fields):
300
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields))
301

    
302
  def QueryNodes(self, names, fields):
303
    return self.CallMethod(REQ_QUERY_NODES, (names, fields))
304

    
305
  def QueryExports(self, nodes):
306
    return self.CallMethod(REQ_QUERY_EXPORTS, nodes)
307

    
308
# TODO: class Server(object)