root / lib / luxi.py @ a9369c6e
History | View | Annotate | Download (7.8 kB)
1 | c2a03789 | Iustin Pop | #
|
---|---|---|---|
2 | c2a03789 | Iustin Pop | #
|
3 | c2a03789 | Iustin Pop | |
4 | c2a03789 | Iustin Pop | # Copyright (C) 2006, 2007 Google Inc.
|
5 | c2a03789 | Iustin Pop | #
|
6 | c2a03789 | Iustin Pop | # This program is free software; you can redistribute it and/or modify
|
7 | c2a03789 | Iustin Pop | # it under the terms of the GNU General Public License as published by
|
8 | c2a03789 | Iustin Pop | # the Free Software Foundation; either version 2 of the License, or
|
9 | c2a03789 | Iustin Pop | # (at your option) any later version.
|
10 | c2a03789 | Iustin Pop | #
|
11 | c2a03789 | Iustin Pop | # This program is distributed in the hope that it will be useful, but
|
12 | c2a03789 | Iustin Pop | # WITHOUT ANY WARRANTY; without even the implied warranty of
|
13 | c2a03789 | Iustin Pop | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
14 | c2a03789 | Iustin Pop | # General Public License for more details.
|
15 | c2a03789 | Iustin Pop | #
|
16 | c2a03789 | Iustin Pop | # You should have received a copy of the GNU General Public License
|
17 | c2a03789 | Iustin Pop | # along with this program; if not, write to the Free Software
|
18 | c2a03789 | Iustin Pop | # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
|
19 | c2a03789 | Iustin Pop | # 02110-1301, USA.
|
20 | c2a03789 | Iustin Pop | |
21 | c2a03789 | Iustin Pop | |
22 | c2a03789 | Iustin Pop | """Module for the unix socket protocol
|
23 | c2a03789 | Iustin Pop |
|
24 | c2a03789 | Iustin Pop | This module implements the local unix socket protocl. You only need
|
25 | c2a03789 | Iustin Pop | this module and the opcodes module in the client program in order to
|
26 | c2a03789 | Iustin Pop | communicate with the master.
|
27 | c2a03789 | Iustin Pop |
|
28 | c2a03789 | Iustin Pop | The module is also be used by the master daemon.
|
29 | c2a03789 | Iustin Pop |
|
30 | c2a03789 | Iustin Pop | """
|
31 | c2a03789 | Iustin Pop | |
32 | c2a03789 | Iustin Pop | import socket |
33 | c2a03789 | Iustin Pop | import collections |
34 | c2a03789 | Iustin Pop | import simplejson |
35 | c2a03789 | Iustin Pop | import time |
36 | c2a03789 | Iustin Pop | |
37 | c2a03789 | Iustin Pop | from ganeti import opcodes |
38 | ceab32dd | Iustin Pop | from ganeti import constants |
39 | c2a03789 | Iustin Pop | |
40 | c2a03789 | Iustin Pop | |
41 | c2a03789 | Iustin Pop | KEY_REQUEST = 'request'
|
42 | c2a03789 | Iustin Pop | KEY_DATA = 'data'
|
43 | c2a03789 | Iustin Pop | REQ_SUBMIT = 'submit'
|
44 | c2a03789 | Iustin Pop | REQ_ABORT = 'abort'
|
45 | c2a03789 | Iustin Pop | REQ_QUERY = 'query'
|
46 | c2a03789 | Iustin Pop | |
47 | c2a03789 | Iustin Pop | DEF_CTMO = 10
|
48 | c2a03789 | Iustin Pop | DEF_RWTO = 60
|
49 | c2a03789 | Iustin Pop | |
50 | c2a03789 | Iustin Pop | |
51 | c2a03789 | Iustin Pop | class ProtocolError(Exception): |
52 | c2a03789 | Iustin Pop | """Denotes an error in the server communication"""
|
53 | c2a03789 | Iustin Pop | |
54 | c2a03789 | Iustin Pop | |
55 | c2a03789 | Iustin Pop | class ConnectionClosedError(ProtocolError): |
56 | c2a03789 | Iustin Pop | """Connection closed error"""
|
57 | c2a03789 | Iustin Pop | |
58 | c2a03789 | Iustin Pop | |
59 | c2a03789 | Iustin Pop | class TimeoutError(ProtocolError): |
60 | c2a03789 | Iustin Pop | """Operation timeout error"""
|
61 | c2a03789 | Iustin Pop | |
62 | c2a03789 | Iustin Pop | |
63 | c2a03789 | Iustin Pop | class EncodingError(ProtocolError): |
64 | c2a03789 | Iustin Pop | """Encoding failure on the sending side"""
|
65 | c2a03789 | Iustin Pop | |
66 | c2a03789 | Iustin Pop | |
67 | c2a03789 | Iustin Pop | class DecodingError(ProtocolError): |
68 | c2a03789 | Iustin Pop | """Decoding failure on the receiving side"""
|
69 | c2a03789 | Iustin Pop | |
70 | c2a03789 | Iustin Pop | |
71 | b77acb3e | Iustin Pop | class RequestError(ProtocolError): |
72 | b77acb3e | Iustin Pop | """Error on request
|
73 | b77acb3e | Iustin Pop |
|
74 | b77acb3e | Iustin Pop | This signifies an error in the request format or request handling,
|
75 | b77acb3e | Iustin Pop | but not (e.g.) an error in starting up an instance.
|
76 | b77acb3e | Iustin Pop |
|
77 | b77acb3e | Iustin Pop | Some common conditions that can trigger this exception:
|
78 | b77acb3e | Iustin Pop | - job submission failed because the job data was wrong
|
79 | b77acb3e | Iustin Pop | - query failed because required fields were missing
|
80 | b77acb3e | Iustin Pop |
|
81 | b77acb3e | Iustin Pop | """
|
82 | b77acb3e | Iustin Pop | |
83 | b77acb3e | Iustin Pop | |
84 | c2a03789 | Iustin Pop | def SerializeJob(job): |
85 | c2a03789 | Iustin Pop | """Convert a job description to a string format.
|
86 | c2a03789 | Iustin Pop |
|
87 | c2a03789 | Iustin Pop | """
|
88 | c2a03789 | Iustin Pop | return simplejson.dumps(job.__getstate__())
|
89 | c2a03789 | Iustin Pop | |
90 | c2a03789 | Iustin Pop | |
91 | c2a03789 | Iustin Pop | def UnserializeJob(data): |
92 | c2a03789 | Iustin Pop | """Load a job from a string format"""
|
93 | c2a03789 | Iustin Pop | try:
|
94 | c2a03789 | Iustin Pop | new_data = simplejson.loads(data) |
95 | c2a03789 | Iustin Pop | except Exception, err: |
96 | c2a03789 | Iustin Pop | raise DecodingError("Error while unserializing: %s" % str(err)) |
97 | c2a03789 | Iustin Pop | job = opcodes.Job() |
98 | c2a03789 | Iustin Pop | job.__setstate__(new_data) |
99 | c2a03789 | Iustin Pop | return job
|
100 | c2a03789 | Iustin Pop | |
101 | c2a03789 | Iustin Pop | |
102 | c2a03789 | Iustin Pop | class Transport: |
103 | c2a03789 | Iustin Pop | """Low-level transport class.
|
104 | c2a03789 | Iustin Pop |
|
105 | c2a03789 | Iustin Pop | This is used on the client side.
|
106 | c2a03789 | Iustin Pop |
|
107 | c2a03789 | Iustin Pop | This could be replace by any other class that provides the same
|
108 | c2a03789 | Iustin Pop | semantics to the Client. This means:
|
109 | c2a03789 | Iustin Pop | - can send messages and receive messages
|
110 | c2a03789 | Iustin Pop | - safe for multithreading
|
111 | c2a03789 | Iustin Pop |
|
112 | c2a03789 | Iustin Pop | """
|
113 | c2a03789 | Iustin Pop | |
114 | c2a03789 | Iustin Pop | def __init__(self, address, timeouts=None, eom=None): |
115 | c2a03789 | Iustin Pop | """Constructor for the Client class.
|
116 | c2a03789 | Iustin Pop |
|
117 | c2a03789 | Iustin Pop | Arguments:
|
118 | c2a03789 | Iustin Pop | - address: a valid address the the used transport class
|
119 | c2a03789 | Iustin Pop | - timeout: a list of timeouts, to be used on connect and read/write
|
120 | c2a03789 | Iustin Pop | - eom: an identifier to be used as end-of-message which the
|
121 | c2a03789 | Iustin Pop | upper-layer will guarantee that this identifier will not appear
|
122 | c2a03789 | Iustin Pop | in any message
|
123 | c2a03789 | Iustin Pop |
|
124 | c2a03789 | Iustin Pop | There are two timeouts used since we might want to wait for a long
|
125 | c2a03789 | Iustin Pop | time for a response, but the connect timeout should be lower.
|
126 | c2a03789 | Iustin Pop |
|
127 | c2a03789 | Iustin Pop | If not passed, we use a default of 10 and respectively 60 seconds.
|
128 | c2a03789 | Iustin Pop |
|
129 | c2a03789 | Iustin Pop | Note that on reading data, since the timeout applies to an
|
130 | c2a03789 | Iustin Pop | invidual receive, it might be that the total duration is longer
|
131 | c2a03789 | Iustin Pop | than timeout value passed (we make a hard limit at twice the read
|
132 | c2a03789 | Iustin Pop | timeout).
|
133 | c2a03789 | Iustin Pop |
|
134 | c2a03789 | Iustin Pop | """
|
135 | c2a03789 | Iustin Pop | self.address = address
|
136 | c2a03789 | Iustin Pop | if timeouts is None: |
137 | c2a03789 | Iustin Pop | self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO |
138 | c2a03789 | Iustin Pop | else:
|
139 | c2a03789 | Iustin Pop | self._ctimeout, self._rwtimeout = timeouts |
140 | c2a03789 | Iustin Pop | |
141 | c2a03789 | Iustin Pop | self.socket = None |
142 | c2a03789 | Iustin Pop | self._buffer = "" |
143 | c2a03789 | Iustin Pop | self._msgs = collections.deque()
|
144 | c2a03789 | Iustin Pop | |
145 | c2a03789 | Iustin Pop | if eom is None: |
146 | c2a03789 | Iustin Pop | self.eom = '\3' |
147 | c2a03789 | Iustin Pop | else:
|
148 | c2a03789 | Iustin Pop | self.eom = eom
|
149 | c2a03789 | Iustin Pop | |
150 | c2a03789 | Iustin Pop | try:
|
151 | c2a03789 | Iustin Pop | self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
152 | c2a03789 | Iustin Pop | self.socket.settimeout(self._ctimeout) |
153 | c2a03789 | Iustin Pop | try:
|
154 | c2a03789 | Iustin Pop | self.socket.connect(address)
|
155 | c2a03789 | Iustin Pop | except socket.timeout, err:
|
156 | c2a03789 | Iustin Pop | raise TimeoutError("Connection timed out: %s" % str(err)) |
157 | c2a03789 | Iustin Pop | self.socket.settimeout(self._rwtimeout) |
158 | c2a03789 | Iustin Pop | except socket.error:
|
159 | c2a03789 | Iustin Pop | if self.socket is not None: |
160 | c2a03789 | Iustin Pop | self.socket.close()
|
161 | c2a03789 | Iustin Pop | self.socket = None |
162 | c2a03789 | Iustin Pop | raise
|
163 | c2a03789 | Iustin Pop | |
164 | c2a03789 | Iustin Pop | def _CheckSocket(self): |
165 | c2a03789 | Iustin Pop | """Make sure we are connected.
|
166 | c2a03789 | Iustin Pop |
|
167 | c2a03789 | Iustin Pop | """
|
168 | c2a03789 | Iustin Pop | if self.socket is None: |
169 | c2a03789 | Iustin Pop | raise ProtocolError("Connection is closed") |
170 | c2a03789 | Iustin Pop | |
171 | c2a03789 | Iustin Pop | def Send(self, msg): |
172 | c2a03789 | Iustin Pop | """Send a message.
|
173 | c2a03789 | Iustin Pop |
|
174 | c2a03789 | Iustin Pop | This just sends a message and doesn't wait for the response.
|
175 | c2a03789 | Iustin Pop |
|
176 | c2a03789 | Iustin Pop | """
|
177 | c2a03789 | Iustin Pop | if self.eom in msg: |
178 | c2a03789 | Iustin Pop | raise EncodingError("Message terminator found in payload") |
179 | c2a03789 | Iustin Pop | self._CheckSocket()
|
180 | c2a03789 | Iustin Pop | try:
|
181 | c2a03789 | Iustin Pop | self.socket.sendall(msg + self.eom) |
182 | c2a03789 | Iustin Pop | except socket.timeout, err:
|
183 | c2a03789 | Iustin Pop | raise TimeoutError("Sending timeout: %s" % str(err)) |
184 | c2a03789 | Iustin Pop | |
185 | c2a03789 | Iustin Pop | def Recv(self): |
186 | c2a03789 | Iustin Pop | """Try to receive a messae from the socket.
|
187 | c2a03789 | Iustin Pop |
|
188 | c2a03789 | Iustin Pop | In case we already have messages queued, we just return from the
|
189 | c2a03789 | Iustin Pop | queue. Otherwise, we try to read data with a _rwtimeout network
|
190 | c2a03789 | Iustin Pop | timeout, and making sure we don't go over 2x_rwtimeout as a global
|
191 | c2a03789 | Iustin Pop | limit.
|
192 | c2a03789 | Iustin Pop |
|
193 | c2a03789 | Iustin Pop | """
|
194 | c2a03789 | Iustin Pop | self._CheckSocket()
|
195 | c2a03789 | Iustin Pop | etime = time.time() + self._rwtimeout
|
196 | c2a03789 | Iustin Pop | while not self._msgs: |
197 | c2a03789 | Iustin Pop | if time.time() > etime:
|
198 | c2a03789 | Iustin Pop | raise TimeoutError("Extended receive timeout") |
199 | c2a03789 | Iustin Pop | try:
|
200 | c2a03789 | Iustin Pop | data = self.socket.recv(4096) |
201 | c2a03789 | Iustin Pop | except socket.timeout, err:
|
202 | c2a03789 | Iustin Pop | raise TimeoutError("Receive timeout: %s" % str(err)) |
203 | c2a03789 | Iustin Pop | if not data: |
204 | c2a03789 | Iustin Pop | raise ConnectionClosedError("Connection closed while reading") |
205 | c2a03789 | Iustin Pop | new_msgs = (self._buffer + data).split(self.eom) |
206 | c2a03789 | Iustin Pop | self._buffer = new_msgs.pop()
|
207 | c2a03789 | Iustin Pop | self._msgs.extend(new_msgs)
|
208 | c2a03789 | Iustin Pop | return self._msgs.popleft() |
209 | c2a03789 | Iustin Pop | |
210 | c2a03789 | Iustin Pop | def Call(self, msg): |
211 | c2a03789 | Iustin Pop | """Send a message and wait for the response.
|
212 | c2a03789 | Iustin Pop |
|
213 | c2a03789 | Iustin Pop | This is just a wrapper over Send and Recv.
|
214 | c2a03789 | Iustin Pop |
|
215 | c2a03789 | Iustin Pop | """
|
216 | c2a03789 | Iustin Pop | self.Send(msg)
|
217 | c2a03789 | Iustin Pop | return self.Recv() |
218 | c2a03789 | Iustin Pop | |
219 | c2a03789 | Iustin Pop | def Close(self): |
220 | c2a03789 | Iustin Pop | """Close the socket"""
|
221 | c2a03789 | Iustin Pop | if self.socket is not None: |
222 | c2a03789 | Iustin Pop | self.socket.close()
|
223 | c2a03789 | Iustin Pop | self.socket = None |
224 | c2a03789 | Iustin Pop | |
225 | c2a03789 | Iustin Pop | |
226 | c2a03789 | Iustin Pop | class Client(object): |
227 | c2a03789 | Iustin Pop | """High-level client implementation.
|
228 | c2a03789 | Iustin Pop |
|
229 | c2a03789 | Iustin Pop | This uses a backing Transport-like class on top of which it
|
230 | c2a03789 | Iustin Pop | implements data serialization/deserialization.
|
231 | c2a03789 | Iustin Pop |
|
232 | c2a03789 | Iustin Pop | """
|
233 | ceab32dd | Iustin Pop | def __init__(self, address=None, timeouts=None, transport=Transport): |
234 | c2a03789 | Iustin Pop | """Constructor for the Client class.
|
235 | c2a03789 | Iustin Pop |
|
236 | c2a03789 | Iustin Pop | Arguments:
|
237 | c2a03789 | Iustin Pop | - address: a valid address the the used transport class
|
238 | c2a03789 | Iustin Pop | - timeout: a list of timeouts, to be used on connect and read/write
|
239 | c2a03789 | Iustin Pop | - transport: a Transport-like class
|
240 | c2a03789 | Iustin Pop |
|
241 | c2a03789 | Iustin Pop |
|
242 | c2a03789 | Iustin Pop | If timeout is not passed, the default timeouts of the transport
|
243 | c2a03789 | Iustin Pop | class are used.
|
244 | c2a03789 | Iustin Pop |
|
245 | c2a03789 | Iustin Pop | """
|
246 | ceab32dd | Iustin Pop | if address is None: |
247 | ceab32dd | Iustin Pop | address = constants.MASTER_SOCKET |
248 | c2a03789 | Iustin Pop | self.transport = transport(address, timeouts=timeouts)
|
249 | c2a03789 | Iustin Pop | |
250 | c2a03789 | Iustin Pop | def SendRequest(self, request, data): |
251 | c2a03789 | Iustin Pop | """Send a generic request and return the response.
|
252 | c2a03789 | Iustin Pop |
|
253 | c2a03789 | Iustin Pop | """
|
254 | c2a03789 | Iustin Pop | msg = {KEY_REQUEST: request, KEY_DATA: data} |
255 | c2a03789 | Iustin Pop | result = self.transport.Call(simplejson.dumps(msg))
|
256 | c2a03789 | Iustin Pop | try:
|
257 | c2a03789 | Iustin Pop | data = simplejson.loads(result) |
258 | c2a03789 | Iustin Pop | except Exception, err: |
259 | c2a03789 | Iustin Pop | raise ProtocolError("Error while deserializing response: %s" % str(err)) |
260 | a14a17fc | Iustin Pop | if (not isinstance(data, dict) or |
261 | a14a17fc | Iustin Pop | 'success' not in data or |
262 | a14a17fc | Iustin Pop | 'result' not in data): |
263 | a14a17fc | Iustin Pop | raise DecodingError("Invalid response from server: %s" % str(data)) |
264 | c2a03789 | Iustin Pop | return data
|
265 | c2a03789 | Iustin Pop | |
266 | c2a03789 | Iustin Pop | def SubmitJob(self, job): |
267 | c2a03789 | Iustin Pop | """Submit a job"""
|
268 | b77acb3e | Iustin Pop | result = self.SendRequest(REQ_SUBMIT, SerializeJob(job))
|
269 | b77acb3e | Iustin Pop | if not result['success']: |
270 | b77acb3e | Iustin Pop | raise RequestError(result['result']) |
271 | b77acb3e | Iustin Pop | return result['result'] |
272 | c2a03789 | Iustin Pop | |
273 | c2a03789 | Iustin Pop | def Query(self, data): |
274 | c2a03789 | Iustin Pop | """Make a query"""
|
275 | b77acb3e | Iustin Pop | result = self.SendRequest(REQ_QUERY, data)
|
276 | b77acb3e | Iustin Pop | if not result['success']: |
277 | b77acb3e | Iustin Pop | raise RequestError(result[result])
|
278 | b77acb3e | Iustin Pop | result = result['result']
|
279 | b77acb3e | Iustin Pop | if data["object"] == "jobs": |
280 | b77acb3e | Iustin Pop | # custom job processing of query values
|
281 | b77acb3e | Iustin Pop | for row in result: |
282 | b77acb3e | Iustin Pop | for idx, field in enumerate(data["fields"]): |
283 | b77acb3e | Iustin Pop | if field == "op_list": |
284 | b77acb3e | Iustin Pop | row[idx] = [opcodes.OpCode.LoadOpCode(i) for i in row[idx]] |
285 | b77acb3e | Iustin Pop | return result |