Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ 3d8548c4

History | View | Annotate | Download (8.3 kB)

1 c2a03789 Iustin Pop
#
2 c2a03789 Iustin Pop
#
3 c2a03789 Iustin Pop
4 c2a03789 Iustin Pop
# Copyright (C) 2006, 2007 Google Inc.
5 c2a03789 Iustin Pop
#
6 c2a03789 Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 c2a03789 Iustin Pop
# it under the terms of the GNU General Public License as published by
8 c2a03789 Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 c2a03789 Iustin Pop
# (at your option) any later version.
10 c2a03789 Iustin Pop
#
11 c2a03789 Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 c2a03789 Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 c2a03789 Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 c2a03789 Iustin Pop
# General Public License for more details.
15 c2a03789 Iustin Pop
#
16 c2a03789 Iustin Pop
# You should have received a copy of the GNU General Public License
17 c2a03789 Iustin Pop
# along with this program; if not, write to the Free Software
18 c2a03789 Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 c2a03789 Iustin Pop
# 02110-1301, USA.
20 c2a03789 Iustin Pop
21 c2a03789 Iustin Pop
22 c2a03789 Iustin Pop
"""Module for the unix socket protocol
23 c2a03789 Iustin Pop

24 c2a03789 Iustin Pop
This module implements the local unix socket protocl. You only need
25 c2a03789 Iustin Pop
this module and the opcodes module in the client program in order to
26 c2a03789 Iustin Pop
communicate with the master.
27 c2a03789 Iustin Pop

28 c2a03789 Iustin Pop
The module is also be used by the master daemon.
29 c2a03789 Iustin Pop

30 c2a03789 Iustin Pop
"""
31 c2a03789 Iustin Pop
32 c2a03789 Iustin Pop
import socket
33 c2a03789 Iustin Pop
import collections
34 c2a03789 Iustin Pop
import simplejson
35 c2a03789 Iustin Pop
import time
36 03a8dbdc Iustin Pop
import errno
37 c2a03789 Iustin Pop
38 c2a03789 Iustin Pop
from ganeti import opcodes
39 fad50141 Michael Hanselmann
from ganeti import serializer
40 ceab32dd Iustin Pop
from ganeti import constants
41 c2a03789 Iustin Pop
42 c2a03789 Iustin Pop
43 3d8548c4 Michael Hanselmann
KEY_METHOD = 'method'
44 3d8548c4 Michael Hanselmann
KEY_ARGS = 'args'
45 3d8548c4 Michael Hanselmann
KEY_SUCCESS = "success"
46 3d8548c4 Michael Hanselmann
KEY_RESULT = "result"
47 3d8548c4 Michael Hanselmann
48 c2a03789 Iustin Pop
REQ_SUBMIT = 'submit'
49 c2a03789 Iustin Pop
REQ_ABORT = 'abort'
50 c2a03789 Iustin Pop
REQ_QUERY = 'query'
51 c2a03789 Iustin Pop
52 c2a03789 Iustin Pop
DEF_CTMO = 10
53 c2a03789 Iustin Pop
DEF_RWTO = 60
54 c2a03789 Iustin Pop
55 c2a03789 Iustin Pop
56 c2a03789 Iustin Pop
class ProtocolError(Exception):
57 c2a03789 Iustin Pop
  """Denotes an error in the server communication"""
58 c2a03789 Iustin Pop
59 c2a03789 Iustin Pop
60 c2a03789 Iustin Pop
class ConnectionClosedError(ProtocolError):
61 c2a03789 Iustin Pop
  """Connection closed error"""
62 c2a03789 Iustin Pop
63 c2a03789 Iustin Pop
64 c2a03789 Iustin Pop
class TimeoutError(ProtocolError):
65 c2a03789 Iustin Pop
  """Operation timeout error"""
66 c2a03789 Iustin Pop
67 c2a03789 Iustin Pop
68 c2a03789 Iustin Pop
class EncodingError(ProtocolError):
69 c2a03789 Iustin Pop
  """Encoding failure on the sending side"""
