Fix error message when masterd is not listening
[ganeti-local] / lib / luxi.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocl. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also be used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import time
35 import errno
36
37 from ganeti import serializer
38 from ganeti import constants
39
40
41 KEY_METHOD = 'method'
42 KEY_ARGS = 'args'
43 KEY_SUCCESS = "success"
44 KEY_RESULT = "result"
45
46 REQ_SUBMIT_JOB = "SubmitJob"
47 REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
48 REQ_CANCEL_JOB = "CancelJob"
49 REQ_ARCHIVE_JOB = "ArchiveJob"
50 REQ_QUERY_JOBS = "QueryJobs"
51 REQ_QUERY_INSTANCES = "QueryInstances"
52 REQ_QUERY_NODES = "QueryNodes"
53 REQ_QUERY_EXPORTS = "QueryExports"
54
55 DEF_CTMO = 10
56 DEF_RWTO = 60
57
58
59 class ProtocolError(Exception):
60   """Denotes an error in the server communication"""
61
62
63 class ConnectionClosedError(ProtocolError):
64   """Connection closed error"""
65
66
67 class TimeoutError(ProtocolError):
68   """Operation timeout error"""
69
70
71 class EncodingError(ProtocolError):
72   """Encoding failure on the sending side"""
73
74
75 class DecodingError(ProtocolError):
76   """Decoding failure on the receiving side"""
77
78
79 class RequestError(ProtocolError):
80   """Error on request
81
82   This signifies an error in the request format or request handling,
83   but not (e.g.) an error in starting up an instance.
84
85   Some common conditions that can trigger this exception:
86     - job submission failed because the job data was wrong
87     - query failed because required fields were missing
88
89   """
90
91
92 class NoMasterError(ProtocolError):
93   """The master cannot be reached
94
95   This means that the master daemon is not running or the socket has
96   been removed.
97
98   """
99
100
101 class Transport:
102   """Low-level transport class.
103
104   This is used on the client side.
105
106   This could be replace by any other class that provides the same
107   semantics to the Client. This means:
108     - can send messages and receive messages
109     - safe for multithreading
110
111   """
112
113   def __init__(self, address, timeouts=None, eom=None):
114     """Constructor for the Client class.
115
116     Arguments:
117       - address: a valid address the the used transport class
118       - timeout: a list of timeouts, to be used on connect and read/write
119       - eom: an identifier to be used as end-of-message which the
120         upper-layer will guarantee that this identifier will not appear
121         in any message
122
123     There are two timeouts used since we might want to wait for a long
124     time for a response, but the connect timeout should be lower.
125
126     If not passed, we use a default of 10 and respectively 60 seconds.
127
128     Note that on reading data, since the timeout applies to an
129     invidual receive, it might be that the total duration is longer
130     than timeout value passed (we make a hard limit at twice the read
131     timeout).
132
133     """
134     self.address = address
135     if timeouts is None:
136       self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
137     else:
138       self._ctimeout, self._rwtimeout = timeouts
139
140     self.socket = None
141     self._buffer = ""
142     self._msgs = collections.deque()
143
144     if eom is None:
145       self.eom = '\3'
146     else:
147       self.eom = eom
148
149     try:
150       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
151       self.socket.settimeout(self._ctimeout)
152       try:
153         self.socket.connect(address)
154       except socket.timeout, err:
155         raise TimeoutError("Connect timed out: %s" % str(err))
156       except socket.error, err:
157         if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
158           raise NoMasterError((address,))
159         raise
160       self.socket.settimeout(self._rwtimeout)
161     except (socket.error, NoMasterError):
162       if self.socket is not None:
163         self.socket.close()
164       self.socket = None
165       raise
166
167   def _CheckSocket(self):
168     """Make sure we are connected.
169
170     """
171     if self.socket is None:
172       raise ProtocolError("Connection is closed")
173
174   def Send(self, msg):
175     """Send a message.
176
177     This just sends a message and doesn't wait for the response.
178
179     """
180     if self.eom in msg:
181       raise EncodingError("Message terminator found in payload")
182     self._CheckSocket()
183     try:
184       self.socket.sendall(msg + self.eom)
185     except socket.timeout, err:
186       raise TimeoutError("Sending timeout: %s" % str(err))
187
188   def Recv(self):
189     """Try to receive a messae from the socket.
190
191     In case we already have messages queued, we just return from the
192     queue. Otherwise, we try to read data with a _rwtimeout network
193     timeout, and making sure we don't go over 2x_rwtimeout as a global
194     limit.
195
196     """
197     self._CheckSocket()
198     etime = time.time() + self._rwtimeout
199     while not self._msgs:
200       if time.time() > etime:
201         raise TimeoutError("Extended receive timeout")
202       try:
203         data = self.socket.recv(4096)
204       except socket.timeout, err:
205         raise TimeoutError("Receive timeout: %s" % str(err))
206       if not data:
207         raise ConnectionClosedError("Connection closed while reading")
208       new_msgs = (self._buffer + data).split(self.eom)
209       self._buffer = new_msgs.pop()
210       self._msgs.extend(new_msgs)
211     return self._msgs.popleft()
212
213   def Call(self, msg):
214     """Send a message and wait for the response.
215
216     This is just a wrapper over Send and Recv.
217
218     """
219     self.Send(msg)
220     return self.Recv()
221
222   def Close(self):
223     """Close the socket"""
224     if self.socket is not None:
225       self.socket.close()
226       self.socket = None
227
228
229 class Client(object):
230   """High-level client implementation.
231
232   This uses a backing Transport-like class on top of which it
233   implements data serialization/deserialization.
234
235   """
236   def __init__(self, address=None, timeouts=None, transport=Transport):
237     """Constructor for the Client class.
238
239     Arguments:
240       - address: a valid address the the used transport class
241       - timeout: a list of timeouts, to be used on connect and read/write
242       - transport: a Transport-like class
243
244
245     If timeout is not passed, the default timeouts of the transport
246     class are used.
247
248     """
249     if address is None:
250       address = constants.MASTER_SOCKET
251     self.transport = transport(address, timeouts=timeouts)
252
253   def CallMethod(self, method, args):
254     """Send a generic request and return the response.
255
256     """
257     # Build request
258     request = {
259       KEY_METHOD: method,
260       KEY_ARGS: args,
261       }
262
263     # Send request and wait for response
264     result = self.transport.Call(serializer.DumpJson(request, indent=False))
265     try:
266       data = serializer.LoadJson(result)
267     except Exception, err:
268       raise ProtocolError("Error while deserializing response: %s" % str(err))
269
270     # Validate response
271     if (not isinstance(data, dict) or
272         KEY_SUCCESS not in data or
273         KEY_RESULT not in data):
274       raise DecodingError("Invalid response from server: %s" % str(data))
275
276     if not data[KEY_SUCCESS]:
277       # TODO: decide on a standard exception
278       raise RequestError(data[KEY_RESULT])
279
280     return data[KEY_RESULT]
281
282   def SubmitJob(self, ops):
283     ops_state = map(lambda op: op.__getstate__(), ops)
284     return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
285
286   def CancelJob(self, job_id):
287     return self.CallMethod(REQ_CANCEL_JOB, job_id)
288
289   def ArchiveJob(self, job_id):
290     return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
291
292   def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
293     return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
294                            (job_id, fields, prev_job_info, prev_log_serial))
295
296   def QueryJobs(self, job_ids, fields):
297     return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
298
299   def QueryInstances(self, names, fields):
300     return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields))
301
302   def QueryNodes(self, names, fields):
303     return self.CallMethod(REQ_QUERY_NODES, (names, fields))
304
305   def QueryExports(self, nodes):
306     return self.CallMethod(REQ_QUERY_EXPORTS, nodes)
307
308 # TODO: class Server(object)