Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ 16687b98

History | View | Annotate | Download (7 kB)

1 c2a03789 Iustin Pop
#
2 c2a03789 Iustin Pop
#
3 c2a03789 Iustin Pop
4 c2a03789 Iustin Pop
# Copyright (C) 2006, 2007 Google Inc.
5 c2a03789 Iustin Pop
#
6 c2a03789 Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 c2a03789 Iustin Pop
# it under the terms of the GNU General Public License as published by
8 c2a03789 Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 c2a03789 Iustin Pop
# (at your option) any later version.
10 c2a03789 Iustin Pop
#
11 c2a03789 Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 c2a03789 Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 c2a03789 Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 c2a03789 Iustin Pop
# General Public License for more details.
15 c2a03789 Iustin Pop
#
16 c2a03789 Iustin Pop
# You should have received a copy of the GNU General Public License
17 c2a03789 Iustin Pop
# along with this program; if not, write to the Free Software
18 c2a03789 Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 c2a03789 Iustin Pop
# 02110-1301, USA.
20 c2a03789 Iustin Pop
21 c2a03789 Iustin Pop
22 c2a03789 Iustin Pop
"""Module for the unix socket protocol
23 c2a03789 Iustin Pop

24 c2a03789 Iustin Pop
This module implements the local unix socket protocl. You only need
25 c2a03789 Iustin Pop
this module and the opcodes module in the client program in order to
26 c2a03789 Iustin Pop
communicate with the master.
27 c2a03789 Iustin Pop

28 c2a03789 Iustin Pop
The module is also be used by the master daemon.
29 c2a03789 Iustin Pop

30 c2a03789 Iustin Pop
"""
31 c2a03789 Iustin Pop
32 c2a03789 Iustin Pop
import socket
33 c2a03789 Iustin Pop
import collections
34 c2a03789 Iustin Pop
import simplejson
35 c2a03789 Iustin Pop
import time
36 c2a03789 Iustin Pop
37 c2a03789 Iustin Pop
from ganeti import opcodes
38 ceab32dd Iustin Pop
from ganeti import constants
39 c2a03789 Iustin Pop
40 c2a03789 Iustin Pop
41 c2a03789 Iustin Pop
KEY_REQUEST = 'request'
42 c2a03789 Iustin Pop
KEY_DATA = 'data'
43 c2a03789 Iustin Pop
REQ_SUBMIT = 'submit'
44 c2a03789 Iustin Pop
REQ_ABORT = 'abort'
45 c2a03789 Iustin Pop
REQ_QUERY = 'query'
46 c2a03789 Iustin Pop
47 c2a03789 Iustin Pop
DEF_CTMO = 10
48 c2a03789 Iustin Pop
DEF_RWTO = 60
49 c2a03789 Iustin Pop
50 c2a03789 Iustin Pop
51 c2a03789 Iustin Pop
class ProtocolError(Exception):
52 c2a03789 Iustin Pop
  """Denotes an error in the server communication"""
53 c2a03789 Iustin Pop
54 c2a03789 Iustin Pop
55 c2a03789 Iustin Pop
class ConnectionClosedError(ProtocolError):
56 c2a03789 Iustin Pop
  """Connection closed error"""
57 c2a03789 Iustin Pop
58 c2a03789 Iustin Pop
59 c2a03789 Iustin Pop
class TimeoutError(ProtocolError):
60 c2a03789 Iustin Pop
  """Operation timeout error"""
61 c2a03789 Iustin Pop
62 c2a03789 Iustin Pop
63 c2a03789 Iustin Pop
class EncodingError(ProtocolError):
64 c2a03789 Iustin Pop
  """Encoding failure on the sending side"""
65 c2a03789 Iustin Pop
66 c2a03789 Iustin Pop
67 c2a03789 Iustin Pop
class DecodingError(ProtocolError):
68 c2a03789 Iustin Pop
  """Decoding failure on the receiving side"""
69 c2a03789 Iustin Pop
70 c2a03789 Iustin Pop
71 c2a03789 Iustin Pop
def SerializeJob(job):
72 c2a03789 Iustin Pop
  """Convert a job description to a string format.
73 c2a03789 Iustin Pop

74 c2a03789 Iustin Pop
  """
