Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ 38e250ba

History | View | Annotate | Download (9.4 kB)

1 c2a03789 Iustin Pop
#
2 c2a03789 Iustin Pop
#
3 c2a03789 Iustin Pop
4 c2a03789 Iustin Pop
# Copyright (C) 2006, 2007 Google Inc.
5 c2a03789 Iustin Pop
#
6 c2a03789 Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 c2a03789 Iustin Pop
# it under the terms of the GNU General Public License as published by
8 c2a03789 Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 c2a03789 Iustin Pop
# (at your option) any later version.
10 c2a03789 Iustin Pop
#
11 c2a03789 Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 c2a03789 Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 c2a03789 Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 c2a03789 Iustin Pop
# General Public License for more details.
15 c2a03789 Iustin Pop
#
16 c2a03789 Iustin Pop
# You should have received a copy of the GNU General Public License
17 c2a03789 Iustin Pop
# along with this program; if not, write to the Free Software
18 c2a03789 Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 c2a03789 Iustin Pop
# 02110-1301, USA.
20 c2a03789 Iustin Pop
21 c2a03789 Iustin Pop
22 c2a03789 Iustin Pop
"""Module for the unix socket protocol
23 c2a03789 Iustin Pop

24 c2a03789 Iustin Pop
This module implements the local unix socket protocl. You only need
25 c2a03789 Iustin Pop
this module and the opcodes module in the client program in order to
26 c2a03789 Iustin Pop
communicate with the master.
27 c2a03789 Iustin Pop

28 7577196d Guido Trotter
The module is also used by the master daemon.
29 c2a03789 Iustin Pop

30 c2a03789 Iustin Pop
"""
31 c2a03789 Iustin Pop
32 c2a03789 Iustin Pop
import socket
33 c2a03789 Iustin Pop
import collections
34 c2a03789 Iustin Pop
import time
35 03a8dbdc Iustin Pop
import errno
36 c2a03789 Iustin Pop
37 fad50141 Michael Hanselmann
from ganeti import serializer
38 ceab32dd Iustin Pop
from ganeti import constants
39 6797ec29 Iustin Pop
from ganeti import errors
40 c2a03789 Iustin Pop
41 c2a03789 Iustin Pop
42 3d8548c4 Michael Hanselmann
KEY_METHOD = 'method'
43 3d8548c4 Michael Hanselmann
KEY_ARGS = 'args'
44 3d8548c4 Michael Hanselmann
KEY_SUCCESS = "success"
45 3d8548c4 Michael Hanselmann
KEY_RESULT = "result"
46 3d8548c4 Michael Hanselmann
47 0bbe448c Michael Hanselmann
REQ_SUBMIT_JOB = "SubmitJob"
48 dfe57c22 Michael Hanselmann
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
49 0bbe448c Michael Hanselmann
REQ_CANCEL_JOB = "CancelJob"
50 0bbe448c Michael Hanselmann
REQ_ARCHIVE_JOB = "ArchiveJob"
51 07cd723a Iustin Pop
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
52 0bbe448c Michael Hanselmann
REQ_QUERY_JOBS = "QueryJobs"
53 ee6c7b94 Michael Hanselmann
REQ_QUERY_INSTANCES = "QueryInstances"
54 02f7fe54 Michael Hanselmann
REQ_QUERY_NODES = "QueryNodes"
55 32f93223 Michael Hanselmann
REQ_QUERY_EXPORTS = "QueryExports"
56 ae5849b5 Michael Hanselmann
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
57 3ccafd0e Iustin Pop
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
58 c2a03789 Iustin Pop
59 c2a03789 Iustin Pop
DEF_CTMO = 10
60 c2a03789 Iustin Pop
DEF_RWTO = 60
61 c2a03789 Iustin Pop
62 c2a03789 Iustin Pop
63 c2a03789 Iustin Pop
class ProtocolError(Exception):
64 c2a03789 Iustin Pop
  """Denotes an error in the server communication"""
65 c2a03789 Iustin Pop
66 c2a03789 Iustin Pop
67 c2a03789 Iustin Pop
class ConnectionClosedError(ProtocolError):
68 c2a03789 Iustin Pop
  """Connection closed error"""
69 c2a03789 Iustin Pop
70 c2a03789 Iustin Pop
71 c2a03789 Iustin Pop
class TimeoutError(ProtocolError):
72 c2a03789 Iustin Pop
  """Operation timeout error"""