70 c2a03789 Iustin Pop
71 c2a03789 Iustin Pop
72 c2a03789 Iustin Pop
class DecodingError(ProtocolError):
73 c2a03789 Iustin Pop
  """Decoding failure on the receiving side"""
74 c2a03789 Iustin Pop
75 c2a03789 Iustin Pop
76 b77acb3e Iustin Pop
class RequestError(ProtocolError):
77 b77acb3e Iustin Pop
  """Error on request
78 b77acb3e Iustin Pop

79 b77acb3e Iustin Pop
  This signifies an error in the request format or request handling,
80 b77acb3e Iustin Pop
  but not (e.g.) an error in starting up an instance.
81 b77acb3e Iustin Pop

82 b77acb3e Iustin Pop
  Some common conditions that can trigger this exception:
83 b77acb3e Iustin Pop
    - job submission failed because the job data was wrong
84 b77acb3e Iustin Pop
    - query failed because required fields were missing
85 b77acb3e Iustin Pop

86 b77acb3e Iustin Pop
  """
87 b77acb3e Iustin Pop
88 3d8548c4 Michael Hanselmann
89 03a8dbdc Iustin Pop
class NoMasterError(ProtocolError):
90 03a8dbdc Iustin Pop
  """The master cannot be reached
91 03a8dbdc Iustin Pop

92 03a8dbdc Iustin Pop
  This means that the master daemon is not running or the socket has
93 03a8dbdc Iustin Pop
  been removed.
94 03a8dbdc Iustin Pop

95 03a8dbdc Iustin Pop
  """
96 03a8dbdc Iustin Pop
97 b77acb3e Iustin Pop
98 c2a03789 Iustin Pop
def SerializeJob(job):
99 c2a03789 Iustin Pop
  """Convert a job description to a string format.
100 c2a03789 Iustin Pop

101 c2a03789 Iustin Pop
  """
102 c2a03789 Iustin Pop
  return simplejson.dumps(job.__getstate__())
103 c2a03789 Iustin Pop
104 c2a03789 Iustin Pop
105 c2a03789 Iustin Pop
def UnserializeJob(data):
106 c2a03789 Iustin Pop
  """Load a job from a string format"""
107 c2a03789 Iustin Pop
  try:
108 c2a03789 Iustin Pop
    new_data = simplejson.loads(data)
109 c2a03789 Iustin Pop
  except Exception, err:
110 c2a03789 Iustin Pop
    raise DecodingError("Error while unserializing: %s" % str(err))
111 c2a03789 Iustin Pop
  job = opcodes.Job()
112 c2a03789 Iustin Pop
  job.__setstate__(new_data)
113 c2a03789 Iustin Pop
  return job
114 c2a03789 Iustin Pop
115 c2a03789 Iustin Pop
116 c2a03789 Iustin Pop
class Transport:
117 c2a03789 Iustin Pop
  """Low-level transport class.
118 c2a03789 Iustin Pop

119 c2a03789 Iustin Pop
  This is used on the client side.
120 c2a03789 Iustin Pop

121 c2a03789 Iustin Pop
  This could be replace by any other class that provides the same
122 c2a03789 Iustin Pop
  semantics to the Client. This means:
123 c2a03789 Iustin Pop
    - can send messages and receive messages
124 c2a03789 Iustin Pop
    - safe for multithreading
125 c2a03789 Iustin Pop

126 c2a03789 Iustin Pop
  """
127 c2a03789 Iustin Pop
128 c2a03789 Iustin Pop
  def __init__(self, address, timeouts=None, eom=None):