75 c2a03789 Iustin Pop
  return simplejson.dumps(job.__getstate__())
76 c2a03789 Iustin Pop
77 c2a03789 Iustin Pop
78 c2a03789 Iustin Pop
def UnserializeJob(data):
79 c2a03789 Iustin Pop
  """Load a job from a string format"""
80 c2a03789 Iustin Pop
  try:
81 c2a03789 Iustin Pop
    new_data = simplejson.loads(data)
82 c2a03789 Iustin Pop
  except Exception, err:
83 c2a03789 Iustin Pop
    raise DecodingError("Error while unserializing: %s" % str(err))
84 c2a03789 Iustin Pop
  job = opcodes.Job()
85 c2a03789 Iustin Pop
  job.__setstate__(new_data)
86 c2a03789 Iustin Pop
  return job
87 c2a03789 Iustin Pop
88 c2a03789 Iustin Pop
89 c2a03789 Iustin Pop
class Transport:
90 c2a03789 Iustin Pop
  """Low-level transport class.
91 c2a03789 Iustin Pop

92 c2a03789 Iustin Pop
  This is used on the client side.
93 c2a03789 Iustin Pop

94 c2a03789 Iustin Pop
  This could be replace by any other class that provides the same
95 c2a03789 Iustin Pop
  semantics to the Client. This means:
96 c2a03789 Iustin Pop
    - can send messages and receive messages
97 c2a03789 Iustin Pop
    - safe for multithreading
98 c2a03789 Iustin Pop

99 c2a03789 Iustin Pop
  """
100 c2a03789 Iustin Pop
101 c2a03789 Iustin Pop
  def __init__(self, address, timeouts=None, eom=None):
102 c2a03789 Iustin Pop
    """Constructor for the Client class.
103 c2a03789 Iustin Pop

104 c2a03789 Iustin Pop
    Arguments:
105 c2a03789 Iustin Pop
      - address: a valid address the the used transport class
106 c2a03789 Iustin Pop
      - timeout: a list of timeouts, to be used on connect and read/write
107 c2a03789 Iustin Pop
      - eom: an identifier to be used as end-of-message which the
108 c2a03789 Iustin Pop
        upper-layer will guarantee that this identifier will not appear
109 c2a03789 Iustin Pop
        in any message
110 c2a03789 Iustin Pop

111 c2a03789 Iustin Pop
    There are two timeouts used since we might want to wait for a long
112 c2a03789 Iustin Pop
    time for a response, but the connect timeout should be lower.
113 c2a03789 Iustin Pop

114 c2a03789 Iustin Pop
    If not passed, we use a default of 10 and respectively 60 seconds.
115 c2a03789 Iustin Pop

116 c2a03789 Iustin Pop
    Note that on reading data, since the timeout applies to an
117 c2a03789 Iustin Pop
    invidual receive, it might be that the total duration is longer
118 c2a03789 Iustin Pop
    than timeout value passed (we make a hard limit at twice the read
119 c2a03789 Iustin Pop
    timeout).
120 c2a03789 Iustin Pop

121 c2a03789 Iustin Pop
    """
122 c2a03789 Iustin Pop
    self.address = address
123 c2a03789 Iustin Pop
    if timeouts is None:
124 c2a03789 Iustin Pop
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
125 c2a03789 Iustin Pop
    else:
126 c2a03789 Iustin Pop
      self._ctimeout, self._rwtimeout = timeouts
127 c2a03789 Iustin Pop
128 c2a03789 Iustin Pop
    self.socket = None
129 c2a03789 Iustin Pop
    self._buffer = ""
130 c2a03789 Iustin Pop
    self._msgs = collections.deque()
131 c2a03789 Iustin Pop
132 c2a03789 Iustin Pop
    if eom is None:
133 c2a03789 Iustin Pop
      self.eom = '\3'
134 c2a03789 Iustin Pop
    else:
135 c2a03789 Iustin Pop
      self.eom = eom
136 c2a03789 Iustin Pop
137 c2a03789 Iustin Pop
    try:
