Initial implementation of the client unix socket
[ganeti-local] / lib / luxi.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocl. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also be used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import simplejson
35 import time
36
37 from ganeti import opcodes
38
39
40 KEY_REQUEST = 'request'
41 KEY_DATA = 'data'
42 REQ_SUBMIT = 'submit'
43 REQ_ABORT = 'abort'
44 REQ_QUERY = 'query'
45
46 DEF_CTMO = 10
47 DEF_RWTO = 60
48
49
50 class ProtocolError(Exception):
51   """Denotes an error in the server communication"""
52
53
54 class ConnectionClosedError(ProtocolError):
55   """Connection closed error"""
56
57
58 class TimeoutError(ProtocolError):
59   """Operation timeout error"""
60
61
62 class EncodingError(ProtocolError):
63   """Encoding failure on the sending side"""
64
65
66 class DecodingError(ProtocolError):
67   """Decoding failure on the receiving side"""
68
69
70 def SerializeJob(job):
71   """Convert a job description to a string format.
72
73   """
74   return simplejson.dumps(job.__getstate__())
75
76
77 def UnserializeJob(data):
78   """Load a job from a string format"""
79   try:
80     new_data = simplejson.loads(data)
81   except Exception, err:
82     raise DecodingError("Error while unserializing: %s" % str(err))
83   job = opcodes.Job()
84   job.__setstate__(new_data)
85   return job
86
87
88 class Transport:
89   """Low-level transport class.
90
91   This is used on the client side.
92
93   This could be replace by any other class that provides the same
94   semantics to the Client. This means:
95     - can send messages and receive messages
96     - safe for multithreading
97
98   """
99
100   def __init__(self, address, timeouts=None, eom=None):
101     """Constructor for the Client class.
102
103     Arguments:
104       - address: a valid address the the used transport class
105       - timeout: a list of timeouts, to be used on connect and read/write
106       - eom: an identifier to be used as end-of-message which the
107         upper-layer will guarantee that this identifier will not appear
108         in any message
109
110     There are two timeouts used since we might want to wait for a long
111     time for a response, but the connect timeout should be lower.
112
113     If not passed, we use a default of 10 and respectively 60 seconds.
114
115     Note that on reading data, since the timeout applies to an
116     invidual receive, it might be that the total duration is longer
117     than timeout value passed (we make a hard limit at twice the read
118     timeout).
119
120     """
121     self.address = address
122     if timeouts is None:
123       self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
124     else:
125       self._ctimeout, self._rwtimeout = timeouts
126
127     self.socket = None
128     self._buffer = ""
129     self._msgs = collections.deque()
130
131     if eom is None:
132       self.eom = '\3'
133     else:
134       self.eom = eom
135
136     try:
137       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
138       self.socket.settimeout(self._ctimeout)
139       try:
140         self.socket.connect(address)
141       except socket.timeout, err:
142         raise TimeoutError("Connection timed out: %s" % str(err))
143       self.socket.settimeout(self._rwtimeout)
144     except socket.error:
145       if self.socket is not None:
146         self.socket.close()
147       self.socket = None
148       raise
149
150   def _CheckSocket(self):
151     """Make sure we are connected.
152
153     """
154     if self.socket is None:
155       raise ProtocolError("Connection is closed")
156
157   def Send(self, msg):
158     """Send a message.
159
160     This just sends a message and doesn't wait for the response.
161
162     """
163     if self.eom in msg:
164       raise EncodingError("Message terminator found in payload")
165     self._CheckSocket()
166     try:
167       self.socket.sendall(msg + self.eom)
168     except socket.timeout, err:
169       raise TimeoutError("Sending timeout: %s" % str(err))
170
171   def Recv(self):
172     """Try to receive a messae from the socket.
173
174     In case we already have messages queued, we just return from the
175     queue. Otherwise, we try to read data with a _rwtimeout network
176     timeout, and making sure we don't go over 2x_rwtimeout as a global
177     limit.
178
179     """
180     self._CheckSocket()
181     etime = time.time() + self._rwtimeout
182     while not self._msgs:
183       if time.time() > etime:
184         raise TimeoutError("Extended receive timeout")
185       try:
186         data = self.socket.recv(4096)
187       except socket.timeout, err:
188         raise TimeoutError("Receive timeout: %s" % str(err))
189       if not data:
190         raise ConnectionClosedError("Connection closed while reading")
191       new_msgs = (self._buffer + data).split(self.eom)
192       self._buffer = new_msgs.pop()
193       self._msgs.extend(new_msgs)
194     return self._msgs.popleft()
195
196   def Call(self, msg):
197     """Send a message and wait for the response.
198
199     This is just a wrapper over Send and Recv.
200
201     """
202     self.Send(msg)
203     return self.Recv()
204
205   def Close(self):
206     """Close the socket"""
207     if self.socket is not None:
208       self.socket.close()
209       self.socket = None
210
211
212 class Client(object):
213   """High-level client implementation.
214
215   This uses a backing Transport-like class on top of which it
216   implements data serialization/deserialization.
217
218   """
219   def __init__(self, address, timeouts=None, transport=Transport):
220     """Constructor for the Client class.
221
222     Arguments:
223       - address: a valid address the the used transport class
224       - timeout: a list of timeouts, to be used on connect and read/write
225       - transport: a Transport-like class
226
227
228     If timeout is not passed, the default timeouts of the transport
229     class are used.
230
231     """
232     self.transport = transport(address, timeouts=timeouts)
233
234   def SendRequest(self, request, data):
235     """Send a generic request and return the response.
236
237     """
238     msg = {KEY_REQUEST: request, KEY_DATA: data}
239     result = self.transport.Call(simplejson.dumps(msg))
240     try:
241       data = simplejson.loads(result)
242     except Exception, err:
243       raise ProtocolError("Error while deserializing response: %s" % str(err))
244     return data
245
246   def SubmitJob(self, job):
247     """Submit a job"""
248     return self.SendRequest(REQ_SUBMIT, SerializeJob(job))
249
250   def Query(self, data):
251     """Make a query"""
252     return self.SendRequest(REQ_QUERY, data)