129 c2a03789 Iustin Pop
    """Constructor for the Client class.
130 c2a03789 Iustin Pop

131 c2a03789 Iustin Pop
    Arguments:
132 c2a03789 Iustin Pop
      - address: a valid address the the used transport class
133 c2a03789 Iustin Pop
      - timeout: a list of timeouts, to be used on connect and read/write
134 c2a03789 Iustin Pop
      - eom: an identifier to be used as end-of-message which the
135 c2a03789 Iustin Pop
        upper-layer will guarantee that this identifier will not appear
136 c2a03789 Iustin Pop
        in any message
137 c2a03789 Iustin Pop

138 c2a03789 Iustin Pop
    There are two timeouts used since we might want to wait for a long
139 c2a03789 Iustin Pop
    time for a response, but the connect timeout should be lower.
140 c2a03789 Iustin Pop

141 c2a03789 Iustin Pop
    If not passed, we use a default of 10 and respectively 60 seconds.
142 c2a03789 Iustin Pop

143 c2a03789 Iustin Pop
    Note that on reading data, since the timeout applies to an
144 c2a03789 Iustin Pop
    invidual receive, it might be that the total duration is longer
145 c2a03789 Iustin Pop
    than timeout value passed (we make a hard limit at twice the read
146 c2a03789 Iustin Pop
    timeout).
147 c2a03789 Iustin Pop

148 c2a03789 Iustin Pop
    """
149 c2a03789 Iustin Pop
    self.address = address
150 c2a03789 Iustin Pop
    if timeouts is None:
151 c2a03789 Iustin Pop
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
152 c2a03789 Iustin Pop
    else:
153 c2a03789 Iustin Pop
      self._ctimeout, self._rwtimeout = timeouts
154 c2a03789 Iustin Pop
155 c2a03789 Iustin Pop
    self.socket = None
156 c2a03789 Iustin Pop
    self._buffer = ""
157 c2a03789 Iustin Pop
    self._msgs = collections.deque()
158 c2a03789 Iustin Pop
159 c2a03789 Iustin Pop
    if eom is None:
160 c2a03789 Iustin Pop
      self.eom = '\3'
161 c2a03789 Iustin Pop
    else:
162 c2a03789 Iustin Pop
      self.eom = eom
163 c2a03789 Iustin Pop
164 c2a03789 Iustin Pop
    try:
165 c2a03789 Iustin Pop
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
166 c2a03789 Iustin Pop
      self.socket.settimeout(self._ctimeout)
167 c2a03789 Iustin Pop
      try:
168 c2a03789 Iustin Pop
        self.socket.connect(address)
169 c2a03789 Iustin Pop
      except socket.timeout, err:
170 03a8dbdc Iustin Pop
        raise TimeoutError("Connect timed out: %s" % str(err))
171 03a8dbdc Iustin Pop
      except socket.error, err:
172 03a8dbdc Iustin Pop
        if err.args[0] == errno.ENOENT:
173 03a8dbdc Iustin Pop
          raise NoMasterError((address,))
174 03a8dbdc Iustin Pop
        raise
175 c2a03789 Iustin Pop
      self.socket.settimeout(self._rwtimeout)
176 03a8dbdc Iustin Pop
    except (socket.error, NoMasterError):
177 c2a03789 Iustin Pop
      if self.socket is not None:
178 c2a03789 Iustin Pop
        self.socket.close()
179 c2a03789 Iustin Pop
      self.socket = None
180 c2a03789 Iustin Pop
      raise
181 c2a03789 Iustin Pop
182 c2a03789 Iustin Pop
  def _CheckSocket(self):
183 c2a03789 Iustin Pop
    """Make sure we are connected.
184 c2a03789 Iustin Pop

185 c2a03789 Iustin Pop
    """
186 c2a03789 Iustin Pop
    if self.socket is None:
187 c2a03789 Iustin Pop
      raise ProtocolError("Connection is closed")
188 c2a03789 Iustin Pop
189 c2a03789 Iustin Pop
  def Send(self, msg):
190 c2a03789 Iustin Pop
    """Send a message.
191 c2a03789 Iustin Pop

192 c2a03789 Iustin Pop
    This just sends a message and doesn't wait for the response.
193 c2a03789 Iustin Pop

194 c2a03789 Iustin Pop
    """
195 c2a03789 Iustin Pop
    if self.eom in msg:
196 c2a03789 Iustin Pop
      raise EncodingError("Message terminator found in payload")