138 c2a03789 Iustin Pop
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
139 c2a03789 Iustin Pop
      self.socket.settimeout(self._ctimeout)
140 c2a03789 Iustin Pop
      try:
141 c2a03789 Iustin Pop
        self.socket.connect(address)
142 c2a03789 Iustin Pop
      except socket.timeout, err:
143 c2a03789 Iustin Pop
        raise TimeoutError("Connection timed out: %s" % str(err))
144 c2a03789 Iustin Pop
      self.socket.settimeout(self._rwtimeout)
145 c2a03789 Iustin Pop
    except socket.error:
146 c2a03789 Iustin Pop
      if self.socket is not None:
147 c2a03789 Iustin Pop
        self.socket.close()
148 c2a03789 Iustin Pop
      self.socket = None
149 c2a03789 Iustin Pop
      raise
150 c2a03789 Iustin Pop
151 c2a03789 Iustin Pop
  def _CheckSocket(self):
152 c2a03789 Iustin Pop
    """Make sure we are connected.
153 c2a03789 Iustin Pop

154 c2a03789 Iustin Pop
    """
155 c2a03789 Iustin Pop
    if self.socket is None:
156 c2a03789 Iustin Pop
      raise ProtocolError("Connection is closed")
157 c2a03789 Iustin Pop
158 c2a03789 Iustin Pop
  def Send(self, msg):
159 c2a03789 Iustin Pop
    """Send a message.
160 c2a03789 Iustin Pop

161 c2a03789 Iustin Pop
    This just sends a message and doesn't wait for the response.
162 c2a03789 Iustin Pop

163 c2a03789 Iustin Pop
    """
164 c2a03789 Iustin Pop
    if self.eom in msg:
165 c2a03789 Iustin Pop
      raise EncodingError("Message terminator found in payload")
166 c2a03789 Iustin Pop
    self._CheckSocket()
167 c2a03789 Iustin Pop
    try:
168 c2a03789 Iustin Pop
      self.socket.sendall(msg + self.eom)
169 c2a03789 Iustin Pop
    except socket.timeout, err:
170 c2a03789 Iustin Pop
      raise TimeoutError("Sending timeout: %s" % str(err))
171 c2a03789 Iustin Pop
172 c2a03789 Iustin Pop
  def Recv(self):
173 c2a03789 Iustin Pop
    """Try to receive a messae from the socket.
174 c2a03789 Iustin Pop

175 c2a03789 Iustin Pop
    In case we already have messages queued, we just return from the
176 c2a03789 Iustin Pop
    queue. Otherwise, we try to read data with a _rwtimeout network
177 c2a03789 Iustin Pop
    timeout, and making sure we don't go over 2x_rwtimeout as a global
178 c2a03789 Iustin Pop
    limit.
179 c2a03789 Iustin Pop

180 c2a03789 Iustin Pop
    """
181 c2a03789 Iustin Pop
    self._CheckSocket()
182 c2a03789 Iustin Pop
    etime = time.time() + self._rwtimeout
183 c2a03789 Iustin Pop
    while not self._msgs:
184 c2a03789 Iustin Pop
      if time.time() > etime:
185 c2a03789 Iustin Pop
        raise TimeoutError("Extended receive timeout")
186 c2a03789 Iustin Pop
      try:
187 c2a03789 Iustin Pop
        data = self.socket.recv(4096)
188 c2a03789 Iustin Pop
      except socket.timeout, err:
189 c2a03789 Iustin Pop
        raise TimeoutError("Receive timeout: %s" % str(err))
190 c2a03789 Iustin Pop
      if not data:
191 c2a03789 Iustin Pop
        raise ConnectionClosedError("Connection closed while reading")
192 c2a03789 Iustin Pop
      new_msgs = (self._buffer + data).split(self.eom)
193 c2a03789 Iustin Pop
      self._buffer = new_msgs.pop()
194 c2a03789 Iustin Pop
      self._msgs.extend(new_msgs)
195 c2a03789 Iustin Pop
    return self._msgs.popleft()
196 c2a03789 Iustin Pop
197 c2a03789 Iustin Pop
  def Call(self, msg):
