Implement query for instances
[ganeti-local] / lib / luxi.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocl. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also be used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import time
35 import errno
36
37 from ganeti import serializer
38 from ganeti import constants
39
40
41 KEY_METHOD = 'method'
42 KEY_ARGS = 'args'
43 KEY_SUCCESS = "success"
44 KEY_RESULT = "result"
45
46 REQ_SUBMIT_JOB = "SubmitJob"
47 REQ_CANCEL_JOB = "CancelJob"
48 REQ_ARCHIVE_JOB = "ArchiveJob"
49 REQ_QUERY_JOBS = "QueryJobs"
50 REQ_QUERY_INSTANCES = "QueryInstances"
51
52 DEF_CTMO = 10
53 DEF_RWTO = 60
54
55
56 class ProtocolError(Exception):
57   """Denotes an error in the server communication"""
58
59
60 class ConnectionClosedError(ProtocolError):
61   """Connection closed error"""
62
63
64 class TimeoutError(ProtocolError):
65   """Operation timeout error"""
66
67
68 class EncodingError(ProtocolError):
69   """Encoding failure on the sending side"""
70
71
72 class DecodingError(ProtocolError):
73   """Decoding failure on the receiving side"""
74
75
76 class RequestError(ProtocolError):
77   """Error on request
78
79   This signifies an error in the request format or request handling,
80   but not (e.g.) an error in starting up an instance.
81
82   Some common conditions that can trigger this exception:
83     - job submission failed because the job data was wrong
84     - query failed because required fields were missing
85
86   """
87
88
89 class NoMasterError(ProtocolError):
90   """The master cannot be reached
91
92   This means that the master daemon is not running or the socket has
93   been removed.
94
95   """
96
97
98 class Transport:
99   """Low-level transport class.
100
101   This is used on the client side.
102
103   This could be replace by any other class that provides the same
104   semantics to the Client. This means:
105     - can send messages and receive messages
106     - safe for multithreading
107
108   """
109
110   def __init__(self, address, timeouts=None, eom=None):
111     """Constructor for the Client class.
112
113     Arguments:
114       - address: a valid address the the used transport class
115       - timeout: a list of timeouts, to be used on connect and read/write
116       - eom: an identifier to be used as end-of-message which the
117         upper-layer will guarantee that this identifier will not appear
118         in any message
119
120     There are two timeouts used since we might want to wait for a long
121     time for a response, but the connect timeout should be lower.
122
123     If not passed, we use a default of 10 and respectively 60 seconds.
124
125     Note that on reading data, since the timeout applies to an
126     invidual receive, it might be that the total duration is longer
127     than timeout value passed (we make a hard limit at twice the read
128     timeout).
129
130     """
131     self.address = address
132     if timeouts is None:
133       self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
134     else:
135       self._ctimeout, self._rwtimeout = timeouts
136
137     self.socket = None
138     self._buffer = ""
139     self._msgs = collections.deque()
140
141     if eom is None:
142       self.eom = '\3'
143     else:
144       self.eom = eom
145
146     try:
147       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
148       self.socket.settimeout(self._ctimeout)
149       try:
150         self.socket.connect(address)
151       except socket.timeout, err:
152         raise TimeoutError("Connect timed out: %s" % str(err))
153       except socket.error, err:
154         if err.args[0] == errno.ENOENT:
155           raise NoMasterError((address,))
156         raise
157       self.socket.settimeout(self._rwtimeout)
158     except (socket.error, NoMasterError):
159       if self.socket is not None:
160         self.socket.close()
161       self.socket = None
162       raise
163
164   def _CheckSocket(self):
165     """Make sure we are connected.
166
167     """
168     if self.socket is None:
169       raise ProtocolError("Connection is closed")
170
171   def Send(self, msg):
172     """Send a message.
173
174     This just sends a message and doesn't wait for the response.
175
176     """
177     if self.eom in msg:
178       raise EncodingError("Message terminator found in payload")
179     self._CheckSocket()
180     try:
181       self.socket.sendall(msg + self.eom)
182     except socket.timeout, err:
183       raise TimeoutError("Sending timeout: %s" % str(err))
184
185   def Recv(self):
186     """Try to receive a messae from the socket.
187
188     In case we already have messages queued, we just return from the
189     queue. Otherwise, we try to read data with a _rwtimeout network
190     timeout, and making sure we don't go over 2x_rwtimeout as a global
191     limit.
192
193     """
194     self._CheckSocket()
195     etime = time.time() + self._rwtimeout
196     while not self._msgs:
197       if time.time() > etime:
198         raise TimeoutError("Extended receive timeout")
199       try:
200         data = self.socket.recv(4096)
201       except socket.timeout, err:
202         raise TimeoutError("Receive timeout: %s" % str(err))
203       if not data:
204         raise ConnectionClosedError("Connection closed while reading")
205       new_msgs = (self._buffer + data).split(self.eom)
206       self._buffer = new_msgs.pop()
207       self._msgs.extend(new_msgs)
208     return self._msgs.popleft()
209
210   def Call(self, msg):
211     """Send a message and wait for the response.
212
213     This is just a wrapper over Send and Recv.
214
215     """
216     self.Send(msg)
217     return self.Recv()
218
219   def Close(self):
220     """Close the socket"""
221     if self.socket is not None:
222       self.socket.close()
223       self.socket = None
224
225
226 class Client(object):
227   """High-level client implementation.
228
229   This uses a backing Transport-like class on top of which it
230   implements data serialization/deserialization.
231
232   """
233   def __init__(self, address=None, timeouts=None, transport=Transport):
234     """Constructor for the Client class.
235
236     Arguments:
237       - address: a valid address the the used transport class
238       - timeout: a list of timeouts, to be used on connect and read/write
239       - transport: a Transport-like class
240
241
242     If timeout is not passed, the default timeouts of the transport
243     class are used.
244
245     """
246     if address is None:
247       address = constants.MASTER_SOCKET
248     self.transport = transport(address, timeouts=timeouts)
249
250   def CallMethod(self, method, args):
251     """Send a generic request and return the response.
252
253     """
254     # Build request
255     request = {
256       KEY_METHOD: method,
257       KEY_ARGS: args,
258       }
259
260     # Send request and wait for response
261     result = self.transport.Call(serializer.DumpJson(request, indent=False))
262     try:
263       data = serializer.LoadJson(result)
264     except Exception, err:
265       raise ProtocolError("Error while deserializing response: %s" % str(err))
266
267     # Validate response
268     if (not isinstance(data, dict) or
269         KEY_SUCCESS not in data or
270         KEY_RESULT not in data):
271       raise DecodingError("Invalid response from server: %s" % str(data))
272
273     if not data[KEY_SUCCESS]:
274       # TODO: decide on a standard exception
275       raise RequestError(data[KEY_RESULT])
276
277     return data[KEY_RESULT]
278
279   def SubmitJob(self, ops):
280     ops_state = map(lambda op: op.__getstate__(), ops)
281     return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
282
283   def CancelJob(self, job_id):
284     return self.CallMethod(REQ_CANCEL_JOB, job_id)
285
286   def ArchiveJob(self, job_id):
287     return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
288
289   def QueryJobs(self, job_ids, fields):
290     return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
291
292   def QueryInstances(self, names, fields):
293     return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields))
294
295 # TODO: class Server(object)