73 c2a03789 Iustin Pop
74 c2a03789 Iustin Pop
75 c2a03789 Iustin Pop
class EncodingError(ProtocolError):
76 c2a03789 Iustin Pop
  """Encoding failure on the sending side"""
77 c2a03789 Iustin Pop
78 c2a03789 Iustin Pop
79 c2a03789 Iustin Pop
class DecodingError(ProtocolError):
80 c2a03789 Iustin Pop
  """Decoding failure on the receiving side"""
81 c2a03789 Iustin Pop
82 c2a03789 Iustin Pop
83 b77acb3e Iustin Pop
class RequestError(ProtocolError):
84 b77acb3e Iustin Pop
  """Error on request
85 b77acb3e Iustin Pop

86 b77acb3e Iustin Pop
  This signifies an error in the request format or request handling,
87 b77acb3e Iustin Pop
  but not (e.g.) an error in starting up an instance.
88 b77acb3e Iustin Pop

89 b77acb3e Iustin Pop
  Some common conditions that can trigger this exception:
90 b77acb3e Iustin Pop
    - job submission failed because the job data was wrong
91 b77acb3e Iustin Pop
    - query failed because required fields were missing
92 b77acb3e Iustin Pop

93 b77acb3e Iustin Pop
  """
94 b77acb3e Iustin Pop
95 3d8548c4 Michael Hanselmann
96 03a8dbdc Iustin Pop
class NoMasterError(ProtocolError):
97 03a8dbdc Iustin Pop
  """The master cannot be reached
98 03a8dbdc Iustin Pop

99 03a8dbdc Iustin Pop
  This means that the master daemon is not running or the socket has
100 03a8dbdc Iustin Pop
  been removed.
101 03a8dbdc Iustin Pop

102 03a8dbdc Iustin Pop
  """
103 03a8dbdc Iustin Pop
104 b77acb3e Iustin Pop
105 c2a03789 Iustin Pop
class Transport:
106 c2a03789 Iustin Pop
  """Low-level transport class.
107 c2a03789 Iustin Pop

108 c2a03789 Iustin Pop
  This is used on the client side.
109 c2a03789 Iustin Pop

110 c2a03789 Iustin Pop
  This could be replace by any other class that provides the same
111 c2a03789 Iustin Pop
  semantics to the Client. This means:
112 c2a03789 Iustin Pop
    - can send messages and receive messages
113 c2a03789 Iustin Pop
    - safe for multithreading
114 c2a03789 Iustin Pop

115 c2a03789 Iustin Pop
  """
116 c2a03789 Iustin Pop
117 c2a03789 Iustin Pop
  def __init__(self, address, timeouts=None, eom=None):
118 c2a03789 Iustin Pop
    """Constructor for the Client class.
119 c2a03789 Iustin Pop

120 c2a03789 Iustin Pop
    Arguments:
121 c2a03789 Iustin Pop
      - address: a valid address the the used transport class
122 c2a03789 Iustin Pop
      - timeout: a list of timeouts, to be used on connect and read/write
123 c2a03789 Iustin Pop
      - eom: an identifier to be used as end-of-message which the
124 c2a03789 Iustin Pop
        upper-layer will guarantee that this identifier will not appear
125 c2a03789 Iustin Pop
        in any message
126 c2a03789 Iustin Pop

127 c2a03789 Iustin Pop
    There are two timeouts used since we might want to wait for a long
128 c2a03789 Iustin Pop
    time for a response, but the connect timeout should be lower.
129 c2a03789 Iustin Pop

130 c2a03789 Iustin Pop
    If not passed, we use a default of 10 and respectively 60 seconds.
131 c2a03789 Iustin Pop

132 c2a03789 Iustin Pop
    Note that on reading data, since the timeout applies to an
133 c2a03789 Iustin Pop
    invidual receive, it might be that the total duration is longer
134 c2a03789 Iustin Pop
    than timeout value passed (we make a hard limit at twice the read
135 c2a03789 Iustin Pop
    timeout).
136 c2a03789 Iustin Pop

137 c2a03789 Iustin Pop
    """
138 c2a03789 Iustin Pop
    self.address = address
139 c2a03789 Iustin Pop
    if timeouts is None:
140 c2a03789 Iustin Pop
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
141 c2a03789 Iustin Pop
    else:
142 c2a03789 Iustin Pop
      self._ctimeout, self._rwtimeout = timeouts
