Remove old job queue code
[ganeti-local] / lib / luxi.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocl. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also be used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import simplejson
35 import time
36 import errno
37
38 from ganeti import opcodes
39 from ganeti import serializer
40 from ganeti import constants
41
42
43 KEY_METHOD = 'method'
44 KEY_ARGS = 'args'
45 KEY_SUCCESS = "success"
46 KEY_RESULT = "result"
47
48 REQ_SUBMIT_JOB = "SubmitJob"
49 REQ_CANCEL_JOB = "CancelJob"
50 REQ_ARCHIVE_JOB = "ArchiveJob"
51 REQ_QUERY_JOBS = "QueryJobs"
52
53 DEF_CTMO = 10
54 DEF_RWTO = 60
55
56
57 class ProtocolError(Exception):
58   """Denotes an error in the server communication"""
59
60
61 class ConnectionClosedError(ProtocolError):
62   """Connection closed error"""
63
64
65 class TimeoutError(ProtocolError):
66   """Operation timeout error"""
67
68
69 class EncodingError(ProtocolError):
70   """Encoding failure on the sending side"""
71
72
73 class DecodingError(ProtocolError):
74   """Decoding failure on the receiving side"""
75
76
77 class RequestError(ProtocolError):
78   """Error on request
79
80   This signifies an error in the request format or request handling,
81   but not (e.g.) an error in starting up an instance.
82
83   Some common conditions that can trigger this exception:
84     - job submission failed because the job data was wrong
85     - query failed because required fields were missing
86
87   """
88
89
90 class NoMasterError(ProtocolError):
91   """The master cannot be reached
92
93   This means that the master daemon is not running or the socket has
94   been removed.
95
96   """
97
98
99 class Transport:
100   """Low-level transport class.
101
102   This is used on the client side.
103
104   This could be replace by any other class that provides the same
105   semantics to the Client. This means:
106     - can send messages and receive messages
107     - safe for multithreading
108
109   """
110
111   def __init__(self, address, timeouts=None, eom=None):
112     """Constructor for the Client class.
113
114     Arguments:
115       - address: a valid address the the used transport class
116       - timeout: a list of timeouts, to be used on connect and read/write
117       - eom: an identifier to be used as end-of-message which the
118         upper-layer will guarantee that this identifier will not appear
119         in any message
120
121     There are two timeouts used since we might want to wait for a long
122     time for a response, but the connect timeout should be lower.
123
124     If not passed, we use a default of 10 and respectively 60 seconds.
125
126     Note that on reading data, since the timeout applies to an
127     invidual receive, it might be that the total duration is longer
128     than timeout value passed (we make a hard limit at twice the read
129     timeout).
130
131     """
132     self.address = address
133     if timeouts is None:
134       self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
135     else:
136       self._ctimeout, self._rwtimeout = timeouts
137
138     self.socket = None
139     self._buffer = ""
140     self._msgs = collections.deque()
141
142     if eom is None:
143       self.eom = '\3'
144     else:
145       self.eom = eom
146
147     try:
148       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
149       self.socket.settimeout(self._ctimeout)
150       try:
151         self.socket.connect(address)
152       except socket.timeout, err:
153         raise TimeoutError("Connect timed out: %s" % str(err))
154       except socket.error, err:
155         if err.args[0] == errno.ENOENT:
156           raise NoMasterError((address,))
157         raise
158       self.socket.settimeout(self._rwtimeout)
159     except (socket.error, NoMasterError):
160       if self.socket is not None:
161         self.socket.close()
162       self.socket = None
163       raise
164
165   def _CheckSocket(self):
166     """Make sure we are connected.
167
168     """
169     if self.socket is None:
170       raise ProtocolError("Connection is closed")
171
172   def Send(self, msg):
173     """Send a message.
174
175     This just sends a message and doesn't wait for the response.
176
177     """
178     if self.eom in msg:
179       raise EncodingError("Message terminator found in payload")
180     self._CheckSocket()
181     try:
182       self.socket.sendall(msg + self.eom)
183     except socket.timeout, err:
184       raise TimeoutError("Sending timeout: %s" % str(err))
185
186   def Recv(self):
187     """Try to receive a messae from the socket.
188
189     In case we already have messages queued, we just return from the
190     queue. Otherwise, we try to read data with a _rwtimeout network
191     timeout, and making sure we don't go over 2x_rwtimeout as a global
192     limit.
193
194     """
195     self._CheckSocket()
196     etime = time.time() + self._rwtimeout
197     while not self._msgs:
198       if time.time() > etime:
199         raise TimeoutError("Extended receive timeout")
200       try:
201         data = self.socket.recv(4096)
202       except socket.timeout, err:
203         raise TimeoutError("Receive timeout: %s" % str(err))
204       if not data:
205         raise ConnectionClosedError("Connection closed while reading")
206       new_msgs = (self._buffer + data).split(self.eom)
207       self._buffer = new_msgs.pop()
208       self._msgs.extend(new_msgs)
209     return self._msgs.popleft()
210
211   def Call(self, msg):
212     """Send a message and wait for the response.
213
214     This is just a wrapper over Send and Recv.
215
216     """
217     self.Send(msg)
218     return self.Recv()
219
220   def Close(self):
221     """Close the socket"""
222     if self.socket is not None:
223       self.socket.close()
224       self.socket = None
225
226
227 class Client(object):
228   """High-level client implementation.
229
230   This uses a backing Transport-like class on top of which it
231   implements data serialization/deserialization.
232
233   """
234   def __init__(self, address=None, timeouts=None, transport=Transport):
235     """Constructor for the Client class.
236
237     Arguments:
238       - address: a valid address the the used transport class
239       - timeout: a list of timeouts, to be used on connect and read/write
240       - transport: a Transport-like class
241
242
243     If timeout is not passed, the default timeouts of the transport
244     class are used.
245
246     """
247     if address is None:
248       address = constants.MASTER_SOCKET
249     self.transport = transport(address, timeouts=timeouts)
250
251   def CallMethod(self, method, args):
252     """Send a generic request and return the response.
253
254     """
255     # Build request
256     request = {
257       KEY_METHOD: method,
258       KEY_ARGS: args,
259       }
260
261     # Send request and wait for response
262     result = self.transport.Call(serializer.DumpJson(request, indent=False))
263     try:
264       data = serializer.LoadJson(result)
265     except Exception, err:
266       raise ProtocolError("Error while deserializing response: %s" % str(err))
267
268     # Validate response
269     if (not isinstance(data, dict) or
270         KEY_SUCCESS not in data or
271         KEY_RESULT not in data):
272       raise DecodingError("Invalid response from server: %s" % str(data))
273
274     if not data[KEY_SUCCESS]:
275       # TODO: decide on a standard exception
276       raise RequestError(data[KEY_RESULT])
277
278     return data[KEY_RESULT]
279
280   def SubmitJob(self, ops):
281     ops_state = map(lambda op: op.__getstate__(), ops)
282     return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
283
284   def CancelJob(self, job_id):
285     return self.CallMethod(REQ_CANCEL_JOB, job_id)
286
287   def ArchiveJob(self, job_id):
288     return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
289
290   def QueryJobs(self, job_ids, fields):
291     return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
292
293 # TODO: class Server(object)