Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ 6605411d

History | View | Annotate | Download (8.9 kB)

1 c2a03789 Iustin Pop
#
2 c2a03789 Iustin Pop
#
3 c2a03789 Iustin Pop
4 c2a03789 Iustin Pop
# Copyright (C) 2006, 2007 Google Inc.
5 c2a03789 Iustin Pop
#
6 c2a03789 Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 c2a03789 Iustin Pop
# it under the terms of the GNU General Public License as published by
8 c2a03789 Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 c2a03789 Iustin Pop
# (at your option) any later version.
10 c2a03789 Iustin Pop
#
11 c2a03789 Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 c2a03789 Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 c2a03789 Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 c2a03789 Iustin Pop
# General Public License for more details.
15 c2a03789 Iustin Pop
#
16 c2a03789 Iustin Pop
# You should have received a copy of the GNU General Public License
17 c2a03789 Iustin Pop
# along with this program; if not, write to the Free Software
18 c2a03789 Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 c2a03789 Iustin Pop
# 02110-1301, USA.
20 c2a03789 Iustin Pop
21 c2a03789 Iustin Pop
22 c2a03789 Iustin Pop
"""Module for the unix socket protocol
23 c2a03789 Iustin Pop

24 c2a03789 Iustin Pop
This module implements the local unix socket protocl. You only need
25 c2a03789 Iustin Pop
this module and the opcodes module in the client program in order to
26 c2a03789 Iustin Pop
communicate with the master.
27 c2a03789 Iustin Pop

28 c2a03789 Iustin Pop
The module is also be used by the master daemon.
29 c2a03789 Iustin Pop

30 c2a03789 Iustin Pop
"""
31 c2a03789 Iustin Pop
32 c2a03789 Iustin Pop
import socket
33 c2a03789 Iustin Pop
import collections
34 c2a03789 Iustin Pop
import time
35 03a8dbdc Iustin Pop
import errno
36 c2a03789 Iustin Pop
37 fad50141 Michael Hanselmann
from ganeti import serializer
38 ceab32dd Iustin Pop
from ganeti import constants
39 c2a03789 Iustin Pop
40 c2a03789 Iustin Pop
41 3d8548c4 Michael Hanselmann
KEY_METHOD = 'method'
42 3d8548c4 Michael Hanselmann
KEY_ARGS = 'args'
43 3d8548c4 Michael Hanselmann
KEY_SUCCESS = "success"
44 3d8548c4 Michael Hanselmann
KEY_RESULT = "result"
45 3d8548c4 Michael Hanselmann
46 0bbe448c Michael Hanselmann
REQ_SUBMIT_JOB = "SubmitJob"
47 dfe57c22 Michael Hanselmann
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
48 0bbe448c Michael Hanselmann
REQ_CANCEL_JOB = "CancelJob"
49 0bbe448c Michael Hanselmann
REQ_ARCHIVE_JOB = "ArchiveJob"
50 07cd723a Iustin Pop
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
51 0bbe448c Michael Hanselmann
REQ_QUERY_JOBS = "QueryJobs"
52 ee6c7b94 Michael Hanselmann
REQ_QUERY_INSTANCES = "QueryInstances"
53 02f7fe54 Michael Hanselmann
REQ_QUERY_NODES = "QueryNodes"
54 32f93223 Michael Hanselmann
REQ_QUERY_EXPORTS = "QueryExports"
55 ae5849b5 Michael Hanselmann
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
56 c2a03789 Iustin Pop
57 c2a03789 Iustin Pop
DEF_CTMO = 10
58 c2a03789 Iustin Pop
DEF_RWTO = 60
59 c2a03789 Iustin Pop
60 c2a03789 Iustin Pop
61 c2a03789 Iustin Pop
class ProtocolError(Exception):
62 c2a03789 Iustin Pop
  """Denotes an error in the server communication"""
63 c2a03789 Iustin Pop
64 c2a03789 Iustin Pop
65 c2a03789 Iustin Pop
class ConnectionClosedError(ProtocolError):
66 c2a03789 Iustin Pop
  """Connection closed error"""
67 c2a03789 Iustin Pop
68 c2a03789 Iustin Pop
69 c2a03789 Iustin Pop
class TimeoutError(ProtocolError):
70 c2a03789 Iustin Pop
  """Operation timeout error"""