143 c2a03789 Iustin Pop
144 c2a03789 Iustin Pop
    self.socket = None
145 c2a03789 Iustin Pop
    self._buffer = ""
146 c2a03789 Iustin Pop
    self._msgs = collections.deque()
147 c2a03789 Iustin Pop
148 c2a03789 Iustin Pop
    if eom is None:
149 c2a03789 Iustin Pop
      self.eom = '\3'
150 c2a03789 Iustin Pop
    else:
151 c2a03789 Iustin Pop
      self.eom = eom
152 c2a03789 Iustin Pop
153 c2a03789 Iustin Pop
    try:
154 c2a03789 Iustin Pop
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
155 c2a03789 Iustin Pop
      self.socket.settimeout(self._ctimeout)
156 c2a03789 Iustin Pop
      try:
157 c2a03789 Iustin Pop
        self.socket.connect(address)
158 c2a03789 Iustin Pop
      except socket.timeout, err:
159 03a8dbdc Iustin Pop
        raise TimeoutError("Connect timed out: %s" % str(err))
160 03a8dbdc Iustin Pop
      except socket.error, err:
161 082c5adb Michael Hanselmann
        if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
162 03a8dbdc Iustin Pop
          raise NoMasterError((address,))
163 03a8dbdc Iustin Pop
        raise
164 c2a03789 Iustin Pop
      self.socket.settimeout(self._rwtimeout)
165 03a8dbdc Iustin Pop
    except (socket.error, NoMasterError):
166 c2a03789 Iustin Pop
      if self.socket is not None:
167 c2a03789 Iustin Pop
        self.socket.close()
168 c2a03789 Iustin Pop
      self.socket = None
169 c2a03789 Iustin Pop
      raise
170 c2a03789 Iustin Pop
171 c2a03789 Iustin Pop
  def _CheckSocket(self):
172 c2a03789 Iustin Pop
    """Make sure we are connected.
173 c2a03789 Iustin Pop

174 c2a03789 Iustin Pop
    """
175 c2a03789 Iustin Pop
    if self.socket is None:
176 c2a03789 Iustin Pop
      raise ProtocolError("Connection is closed")
177 c2a03789 Iustin Pop
178 c2a03789 Iustin Pop
  def Send(self, msg):
179 c2a03789 Iustin Pop
    """Send a message.
180 c2a03789 Iustin Pop

181 c2a03789 Iustin Pop
    This just sends a message and doesn't wait for the response.
182 c2a03789 Iustin Pop

183 c2a03789 Iustin Pop
    """
184 c2a03789 Iustin Pop
    if self.eom in msg:
185 c2a03789 Iustin Pop
      raise EncodingError("Message terminator found in payload")
186 c2a03789 Iustin Pop
    self._CheckSocket()
187 c2a03789 Iustin Pop
    try:
188 c2a03789 Iustin Pop
      self.socket.sendall(msg + self.eom)
189 c2a03789 Iustin Pop
    except socket.timeout, err:
190 c2a03789 Iustin Pop
      raise TimeoutError("Sending timeout: %s" % str(err))
191 c2a03789 Iustin Pop
192 c2a03789 Iustin Pop
  def Recv(self):
193 c2a03789 Iustin Pop
    """Try to receive a messae from the socket.
194 c2a03789 Iustin Pop

195 c2a03789 Iustin Pop
    In case we already have messages queued, we just return from the
196 c2a03789 Iustin Pop
    queue. Otherwise, we try to read data with a _rwtimeout network
197 c2a03789 Iustin Pop
    timeout, and making sure we don't go over 2x_rwtimeout as a global
198 c2a03789 Iustin Pop
    limit.
199 c2a03789 Iustin Pop

200 c2a03789 Iustin Pop
    """
201 c2a03789 Iustin Pop
    self._CheckSocket()
202 c2a03789 Iustin Pop
    etime = time.time() + self._rwtimeout
203 c2a03789 Iustin Pop
    while not self._msgs:
204 c2a03789 Iustin Pop
      if time.time() > etime:
205 c2a03789 Iustin Pop
        raise TimeoutError("Extended receive timeout")
206 c2a03789 Iustin Pop
      try:
207 c2a03789 Iustin Pop
        data = self.socket.recv(4096)
208 c2a03789 Iustin Pop
      except socket.timeout, err:
209 c2a03789 Iustin Pop
        raise TimeoutError("Receive timeout: %s" % str(err))
