Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ 32f93223

History | View | Annotate | Download (8.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the unix socket protocol
23

24
This module implements the local unix socket protocl. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also be used by the master daemon.
29

30
"""
31

    
32
import socket
33
import collections
34
import time
35
import errno
36

    
37
from ganeti import serializer
38
from ganeti import constants
39

    
40

    
41
KEY_METHOD = 'method'
42
KEY_ARGS = 'args'
43
KEY_SUCCESS = "success"
44
KEY_RESULT = "result"
45

    
46
REQ_SUBMIT_JOB = "SubmitJob"
47
REQ_CANCEL_JOB = "CancelJob"
48
REQ_ARCHIVE_JOB = "ArchiveJob"
49
REQ_QUERY_JOBS = "QueryJobs"
50
REQ_QUERY_INSTANCES = "QueryInstances"
51
REQ_QUERY_NODES = "QueryNodes"
52
REQ_QUERY_EXPORTS = "QueryExports"
53

    
54
DEF_CTMO = 10
55
DEF_RWTO = 60
56

    
57

    
58
class ProtocolError(Exception):
59
  """Denotes an error in the server communication"""
60

    
61

    
62
class ConnectionClosedError(ProtocolError):
63
  """Connection closed error"""
64

    
65

    
66
class TimeoutError(ProtocolError):
67
  """Operation timeout error"""
68

    
69

    
70
class EncodingError(ProtocolError):
71
  """Encoding failure on the sending side"""
72

    
73

    
74
class DecodingError(ProtocolError):
75
  """Decoding failure on the receiving side"""
76

    
77

    
78
class RequestError(ProtocolError):
79
  """Error on request
80

81
  This signifies an error in the request format or request handling,
82
  but not (e.g.) an error in starting up an instance.
83

84
  Some common conditions that can trigger this exception:
85
    - job submission failed because the job data was wrong
86
    - query failed because required fields were missing
87

88
  """
89

    
90

    
91
class NoMasterError(ProtocolError):
92
  """The master cannot be reached
93

94
  This means that the master daemon is not running or the socket has
95
  been removed.
96

97
  """
98

    
99

    
100
class Transport:
101
  """Low-level transport class.
102

103
  This is used on the client side.
104

105
  This could be replace by any other class that provides the same
106
  semantics to the Client. This means:
107
    - can send messages and receive messages
108
    - safe for multithreading
109

110
  """
111

    
112
  def __init__(self, address, timeouts=None, eom=None):
113
    """Constructor for the Client class.
114

115
    Arguments:
116
      - address: a valid address the the used transport class
117
      - timeout: a list of timeouts, to be used on connect and read/write
118
      - eom: an identifier to be used as end-of-message which the
119
        upper-layer will guarantee that this identifier will not appear
120
        in any message
121

122
    There are two timeouts used since we might want to wait for a long
123
    time for a response, but the connect timeout should be lower.
124

125
    If not passed, we use a default of 10 and respectively 60 seconds.
126

127
    Note that on reading data, since the timeout applies to an
128
    invidual receive, it might be that the total duration is longer
129
    than timeout value passed (we make a hard limit at twice the read
130
    timeout).
131

132
    """
133
    self.address = address
134
    if timeouts is None:
135
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
136
    else:
137
      self._ctimeout, self._rwtimeout = timeouts
138

    
139
    self.socket = None
140
    self._buffer = ""
141
    self._msgs = collections.deque()
142

    
143
    if eom is None:
144
      self.eom = '\3'
145
    else:
146
      self.eom = eom
147

    
148
    try:
149
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
150
      self.socket.settimeout(self._ctimeout)
151
      try:
152
        self.socket.connect(address)
153
      except socket.timeout, err:
154
        raise TimeoutError("Connect timed out: %s" % str(err))
155
      except socket.error, err:
156
        if err.args[0] == errno.ENOENT:
157
          raise NoMasterError((address,))
158
        raise
159
      self.socket.settimeout(self._rwtimeout)
160
    except (socket.error, NoMasterError):
161
      if self.socket is not None:
162
        self.socket.close()
163
      self.socket = None
164
      raise
165

    
166
  def _CheckSocket(self):
167
    """Make sure we are connected.
168

169
    """
170
    if self.socket is None:
171
      raise ProtocolError("Connection is closed")
172

    
173
  def Send(self, msg):
174
    """Send a message.
175

176
    This just sends a message and doesn't wait for the response.
177

178
    """
179
    if self.eom in msg:
180
      raise EncodingError("Message terminator found in payload")
181
    self._CheckSocket()
182
    try:
183
      self.socket.sendall(msg + self.eom)
184
    except socket.timeout, err:
185
      raise TimeoutError("Sending timeout: %s" % str(err))
186

    
187
  def Recv(self):
188
    """Try to receive a messae from the socket.
189

190
    In case we already have messages queued, we just return from the
191
    queue. Otherwise, we try to read data with a _rwtimeout network
192
    timeout, and making sure we don't go over 2x_rwtimeout as a global
193
    limit.
194

195
    """
196
    self._CheckSocket()
197
    etime = time.time() + self._rwtimeout
198
    while not self._msgs:
199
      if time.time() > etime:
200
        raise TimeoutError("Extended receive timeout")
201
      try:
202
        data = self.socket.recv(4096)
203
      except socket.timeout, err:
204
        raise TimeoutError("Receive timeout: %s" % str(err))
205
      if not data:
206
        raise ConnectionClosedError("Connection closed while reading")
207
      new_msgs = (self._buffer + data).split(self.eom)
208
      self._buffer = new_msgs.pop()
209
      self._msgs.extend(new_msgs)
210
    return self._msgs.popleft()
211

    
212
  def Call(self, msg):
213
    """Send a message and wait for the response.
214

215
    This is just a wrapper over Send and Recv.
216

217
    """
218
    self.Send(msg)
219
    return self.Recv()
220

    
221
  def Close(self):
222
    """Close the socket"""
223
    if self.socket is not None:
224
      self.socket.close()
225
      self.socket = None
226

    
227

    
228
class Client(object):
229
  """High-level client implementation.
230

231
  This uses a backing Transport-like class on top of which it
232
  implements data serialization/deserialization.
233

234
  """
235
  def __init__(self, address=None, timeouts=None, transport=Transport):
236
    """Constructor for the Client class.
237

238
    Arguments:
239
      - address: a valid address the the used transport class
240
      - timeout: a list of timeouts, to be used on connect and read/write
241
      - transport: a Transport-like class
242

243

244
    If timeout is not passed, the default timeouts of the transport
245
    class are used.
246

247
    """
248
    if address is None:
249
      address = constants.MASTER_SOCKET
250
    self.transport = transport(address, timeouts=timeouts)
251

    
252
  def CallMethod(self, method, args):
253
    """Send a generic request and return the response.
254

255
    """
256
    # Build request
257
    request = {
258
      KEY_METHOD: method,
259
      KEY_ARGS: args,
260
      }
261

    
262
    # Send request and wait for response
263
    result = self.transport.Call(serializer.DumpJson(request, indent=False))
264
    try:
265
      data = serializer.LoadJson(result)
266
    except Exception, err:
267
      raise ProtocolError("Error while deserializing response: %s" % str(err))
268

    
269
    # Validate response
270
    if (not isinstance(data, dict) or
271
        KEY_SUCCESS not in data or
272
        KEY_RESULT not in data):
273
      raise DecodingError("Invalid response from server: %s" % str(data))
274

    
275
    if not data[KEY_SUCCESS]:
276
      # TODO: decide on a standard exception
277
      raise RequestError(data[KEY_RESULT])
278

    
279
    return data[KEY_RESULT]
280

    
281
  def SubmitJob(self, ops):
282
    ops_state = map(lambda op: op.__getstate__(), ops)
283
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
284

    
285
  def CancelJob(self, job_id):
286
    return self.CallMethod(REQ_CANCEL_JOB, job_id)
287

    
288
  def ArchiveJob(self, job_id):
289
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
290

    
291
  def QueryJobs(self, job_ids, fields):
292
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
293

    
294
  def QueryInstances(self, names, fields):
295
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields))
296

    
297
  def QueryNodes(self, names, fields):
298
    return self.CallMethod(REQ_QUERY_NODES, (names, fields))
299

    
300
  def QueryExports(self, nodes):
301
    return self.CallMethod(REQ_QUERY_EXPORTS, nodes)
302

    
303
# TODO: class Server(object)