71 c2a03789 Iustin Pop
72 c2a03789 Iustin Pop
73 c2a03789 Iustin Pop
class EncodingError(ProtocolError):
74 c2a03789 Iustin Pop
  """Encoding failure on the sending side"""
75 c2a03789 Iustin Pop
76 c2a03789 Iustin Pop
77 c2a03789 Iustin Pop
class DecodingError(ProtocolError):
78 c2a03789 Iustin Pop
  """Decoding failure on the receiving side"""
79 c2a03789 Iustin Pop
80 c2a03789 Iustin Pop
81 b77acb3e Iustin Pop
class RequestError(ProtocolError):
82 b77acb3e Iustin Pop
  """Error on request
83 b77acb3e Iustin Pop

84 b77acb3e Iustin Pop
  This signifies an error in the request format or request handling,
85 b77acb3e Iustin Pop
  but not (e.g.) an error in starting up an instance.
86 b77acb3e Iustin Pop

87 b77acb3e Iustin Pop
  Some common conditions that can trigger this exception:
88 b77acb3e Iustin Pop
    - job submission failed because the job data was wrong
89 b77acb3e Iustin Pop
    - query failed because required fields were missing
90 b77acb3e Iustin Pop

91 b77acb3e Iustin Pop
  """
92 b77acb3e Iustin Pop
93 3d8548c4 Michael Hanselmann
94 03a8dbdc Iustin Pop
class NoMasterError(ProtocolError):
95 03a8dbdc Iustin Pop
  """The master cannot be reached
96 03a8dbdc Iustin Pop

97 03a8dbdc Iustin Pop
  This means that the master daemon is not running or the socket has
98 03a8dbdc Iustin Pop
  been removed.
99 03a8dbdc Iustin Pop

100 03a8dbdc Iustin Pop
  """
101 03a8dbdc Iustin Pop
102 b77acb3e Iustin Pop
103 c2a03789 Iustin Pop
class Transport:
104 c2a03789 Iustin Pop
  """Low-level transport class.
105 c2a03789 Iustin Pop

106 c2a03789 Iustin Pop
  This is used on the client side.
107 c2a03789 Iustin Pop

108 c2a03789 Iustin Pop
  This could be replace by any other class that provides the same
109 c2a03789 Iustin Pop
  semantics to the Client. This means:
110 c2a03789 Iustin Pop
    - can send messages and receive messages
111 c2a03789 Iustin Pop
    - safe for multithreading
112 c2a03789 Iustin Pop

113 c2a03789 Iustin Pop
  """
114 c2a03789 Iustin Pop
115 c2a03789 Iustin Pop
  def __init__(self, address, timeouts=None, eom=None):
116 c2a03789 Iustin Pop
    """Constructor for the Client class.
117 c2a03789 Iustin Pop

118 c2a03789 Iustin Pop
    Arguments:
119 c2a03789 Iustin Pop
      - address: a valid address the the used transport class
120 c2a03789 Iustin Pop
      - timeout: a list of timeouts, to be used on connect and read/write
121 c2a03789 Iustin Pop
      - eom: an identifier to be used as end-of-message which the
122 c2a03789 Iustin Pop
        upper-layer will guarantee that this identifier will not appear
123 c2a03789 Iustin Pop
        in any message
124 c2a03789 Iustin Pop

125 c2a03789 Iustin Pop
    There are two timeouts used since we might want to wait for a long
126 c2a03789 Iustin Pop
    time for a response, but the connect timeout should be lower.
127 c2a03789 Iustin Pop

128 c2a03789 Iustin Pop
    If not passed, we use a default of 10 and respectively 60 seconds.
129 c2a03789 Iustin Pop

130 c2a03789 Iustin Pop
    Note that on reading data, since the timeout applies to an
131 c2a03789 Iustin Pop
    invidual receive, it might be that the total duration is longer
132 c2a03789 Iustin Pop
    than timeout value passed (we make a hard limit at twice the read
133 c2a03789 Iustin Pop
    timeout).
134 c2a03789 Iustin Pop

135 c2a03789 Iustin Pop
    """
