symlinks: Add DISK_LINKS_DIR constant
[ganeti-local] / lib / luxi.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocl. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also be used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import simplejson
35 import time
36 import errno
37
38 from ganeti import opcodes
39 from ganeti import serializer
40 from ganeti import constants
41
42
43 KEY_REQUEST = 'request'
44 KEY_DATA = 'data'
45 REQ_SUBMIT = 'submit'
46 REQ_ABORT = 'abort'
47 REQ_QUERY = 'query'
48
49 DEF_CTMO = 10
50 DEF_RWTO = 60
51
52
53 class ProtocolError(Exception):
54   """Denotes an error in the server communication"""
55
56
57 class ConnectionClosedError(ProtocolError):
58   """Connection closed error"""
59
60
61 class TimeoutError(ProtocolError):
62   """Operation timeout error"""
63
64
65 class EncodingError(ProtocolError):
66   """Encoding failure on the sending side"""
67
68
69 class DecodingError(ProtocolError):
70   """Decoding failure on the receiving side"""
71
72
73 class RequestError(ProtocolError):
74   """Error on request
75
76   This signifies an error in the request format or request handling,
77   but not (e.g.) an error in starting up an instance.
78
79   Some common conditions that can trigger this exception:
80     - job submission failed because the job data was wrong
81     - query failed because required fields were missing
82
83   """
84
85 class NoMasterError(ProtocolError):
86   """The master cannot be reached
87
88   This means that the master daemon is not running or the socket has
89   been removed.
90
91   """
92
93
94 def SerializeJob(job):
95   """Convert a job description to a string format.
96
97   """
98   return simplejson.dumps(job.__getstate__())
99
100
101 def UnserializeJob(data):
102   """Load a job from a string format"""
103   try:
104     new_data = simplejson.loads(data)
105   except Exception, err:
106     raise DecodingError("Error while unserializing: %s" % str(err))
107   job = opcodes.Job()
108   job.__setstate__(new_data)
109   return job
110
111
112 class Transport:
113   """Low-level transport class.
114
115   This is used on the client side.
116
117   This could be replace by any other class that provides the same
118   semantics to the Client. This means:
119     - can send messages and receive messages
120     - safe for multithreading
121
122   """
123
124   def __init__(self, address, timeouts=None, eom=None):
125     """Constructor for the Client class.
126
127     Arguments:
128       - address: a valid address the the used transport class
129       - timeout: a list of timeouts, to be used on connect and read/write
130       - eom: an identifier to be used as end-of-message which the
131         upper-layer will guarantee that this identifier will not appear
132         in any message
133
134     There are two timeouts used since we might want to wait for a long
135     time for a response, but the connect timeout should be lower.
136
137     If not passed, we use a default of 10 and respectively 60 seconds.
138
139     Note that on reading data, since the timeout applies to an
140     invidual receive, it might be that the total duration is longer
141     than timeout value passed (we make a hard limit at twice the read
142     timeout).
143
144     """
145     self.address = address
146     if timeouts is None:
147       self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
148     else:
149       self._ctimeout, self._rwtimeout = timeouts
150
151     self.socket = None
152     self._buffer = ""
153     self._msgs = collections.deque()
154
155     if eom is None:
156       self.eom = '\3'
157     else:
158       self.eom = eom
159
160     try:
161       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
162       self.socket.settimeout(self._ctimeout)
163       try:
164         self.socket.connect(address)
165       except socket.timeout, err:
166         raise TimeoutError("Connect timed out: %s" % str(err))
167       except socket.error, err:
168         if err.args[0] == errno.ENOENT:
169           raise NoMasterError((address,))
170         raise
171       self.socket.settimeout(self._rwtimeout)
172     except (socket.error, NoMasterError):
173       if self.socket is not None:
174         self.socket.close()
175       self.socket = None
176       raise
177
178   def _CheckSocket(self):
179     """Make sure we are connected.
180
181     """
182     if self.socket is None:
183       raise ProtocolError("Connection is closed")
184
185   def Send(self, msg):
186     """Send a message.
187
188     This just sends a message and doesn't wait for the response.
189
190     """
191     if self.eom in msg:
192       raise EncodingError("Message terminator found in payload")
193     self._CheckSocket()
194     try:
195       self.socket.sendall(msg + self.eom)
196     except socket.timeout, err:
197       raise TimeoutError("Sending timeout: %s" % str(err))
198
199   def Recv(self):
200     """Try to receive a messae from the socket.
201
202     In case we already have messages queued, we just return from the
203     queue. Otherwise, we try to read data with a _rwtimeout network
204     timeout, and making sure we don't go over 2x_rwtimeout as a global
205     limit.
206
207     """
208     self._CheckSocket()
209     etime = time.time() + self._rwtimeout
210     while not self._msgs:
211       if time.time() > etime:
212         raise TimeoutError("Extended receive timeout")
213       try:
214         data = self.socket.recv(4096)
215       except socket.timeout, err:
216         raise TimeoutError("Receive timeout: %s" % str(err))
217       if not data:
218         raise ConnectionClosedError("Connection closed while reading")
219       new_msgs = (self._buffer + data).split(self.eom)
220       self._buffer = new_msgs.pop()
221       self._msgs.extend(new_msgs)
222     return self._msgs.popleft()
223
224   def Call(self, msg):
225     """Send a message and wait for the response.
226
227     This is just a wrapper over Send and Recv.
228
229     """
230     self.Send(msg)
231     return self.Recv()
232
233   def Close(self):
234     """Close the socket"""
235     if self.socket is not None:
236       self.socket.close()
237       self.socket = None
238
239
240 class Client(object):
241   """High-level client implementation.
242
243   This uses a backing Transport-like class on top of which it
244   implements data serialization/deserialization.
245
246   """
247   def __init__(self, address=None, timeouts=None, transport=Transport):
248     """Constructor for the Client class.
249
250     Arguments:
251       - address: a valid address the the used transport class
252       - timeout: a list of timeouts, to be used on connect and read/write
253       - transport: a Transport-like class
254
255
256     If timeout is not passed, the default timeouts of the transport
257     class are used.
258
259     """
260     if address is None:
261       address = constants.MASTER_SOCKET
262     self.transport = transport(address, timeouts=timeouts)
263
264   def SendRequest(self, request, data):
265     """Send a generic request and return the response.
266
267     """
268     msg = {KEY_REQUEST: request, KEY_DATA: data}
269     result = self.transport.Call(serializer.DumpJson(msg, indent=False))
270     try:
271       data = serializer.LoadJson(result)
272     except Exception, err:
273       raise ProtocolError("Error while deserializing response: %s" % str(err))
274     if (not isinstance(data, dict) or
275         'success' not in data or
276         'result' not in data):
277       raise DecodingError("Invalid response from server: %s" % str(data))
278     return data
279
280   def SubmitJob(self, job):
281     """Submit a job"""
282     result = self.SendRequest(REQ_SUBMIT, SerializeJob(job))
283     if not result['success']:
284       raise RequestError(result['result'])
285     return result['result']
286
287   def Query(self, data):
288     """Make a query"""
289     result = self.SendRequest(REQ_QUERY, data)
290     if not result['success']:
291       raise RequestError(result[result])
292     result = result['result']
293     if data["object"] == "jobs":
294       # custom job processing of query values
295       for row in result:
296         for idx, field in enumerate(data["fields"]):
297           if field == "op_list":
298             row[idx] = [opcodes.OpCode.LoadOpCode(i) for i in row[idx]]
299     return result