210 c2a03789 Iustin Pop
      if not data:
211 c2a03789 Iustin Pop
        raise ConnectionClosedError("Connection closed while reading")
212 c2a03789 Iustin Pop
      new_msgs = (self._buffer + data).split(self.eom)
213 c2a03789 Iustin Pop
      self._buffer = new_msgs.pop()
214 c2a03789 Iustin Pop
      self._msgs.extend(new_msgs)
215 c2a03789 Iustin Pop
    return self._msgs.popleft()
216 c2a03789 Iustin Pop
217 c2a03789 Iustin Pop
  def Call(self, msg):
218 c2a03789 Iustin Pop
    """Send a message and wait for the response.
219 c2a03789 Iustin Pop

220 c2a03789 Iustin Pop
    This is just a wrapper over Send and Recv.
221 c2a03789 Iustin Pop

222 c2a03789 Iustin Pop
    """
223 c2a03789 Iustin Pop
    self.Send(msg)
224 c2a03789 Iustin Pop
    return self.Recv()
225 c2a03789 Iustin Pop
226 c2a03789 Iustin Pop
  def Close(self):
227 c2a03789 Iustin Pop
    """Close the socket"""
228 c2a03789 Iustin Pop
    if self.socket is not None:
229 c2a03789 Iustin Pop
      self.socket.close()
230 c2a03789 Iustin Pop
      self.socket = None
231 c2a03789 Iustin Pop
232 c2a03789 Iustin Pop
233 c2a03789 Iustin Pop
class Client(object):
234 c2a03789 Iustin Pop
  """High-level client implementation.
235 c2a03789 Iustin Pop

236 c2a03789 Iustin Pop
  This uses a backing Transport-like class on top of which it
237 c2a03789 Iustin Pop
  implements data serialization/deserialization.
238 c2a03789 Iustin Pop

239 c2a03789 Iustin Pop
  """
240 ceab32dd Iustin Pop
  def __init__(self, address=None, timeouts=None, transport=Transport):
241 c2a03789 Iustin Pop
    """Constructor for the Client class.
242 c2a03789 Iustin Pop

243 c2a03789 Iustin Pop
    Arguments:
244 c2a03789 Iustin Pop
      - address: a valid address the the used transport class
245 c2a03789 Iustin Pop
      - timeout: a list of timeouts, to be used on connect and read/write
246 c2a03789 Iustin Pop
      - transport: a Transport-like class
247 c2a03789 Iustin Pop

248 c2a03789 Iustin Pop

249 c2a03789 Iustin Pop
    If timeout is not passed, the default timeouts of the transport
250 c2a03789 Iustin Pop
    class are used.
251 c2a03789 Iustin Pop

252 c2a03789 Iustin Pop
    """
253 ceab32dd Iustin Pop
    if address is None:
254 ceab32dd Iustin Pop
      address = constants.MASTER_SOCKET
255 c2a03789 Iustin Pop
    self.transport = transport(address, timeouts=timeouts)
256 c2a03789 Iustin Pop
257 3d8548c4 Michael Hanselmann
  def CallMethod(self, method, args):
258 c2a03789 Iustin Pop
    """Send a generic request and return the response.
259 c2a03789 Iustin Pop

260 c2a03789 Iustin Pop
    """
261 3d8548c4 Michael Hanselmann
    # Build request
262 3d8548c4 Michael Hanselmann
    request = {
263 3d8548c4 Michael Hanselmann
      KEY_METHOD: method,
264 3d8548c4 Michael Hanselmann
      KEY_ARGS: args,
265 3d8548c4 Michael Hanselmann
      }
266 3d8548c4 Michael Hanselmann
267 3d8548c4 Michael Hanselmann
    # Send request and wait for response
268 3d8548c4 Michael Hanselmann
    result = self.transport.Call(serializer.DumpJson(request, indent=False))
269 c2a03789 Iustin Pop
    try:
270 fad50141 Michael Hanselmann
      data = serializer.LoadJson(result)
271 c2a03789 Iustin Pop
    except Exception, err:
272 c2a03789 Iustin Pop
      raise ProtocolError("Error while deserializing response: %s" % str(err))
273 3d8548c4 Michael Hanselmann
274 3d8548c4 Michael Hanselmann
    # Validate response