136 c2a03789 Iustin Pop
    self.address = address
137 c2a03789 Iustin Pop
    if timeouts is None:
138 c2a03789 Iustin Pop
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
139 c2a03789 Iustin Pop
    else:
140 c2a03789 Iustin Pop
      self._ctimeout, self._rwtimeout = timeouts
141 c2a03789 Iustin Pop
142 c2a03789 Iustin Pop
    self.socket = None
143 c2a03789 Iustin Pop
    self._buffer = ""
144 c2a03789 Iustin Pop
    self._msgs = collections.deque()
145 c2a03789 Iustin Pop
146 c2a03789 Iustin Pop
    if eom is None:
147 c2a03789 Iustin Pop
      self.eom = '\3'
148 c2a03789 Iustin Pop
    else:
149 c2a03789 Iustin Pop
      self.eom = eom
150 c2a03789 Iustin Pop
151 c2a03789 Iustin Pop
    try:
152 c2a03789 Iustin Pop
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
153 c2a03789 Iustin Pop
      self.socket.settimeout(self._ctimeout)
154 c2a03789 Iustin Pop
      try:
155 c2a03789 Iustin Pop
        self.socket.connect(address)
156 c2a03789 Iustin Pop
      except socket.timeout, err:
157 03a8dbdc Iustin Pop
        raise TimeoutError("Connect timed out: %s" % str(err))
158 03a8dbdc Iustin Pop
      except socket.error, err:
159 082c5adb Michael Hanselmann
        if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
160 03a8dbdc Iustin Pop
          raise NoMasterError((address,))
161 03a8dbdc Iustin Pop
        raise
162 c2a03789 Iustin Pop
      self.socket.settimeout(self._rwtimeout)
163 03a8dbdc Iustin Pop
    except (socket.error, NoMasterError):
164 c2a03789 Iustin Pop
      if self.socket is not None:
165 c2a03789 Iustin Pop
        self.socket.close()
166 c2a03789 Iustin Pop
      self.socket = None
167 c2a03789 Iustin Pop
      raise
168 c2a03789 Iustin Pop
169 c2a03789 Iustin Pop
  def _CheckSocket(self):
170 c2a03789 Iustin Pop
    """Make sure we are connected.
171 c2a03789 Iustin Pop

172 c2a03789 Iustin Pop
    """
173 c2a03789 Iustin Pop
    if self.socket is None:
174 c2a03789 Iustin Pop
      raise ProtocolError("Connection is closed")
175 c2a03789 Iustin Pop
176 c2a03789 Iustin Pop
  def Send(self, msg):
177 c2a03789 Iustin Pop
    """Send a message.
178 c2a03789 Iustin Pop

179 c2a03789 Iustin Pop
    This just sends a message and doesn't wait for the response.
180 c2a03789 Iustin Pop

181 c2a03789 Iustin Pop
    """
182 c2a03789 Iustin Pop
    if self.eom in msg:
183 c2a03789 Iustin Pop
      raise EncodingError("Message terminator found in payload")
184 c2a03789 Iustin Pop
    self._CheckSocket()
185 c2a03789 Iustin Pop
    try:
186 c2a03789 Iustin Pop
      self.socket.sendall(msg + self.eom)
187 c2a03789 Iustin Pop
    except socket.timeout, err:
188 c2a03789 Iustin Pop
      raise TimeoutError("Sending timeout: %s" % str(err))
189 c2a03789 Iustin Pop
190 c2a03789 Iustin Pop
  def Recv(self):
191 c2a03789 Iustin Pop
    """Try to receive a messae from the socket.
192 c2a03789 Iustin Pop

193 c2a03789 Iustin Pop
    In case we already have messages queued, we just return from the
194 c2a03789 Iustin Pop
    queue. Otherwise, we try to read data with a _rwtimeout network
195 c2a03789 Iustin Pop
    timeout, and making sure we don't go over 2x_rwtimeout as a global
196 c2a03789 Iustin Pop
    limit.
197 c2a03789 Iustin Pop

198 c2a03789 Iustin Pop
    """
199 c2a03789 Iustin Pop
    self._CheckSocket()
200 c2a03789 Iustin Pop
    etime = time.time() + self._rwtimeout