197 c2a03789 Iustin Pop
    self._CheckSocket()
198 c2a03789 Iustin Pop
    try:
199 c2a03789 Iustin Pop
      self.socket.sendall(msg + self.eom)
200 c2a03789 Iustin Pop
    except socket.timeout, err:
201 c2a03789 Iustin Pop
      raise TimeoutError("Sending timeout: %s" % str(err))
202 c2a03789 Iustin Pop
203 c2a03789 Iustin Pop
  def Recv(self):
204 c2a03789 Iustin Pop
    """Try to receive a messae from the socket.
205 c2a03789 Iustin Pop

206 c2a03789 Iustin Pop
    In case we already have messages queued, we just return from the
207 c2a03789 Iustin Pop
    queue. Otherwise, we try to read data with a _rwtimeout network
208 c2a03789 Iustin Pop
    timeout, and making sure we don't go over 2x_rwtimeout as a global
209 c2a03789 Iustin Pop
    limit.
210 c2a03789 Iustin Pop

211 c2a03789 Iustin Pop
    """
212 c2a03789 Iustin Pop
    self._CheckSocket()
213 c2a03789 Iustin Pop
    etime = time.time() + self._rwtimeout
214 c2a03789 Iustin Pop
    while not self._msgs:
215 c2a03789 Iustin Pop
      if time.time() > etime:
216 c2a03789 Iustin Pop
        raise TimeoutError("Extended receive timeout")
217 c2a03789 Iustin Pop
      try:
218 c2a03789 Iustin Pop
        data = self.socket.recv(4096)
219 c2a03789 Iustin Pop
      except socket.timeout, err:
220 c2a03789 Iustin Pop
        raise TimeoutError("Receive timeout: %s" % str(err))
221 c2a03789 Iustin Pop
      if not data:
222 c2a03789 Iustin Pop
        raise ConnectionClosedError("Connection closed while reading")
223 c2a03789 Iustin Pop
      new_msgs = (self._buffer + data).split(self.eom)
224 c2a03789 Iustin Pop
      self._buffer = new_msgs.pop()
225 c2a03789 Iustin Pop
      self._msgs.extend(new_msgs)
226 c2a03789 Iustin Pop
    return self._msgs.popleft()
227 c2a03789 Iustin Pop
228 c2a03789 Iustin Pop
  def Call(self, msg):
229 c2a03789 Iustin Pop
    """Send a message and wait for the response.
230 c2a03789 Iustin Pop

231 c2a03789 Iustin Pop
    This is just a wrapper over Send and Recv.
232 c2a03789 Iustin Pop

233 c2a03789 Iustin Pop
    """
234 c2a03789 Iustin Pop
    self.Send(msg)
235 c2a03789 Iustin Pop
    return self.Recv()
236 c2a03789 Iustin Pop
237 c2a03789 Iustin Pop
  def Close(self):
238 c2a03789 Iustin Pop
    """Close the socket"""
239 c2a03789 Iustin Pop
    if self.socket is not None:
240 c2a03789 Iustin Pop
      self.socket.close()
241 c2a03789 Iustin Pop
      self.socket = None
242 c2a03789 Iustin Pop
243 c2a03789 Iustin Pop
244 c2a03789 Iustin Pop
class Client(object):
245 c2a03789 Iustin Pop
  """High-level client implementation.
246 c2a03789 Iustin Pop

247 c2a03789 Iustin Pop
  This uses a backing Transport-like class on top of which it
248 c2a03789 Iustin Pop
  implements data serialization/deserialization.
249 c2a03789 Iustin Pop

250 c2a03789 Iustin Pop
  """
251 ceab32dd Iustin Pop
  def __init__(self, address=None, timeouts=None, transport=Transport):