198 c2a03789 Iustin Pop
    """Send a message and wait for the response.
199 c2a03789 Iustin Pop

200 c2a03789 Iustin Pop
    This is just a wrapper over Send and Recv.
201 c2a03789 Iustin Pop

202 c2a03789 Iustin Pop
    """
203 c2a03789 Iustin Pop
    self.Send(msg)
204 c2a03789 Iustin Pop
    return self.Recv()
205 c2a03789 Iustin Pop
206 c2a03789 Iustin Pop
  def Close(self):
207 c2a03789 Iustin Pop
    """Close the socket"""
208 c2a03789 Iustin Pop
    if self.socket is not None:
209 c2a03789 Iustin Pop
      self.socket.close()
210 c2a03789 Iustin Pop
      self.socket = None
211 c2a03789 Iustin Pop
212 c2a03789 Iustin Pop
213 c2a03789 Iustin Pop
class Client(object):
214 c2a03789 Iustin Pop
  """High-level client implementation.
215 c2a03789 Iustin Pop

216 c2a03789 Iustin Pop
  This uses a backing Transport-like class on top of which it
217 c2a03789 Iustin Pop
  implements data serialization/deserialization.
218 c2a03789 Iustin Pop

219 c2a03789 Iustin Pop
  """
220 ceab32dd Iustin Pop
  def __init__(self, address=None, timeouts=None, transport=Transport):
221 c2a03789 Iustin Pop
    """Constructor for the Client class.
222 c2a03789 Iustin Pop

223 c2a03789 Iustin Pop
    Arguments:
224 c2a03789 Iustin Pop
      - address: a valid address the the used transport class
225 c2a03789 Iustin Pop
      - timeout: a list of timeouts, to be used on connect and read/write
226 c2a03789 Iustin Pop
      - transport: a Transport-like class
227 c2a03789 Iustin Pop

228 c2a03789 Iustin Pop

229 c2a03789 Iustin Pop
    If timeout is not passed, the default timeouts of the transport
230 c2a03789 Iustin Pop
    class are used.
231 c2a03789 Iustin Pop

232 c2a03789 Iustin Pop
    """
233 ceab32dd Iustin Pop
    if address is None:
234 ceab32dd Iustin Pop
      address = constants.MASTER_SOCKET
235 c2a03789 Iustin Pop
    self.transport = transport(address, timeouts=timeouts)
236 c2a03789 Iustin Pop
237 c2a03789 Iustin Pop
  def SendRequest(self, request, data):
238 c2a03789 Iustin Pop
    """Send a generic request and return the response.
239 c2a03789 Iustin Pop

240 c2a03789 Iustin Pop
    """
241 c2a03789 Iustin Pop
    msg = {KEY_REQUEST: request, KEY_DATA: data}
242 c2a03789 Iustin Pop
    result = self.transport.Call(simplejson.dumps(msg))
243 c2a03789 Iustin Pop
    try:
244 c2a03789 Iustin Pop
      data = simplejson.loads(result)
245 c2a03789 Iustin Pop
    except Exception, err:
246 c2a03789 Iustin Pop
      raise ProtocolError("Error while deserializing response: %s" % str(err))
247 a14a17fc Iustin Pop
    if (not isinstance(data, dict) or
248 a14a17fc Iustin Pop
        'success' not in data or
249 a14a17fc Iustin Pop
        'result' not in data):
250 a14a17fc Iustin Pop
      raise DecodingError("Invalid response from server: %s" % str(data))
251 c2a03789 Iustin Pop
    return data
252 c2a03789 Iustin Pop
253 c2a03789 Iustin Pop
  def SubmitJob(self, job):
254 c2a03789 Iustin Pop
    """Submit a job"""
255 c2a03789 Iustin Pop
    return self.SendRequest(REQ_SUBMIT, SerializeJob(job))
256 c2a03789 Iustin Pop
257 c2a03789 Iustin Pop
  def Query(self, data):
258 c2a03789 Iustin Pop
    """Make a query"""
259 c2a03789 Iustin Pop
    return self.SendRequest(REQ_QUERY, data)