201 c2a03789 Iustin Pop
    while not self._msgs:
202 c2a03789 Iustin Pop
      if time.time() > etime:
203 c2a03789 Iustin Pop
        raise TimeoutError("Extended receive timeout")
204 c2a03789 Iustin Pop
      try:
205 c2a03789 Iustin Pop
        data = self.socket.recv(4096)
206 c2a03789 Iustin Pop
      except socket.timeout, err:
207 c2a03789 Iustin Pop
        raise TimeoutError("Receive timeout: %s" % str(err))
208 c2a03789 Iustin Pop
      if not data:
209 c2a03789 Iustin Pop
        raise ConnectionClosedError("Connection closed while reading")
210 c2a03789 Iustin Pop
      new_msgs = (self._buffer + data).split(self.eom)
211 c2a03789 Iustin Pop
      self._buffer = new_msgs.pop()
212 c2a03789 Iustin Pop
      self._msgs.extend(new_msgs)
213 c2a03789 Iustin Pop
    return self._msgs.popleft()
214 c2a03789 Iustin Pop
215 c2a03789 Iustin Pop
  def Call(self, msg):
216 c2a03789 Iustin Pop
    """Send a message and wait for the response.
217 c2a03789 Iustin Pop

218 c2a03789 Iustin Pop
    This is just a wrapper over Send and Recv.
219 c2a03789 Iustin Pop

220 c2a03789 Iustin Pop
    """
221 c2a03789 Iustin Pop
    self.Send(msg)
222 c2a03789 Iustin Pop
    return self.Recv()
223 c2a03789 Iustin Pop
224 c2a03789 Iustin Pop
  def Close(self):
225 c2a03789 Iustin Pop
    """Close the socket"""
226 c2a03789 Iustin Pop
    if self.socket is not None:
227 c2a03789 Iustin Pop
      self.socket.close()
228 c2a03789 Iustin Pop
      self.socket = None
229 c2a03789 Iustin Pop
230 c2a03789 Iustin Pop
231 c2a03789 Iustin Pop
class Client(object):
232 c2a03789 Iustin Pop
  """High-level client implementation.
233 c2a03789 Iustin Pop

234 c2a03789 Iustin Pop
  This uses a backing Transport-like class on top of which it
235 c2a03789 Iustin Pop
  implements data serialization/deserialization.
236 c2a03789 Iustin Pop

237 c2a03789 Iustin Pop
  """
238 ceab32dd Iustin Pop
  def __init__(self, address=None, timeouts=None, transport=Transport):
239 c2a03789 Iustin Pop
    """Constructor for the Client class.
240 c2a03789 Iustin Pop

241 c2a03789 Iustin Pop
    Arguments:
242 c2a03789 Iustin Pop
      - address: a valid address the the used transport class
243 c2a03789 Iustin Pop
      - timeout: a list of timeouts, to be used on connect and read/write
244 c2a03789 Iustin Pop
      - transport: a Transport-like class
245 c2a03789 Iustin Pop

246 c2a03789 Iustin Pop

247 c2a03789 Iustin Pop
    If timeout is not passed, the default timeouts of the transport
248 c2a03789 Iustin Pop
    class are used.
249 c2a03789 Iustin Pop

250 c2a03789 Iustin Pop
    """
251 ceab32dd Iustin Pop
    if address is None:
252 ceab32dd Iustin Pop
      address = constants.MASTER_SOCKET
253 c2a03789 Iustin Pop
    self.transport = transport(address, timeouts=timeouts)
254 c2a03789 Iustin Pop
255 3d8548c4 Michael Hanselmann
  def CallMethod(self, method, args):
256 c2a03789 Iustin Pop
    """Send a generic request and return the response.
257 c2a03789 Iustin Pop

258 c2a03789 Iustin Pop
    """
259 3d8548c4 Michael Hanselmann
    # Build request
260 3d8548c4 Michael Hanselmann
    request = {
261 3d8548c4 Michael Hanselmann
      KEY_METHOD: method,
262 3d8548c4 Michael Hanselmann
      KEY_ARGS: args,
263 3d8548c4 Michael Hanselmann
      }