275 a14a17fc Iustin Pop
    if (not isinstance(data, dict) or
276 3d8548c4 Michael Hanselmann
        KEY_SUCCESS not in data or
277 3d8548c4 Michael Hanselmann
        KEY_RESULT not in data):
278 a14a17fc Iustin Pop
      raise DecodingError("Invalid response from server: %s" % str(data))
279 3d8548c4 Michael Hanselmann
280 6797ec29 Iustin Pop
    result = data[KEY_RESULT]
281 6797ec29 Iustin Pop
282 3d8548c4 Michael Hanselmann
    if not data[KEY_SUCCESS]:
283 3d8548c4 Michael Hanselmann
      # TODO: decide on a standard exception
284 6797ec29 Iustin Pop
      if (isinstance(result, (tuple, list)) and len(result) == 2 and
285 6797ec29 Iustin Pop
          isinstance(result[1], (tuple, list))):
286 6797ec29 Iustin Pop
        # custom ganeti errors
287 6797ec29 Iustin Pop
        err_class = errors.GetErrorClass(result[0])
288 6797ec29 Iustin Pop
        if err_class is not None:
289 6797ec29 Iustin Pop
          raise err_class, tuple(result[1])
290 6797ec29 Iustin Pop
291 6797ec29 Iustin Pop
      raise RequestError(result)
292 3d8548c4 Michael Hanselmann
293 6797ec29 Iustin Pop
    return result
294 c2a03789 Iustin Pop
295 3ccafd0e Iustin Pop
  def SetQueueDrainFlag(self, drain_flag):
296 3ccafd0e Iustin Pop
    return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
297 3ccafd0e Iustin Pop
298 0bbe448c Michael Hanselmann
  def SubmitJob(self, ops):
299 0bbe448c Michael Hanselmann
    ops_state = map(lambda op: op.__getstate__(), ops)
300 0bbe448c Michael Hanselmann
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
301 0bbe448c Michael Hanselmann
302 0bbe448c Michael Hanselmann
  def CancelJob(self, job_id):
303 0bbe448c Michael Hanselmann
    return self.CallMethod(REQ_CANCEL_JOB, job_id)
304 0bbe448c Michael Hanselmann
305 0bbe448c Michael Hanselmann
  def ArchiveJob(self, job_id):
306 0bbe448c Michael Hanselmann
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
307 0bbe448c Michael Hanselmann
308 07cd723a Iustin Pop
  def AutoArchiveJobs(self, age):
309 f8ad5591 Michael Hanselmann
    timeout = (DEF_RWTO - 1) / 2
310 f8ad5591 Michael Hanselmann
    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
311 07cd723a Iustin Pop
312 6c5a7090 Michael Hanselmann
  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
313 5c735209 Iustin Pop
    timeout = (DEF_RWTO - 1) / 2
314 5c735209 Iustin Pop
    while True:
315 5c735209 Iustin Pop
      result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
316 5c735209 Iustin Pop
                               (job_id, fields, prev_job_info,
317 5c735209 Iustin Pop
                                prev_log_serial, timeout))
318 5c735209 Iustin Pop
      if result != constants.JOB_NOTCHANGED:
319 5c735209 Iustin Pop
        break
320 5c735209 Iustin Pop
    return result
321 dfe57c22 Michael Hanselmann
322 0bbe448c Michael Hanselmann
  def QueryJobs(self, job_ids, fields):
323 0bbe448c Michael Hanselmann
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
324 3d8548c4 Michael Hanselmann
325 ee6c7b94 Michael Hanselmann
  def QueryInstances(self, names, fields):
326 ee6c7b94 Michael Hanselmann
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields))
327 ee6c7b94 Michael Hanselmann
328 02f7fe54 Michael Hanselmann
  def QueryNodes(self, names, fields):
329 02f7fe54 Michael Hanselmann
    return self.CallMethod(REQ_QUERY_NODES, (names, fields))
330 02f7fe54 Michael Hanselmann
331 32f93223 Michael Hanselmann
  def QueryExports(self, nodes):
332 32f93223 Michael Hanselmann
    return self.CallMethod(REQ_QUERY_EXPORTS, nodes)
333 32f93223 Michael Hanselmann
334 ae5849b5 Michael Hanselmann
  def QueryConfigValues(self, fields):
335 ae5849b5 Michael Hanselmann
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
336 ae5849b5 Michael Hanselmann
337 3ccafd0e Iustin Pop
338 3d8548c4 Michael Hanselmann
# TODO: class Server(object)