252 c2a03789 Iustin Pop
    """Constructor for the Client class.
253 c2a03789 Iustin Pop

254 c2a03789 Iustin Pop
    Arguments:
255 c2a03789 Iustin Pop
      - address: a valid address the the used transport class
256 c2a03789 Iustin Pop
      - timeout: a list of timeouts, to be used on connect and read/write
257 c2a03789 Iustin Pop
      - transport: a Transport-like class
258 c2a03789 Iustin Pop

259 c2a03789 Iustin Pop

260 c2a03789 Iustin Pop
    If timeout is not passed, the default timeouts of the transport
261 c2a03789 Iustin Pop
    class are used.
262 c2a03789 Iustin Pop

263 c2a03789 Iustin Pop
    """
264 ceab32dd Iustin Pop
    if address is None:
265 ceab32dd Iustin Pop
      address = constants.MASTER_SOCKET
266 c2a03789 Iustin Pop
    self.transport = transport(address, timeouts=timeouts)
267 c2a03789 Iustin Pop
268 3d8548c4 Michael Hanselmann
  def CallMethod(self, method, args):
269 c2a03789 Iustin Pop
    """Send a generic request and return the response.
270 c2a03789 Iustin Pop

271 c2a03789 Iustin Pop
    """
272 3d8548c4 Michael Hanselmann
    # Build request
273 3d8548c4 Michael Hanselmann
    request = {
274 3d8548c4 Michael Hanselmann
      KEY_METHOD: method,
275 3d8548c4 Michael Hanselmann
      KEY_ARGS: args,
276 3d8548c4 Michael Hanselmann
      }
277 3d8548c4 Michael Hanselmann
278 3d8548c4 Michael Hanselmann
    # Send request and wait for response
279 3d8548c4 Michael Hanselmann
    result = self.transport.Call(serializer.DumpJson(request, indent=False))
280 c2a03789 Iustin Pop
    try:
281 fad50141 Michael Hanselmann
      data = serializer.LoadJson(result)
282 c2a03789 Iustin Pop
    except Exception, err:
283 c2a03789 Iustin Pop
      raise ProtocolError("Error while deserializing response: %s" % str(err))
284 3d8548c4 Michael Hanselmann
285 3d8548c4 Michael Hanselmann
    # Validate response
286 a14a17fc Iustin Pop
    if (not isinstance(data, dict) or
287 3d8548c4 Michael Hanselmann
        KEY_SUCCESS not in data or
288 3d8548c4 Michael Hanselmann
        KEY_RESULT not in data):
289 a14a17fc Iustin Pop
      raise DecodingError("Invalid response from server: %s" % str(data))
290 3d8548c4 Michael Hanselmann
291 3d8548c4 Michael Hanselmann
    if not data[KEY_SUCCESS]:
292 3d8548c4 Michael Hanselmann
      # TODO: decide on a standard exception
293 3d8548c4 Michael Hanselmann
      raise RequestError(data[KEY_RESULT])
294 3d8548c4 Michael Hanselmann
295 3d8548c4 Michael Hanselmann
    return data[KEY_RESULT]
296 c2a03789 Iustin Pop
297 c2a03789 Iustin Pop
  def SubmitJob(self, job):
298 c2a03789 Iustin Pop
    """Submit a job"""
299 3d8548c4 Michael Hanselmann
    return self.CallMethod(REQ_SUBMIT, SerializeJob(job))
300 c2a03789 Iustin Pop
301 c2a03789 Iustin Pop
  def Query(self, data):
302 c2a03789 Iustin Pop
    """Make a query"""
303 3d8548c4 Michael Hanselmann
    result = self.CallMethod(REQ_QUERY, data)
304 b77acb3e Iustin Pop
    if data["object"] == "jobs":
305 b77acb3e Iustin Pop
      # custom job processing of query values
306 b77acb3e Iustin Pop
      for row in result:
307 b77acb3e Iustin Pop
        for idx, field in enumerate(data["fields"]):
308 b77acb3e Iustin Pop
          if field == "op_list":
309 b77acb3e Iustin Pop
            row[idx] = [opcodes.OpCode.LoadOpCode(i) for i in row[idx]]
310 b77acb3e Iustin Pop
    return result
311 3d8548c4 Michael Hanselmann
312 3d8548c4 Michael Hanselmann
# TODO: class Server(object)