264 3d8548c4 Michael Hanselmann
265 3d8548c4 Michael Hanselmann
    # Send request and wait for response
266 3d8548c4 Michael Hanselmann
    result = self.transport.Call(serializer.DumpJson(request, indent=False))
267 c2a03789 Iustin Pop
    try:
268 fad50141 Michael Hanselmann
      data = serializer.LoadJson(result)
269 c2a03789 Iustin Pop
    except Exception, err:
270 c2a03789 Iustin Pop
      raise ProtocolError("Error while deserializing response: %s" % str(err))
271 3d8548c4 Michael Hanselmann
272 3d8548c4 Michael Hanselmann
    # Validate response
273 a14a17fc Iustin Pop
    if (not isinstance(data, dict) or
274 3d8548c4 Michael Hanselmann
        KEY_SUCCESS not in data or
275 3d8548c4 Michael Hanselmann
        KEY_RESULT not in data):
276 a14a17fc Iustin Pop
      raise DecodingError("Invalid response from server: %s" % str(data))
277 3d8548c4 Michael Hanselmann
278 3d8548c4 Michael Hanselmann
    if not data[KEY_SUCCESS]:
279 3d8548c4 Michael Hanselmann
      # TODO: decide on a standard exception
280 3d8548c4 Michael Hanselmann
      raise RequestError(data[KEY_RESULT])
281 3d8548c4 Michael Hanselmann
282 3d8548c4 Michael Hanselmann
    return data[KEY_RESULT]
283 c2a03789 Iustin Pop
284 0bbe448c Michael Hanselmann
  def SubmitJob(self, ops):
285 0bbe448c Michael Hanselmann
    ops_state = map(lambda op: op.__getstate__(), ops)
286 0bbe448c Michael Hanselmann
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
287 0bbe448c Michael Hanselmann
288 0bbe448c Michael Hanselmann
  def CancelJob(self, job_id):
289 0bbe448c Michael Hanselmann
    return self.CallMethod(REQ_CANCEL_JOB, job_id)
290 0bbe448c Michael Hanselmann
291 0bbe448c Michael Hanselmann
  def ArchiveJob(self, job_id):
292 0bbe448c Michael Hanselmann
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
293 0bbe448c Michael Hanselmann
294 07cd723a Iustin Pop
  def AutoArchiveJobs(self, age):
295 07cd723a Iustin Pop
    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, age)
296 07cd723a Iustin Pop
297 6c5a7090 Michael Hanselmann
  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
298 5c735209 Iustin Pop
    timeout = (DEF_RWTO - 1) / 2
299 5c735209 Iustin Pop
    while True:
300 5c735209 Iustin Pop
      result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
301 5c735209 Iustin Pop
                               (job_id, fields, prev_job_info,
302 5c735209 Iustin Pop
                                prev_log_serial, timeout))
303 5c735209 Iustin Pop
      if result != constants.JOB_NOTCHANGED:
304 5c735209 Iustin Pop
        break
305 5c735209 Iustin Pop
    return result
306 dfe57c22 Michael Hanselmann
307 0bbe448c Michael Hanselmann
  def QueryJobs(self, job_ids, fields):
308 0bbe448c Michael Hanselmann
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
309 3d8548c4 Michael Hanselmann
310 ee6c7b94 Michael Hanselmann
  def QueryInstances(self, names, fields):
311 ee6c7b94 Michael Hanselmann
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields))
312 ee6c7b94 Michael Hanselmann
313 02f7fe54 Michael Hanselmann
  def QueryNodes(self, names, fields):
314 02f7fe54 Michael Hanselmann
    return self.CallMethod(REQ_QUERY_NODES, (names, fields))
315 02f7fe54 Michael Hanselmann
316 32f93223 Michael Hanselmann
  def QueryExports(self, nodes):
317 32f93223 Michael Hanselmann
    return self.CallMethod(REQ_QUERY_EXPORTS, nodes)
318 32f93223 Michael Hanselmann
319 ae5849b5 Michael Hanselmann
  def QueryConfigValues(self, fields):
320 ae5849b5 Michael Hanselmann
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
321 ae5849b5 Michael Hanselmann
322 3d8548c4 Michael Hanselmann
# TODO: class Server(object)