root / lib / luxi.py @ d9d1e541
History | View | Annotate | Download (16.8 kB)
1 | c2a03789 | Iustin Pop | #
|
---|---|---|---|
2 | c2a03789 | Iustin Pop | #
|
3 | c2a03789 | Iustin Pop | |
4 | 83c046a2 | Iustin Pop | # Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
|
5 | c2a03789 | Iustin Pop | #
|
6 | c2a03789 | Iustin Pop | # This program is free software; you can redistribute it and/or modify
|
7 | c2a03789 | Iustin Pop | # it under the terms of the GNU General Public License as published by
|
8 | c2a03789 | Iustin Pop | # the Free Software Foundation; either version 2 of the License, or
|
9 | c2a03789 | Iustin Pop | # (at your option) any later version.
|
10 | c2a03789 | Iustin Pop | #
|
11 | c2a03789 | Iustin Pop | # This program is distributed in the hope that it will be useful, but
|
12 | c2a03789 | Iustin Pop | # WITHOUT ANY WARRANTY; without even the implied warranty of
|
13 | c2a03789 | Iustin Pop | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
14 | c2a03789 | Iustin Pop | # General Public License for more details.
|
15 | c2a03789 | Iustin Pop | #
|
16 | c2a03789 | Iustin Pop | # You should have received a copy of the GNU General Public License
|
17 | c2a03789 | Iustin Pop | # along with this program; if not, write to the Free Software
|
18 | c2a03789 | Iustin Pop | # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
|
19 | c2a03789 | Iustin Pop | # 02110-1301, USA.
|
20 | c2a03789 | Iustin Pop | |
21 | c2a03789 | Iustin Pop | |
22 | c2a03789 | Iustin Pop | """Module for the unix socket protocol
|
23 | c2a03789 | Iustin Pop |
|
24 | 8d5b316c | Iustin Pop | This module implements the local unix socket protocol. You only need
|
25 | c2a03789 | Iustin Pop | this module and the opcodes module in the client program in order to
|
26 | c2a03789 | Iustin Pop | communicate with the master.
|
27 | c2a03789 | Iustin Pop |
|
28 | 7577196d | Guido Trotter | The module is also used by the master daemon.
|
29 | c2a03789 | Iustin Pop |
|
30 | c2a03789 | Iustin Pop | """
|
31 | c2a03789 | Iustin Pop | |
32 | c2a03789 | Iustin Pop | import socket |
33 | c2a03789 | Iustin Pop | import collections |
34 | c2a03789 | Iustin Pop | import time |
35 | 03a8dbdc | Iustin Pop | import errno |
36 | 231db3a5 | Michael Hanselmann | import logging |
37 | c2a03789 | Iustin Pop | |
38 | b8028dcf | Michael Hanselmann | from ganeti import compat |
39 | fad50141 | Michael Hanselmann | from ganeti import serializer |
40 | ceab32dd | Iustin Pop | from ganeti import constants |
41 | 6797ec29 | Iustin Pop | from ganeti import errors |
42 | cb462b06 | Michael Hanselmann | from ganeti import utils |
43 | 28b71a76 | Michael Hanselmann | from ganeti import objects |
44 | b87ee98f | Michael Hanselmann | from ganeti import pathutils |
45 | c2a03789 | Iustin Pop | |
46 | c2a03789 | Iustin Pop | |
47 | 231db3a5 | Michael Hanselmann | KEY_METHOD = "method"
|
48 | 231db3a5 | Michael Hanselmann | KEY_ARGS = "args"
|
49 | 3d8548c4 | Michael Hanselmann | KEY_SUCCESS = "success"
|
50 | 3d8548c4 | Michael Hanselmann | KEY_RESULT = "result"
|
51 | e986f20c | Michael Hanselmann | KEY_VERSION = "version"
|
52 | 3d8548c4 | Michael Hanselmann | |
53 | 0bbe448c | Michael Hanselmann | REQ_SUBMIT_JOB = "SubmitJob"
|
54 | 346c3037 | Klaus Aehlig | REQ_SUBMIT_JOB_TO_DRAINED_QUEUE = "SubmitJobToDrainedQueue"
|
55 | 2971c913 | Iustin Pop | REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
|
56 | d9d1e541 | Klaus Aehlig | REQ_PICKUP_JOB = "PickupJob"
|
57 | dfe57c22 | Michael Hanselmann | REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
|
58 | 0bbe448c | Michael Hanselmann | REQ_CANCEL_JOB = "CancelJob"
|
59 | 0bbe448c | Michael Hanselmann | REQ_ARCHIVE_JOB = "ArchiveJob"
|
60 | f63ffb37 | Michael Hanselmann | REQ_CHANGE_JOB_PRIORITY = "ChangeJobPriority"
|
61 | 83c046a2 | Iustin Pop | REQ_AUTO_ARCHIVE_JOBS = "AutoArchiveJobs"
|
62 | 28b71a76 | Michael Hanselmann | REQ_QUERY = "Query"
|
63 | 28b71a76 | Michael Hanselmann | REQ_QUERY_FIELDS = "QueryFields"
|
64 | 0bbe448c | Michael Hanselmann | REQ_QUERY_JOBS = "QueryJobs"
|
65 | ee6c7b94 | Michael Hanselmann | REQ_QUERY_INSTANCES = "QueryInstances"
|
66 | 02f7fe54 | Michael Hanselmann | REQ_QUERY_NODES = "QueryNodes"
|
67 | a79ef2a5 | Adeodato Simo | REQ_QUERY_GROUPS = "QueryGroups"
|
68 | 306bed0e | Apollon Oikonomopoulos | REQ_QUERY_NETWORKS = "QueryNetworks"
|
69 | 32f93223 | Michael Hanselmann | REQ_QUERY_EXPORTS = "QueryExports"
|
70 | ae5849b5 | Michael Hanselmann | REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
|
71 | 66baeccc | Iustin Pop | REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
|
72 | 7699c3af | Iustin Pop | REQ_QUERY_TAGS = "QueryTags"
|
73 | 83c046a2 | Iustin Pop | REQ_SET_DRAIN_FLAG = "SetDrainFlag"
|
74 | 05e50653 | Michael Hanselmann | REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
|
75 | c2a03789 | Iustin Pop | |
76 | e3a25810 | Michael Hanselmann | #: List of all LUXI requests
|
77 | b8028dcf | Michael Hanselmann | REQ_ALL = compat.UniqueFrozenset([ |
78 | e3a25810 | Michael Hanselmann | REQ_ARCHIVE_JOB, |
79 | 83c046a2 | Iustin Pop | REQ_AUTO_ARCHIVE_JOBS, |
80 | e3a25810 | Michael Hanselmann | REQ_CANCEL_JOB, |
81 | f63ffb37 | Michael Hanselmann | REQ_CHANGE_JOB_PRIORITY, |
82 | d9d1e541 | Klaus Aehlig | REQ_PICKUP_JOB, |
83 | e3a25810 | Michael Hanselmann | REQ_QUERY, |
84 | e3a25810 | Michael Hanselmann | REQ_QUERY_CLUSTER_INFO, |
85 | e3a25810 | Michael Hanselmann | REQ_QUERY_CONFIG_VALUES, |
86 | e3a25810 | Michael Hanselmann | REQ_QUERY_EXPORTS, |
87 | e3a25810 | Michael Hanselmann | REQ_QUERY_FIELDS, |
88 | e3a25810 | Michael Hanselmann | REQ_QUERY_GROUPS, |
89 | e3a25810 | Michael Hanselmann | REQ_QUERY_INSTANCES, |
90 | e3a25810 | Michael Hanselmann | REQ_QUERY_JOBS, |
91 | e3a25810 | Michael Hanselmann | REQ_QUERY_NODES, |
92 | 795d035d | Klaus Aehlig | REQ_QUERY_NETWORKS, |
93 | e3a25810 | Michael Hanselmann | REQ_QUERY_TAGS, |
94 | 83c046a2 | Iustin Pop | REQ_SET_DRAIN_FLAG, |
95 | e3a25810 | Michael Hanselmann | REQ_SET_WATCHER_PAUSE, |
96 | e3a25810 | Michael Hanselmann | REQ_SUBMIT_JOB, |
97 | 346c3037 | Klaus Aehlig | REQ_SUBMIT_JOB_TO_DRAINED_QUEUE, |
98 | e3a25810 | Michael Hanselmann | REQ_SUBMIT_MANY_JOBS, |
99 | e3a25810 | Michael Hanselmann | REQ_WAIT_FOR_JOB_CHANGE, |
100 | e3a25810 | Michael Hanselmann | ]) |
101 | e3a25810 | Michael Hanselmann | |
102 | c2a03789 | Iustin Pop | DEF_CTMO = 10
|
103 | c2a03789 | Iustin Pop | DEF_RWTO = 60
|
104 | c2a03789 | Iustin Pop | |
105 | 793a8f7c | Michael Hanselmann | # WaitForJobChange timeout
|
106 | 793a8f7c | Michael Hanselmann | WFJC_TIMEOUT = (DEF_RWTO - 1) / 2 |
107 | 793a8f7c | Michael Hanselmann | |
108 | c2a03789 | Iustin Pop | |
109 | 7a8bda3f | Michael Hanselmann | class ProtocolError(errors.LuxiError): |
110 | 5a1c22fe | Iustin Pop | """Denotes an error in the LUXI protocol."""
|
111 | c2a03789 | Iustin Pop | |
112 | c2a03789 | Iustin Pop | |
113 | c2a03789 | Iustin Pop | class ConnectionClosedError(ProtocolError): |
114 | 5a1c22fe | Iustin Pop | """Connection closed error."""
|
115 | c2a03789 | Iustin Pop | |
116 | c2a03789 | Iustin Pop | |
117 | c2a03789 | Iustin Pop | class TimeoutError(ProtocolError): |
118 | 5a1c22fe | Iustin Pop | """Operation timeout error."""
|
119 | c2a03789 | Iustin Pop | |
120 | c2a03789 | Iustin Pop | |
121 | b77acb3e | Iustin Pop | class RequestError(ProtocolError): |
122 | 5a1c22fe | Iustin Pop | """Error on request.
|
123 | b77acb3e | Iustin Pop |
|
124 | b77acb3e | Iustin Pop | This signifies an error in the request format or request handling,
|
125 | b77acb3e | Iustin Pop | but not (e.g.) an error in starting up an instance.
|
126 | b77acb3e | Iustin Pop |
|
127 | b77acb3e | Iustin Pop | Some common conditions that can trigger this exception:
|
128 | b77acb3e | Iustin Pop | - job submission failed because the job data was wrong
|
129 | b77acb3e | Iustin Pop | - query failed because required fields were missing
|
130 | b77acb3e | Iustin Pop |
|
131 | b77acb3e | Iustin Pop | """
|
132 | b77acb3e | Iustin Pop | |
133 | 3d8548c4 | Michael Hanselmann | |
134 | 03a8dbdc | Iustin Pop | class NoMasterError(ProtocolError): |
135 | 5a1c22fe | Iustin Pop | """The master cannot be reached.
|
136 | 03a8dbdc | Iustin Pop |
|
137 | 03a8dbdc | Iustin Pop | This means that the master daemon is not running or the socket has
|
138 | 03a8dbdc | Iustin Pop | been removed.
|
139 | 03a8dbdc | Iustin Pop |
|
140 | 03a8dbdc | Iustin Pop | """
|
141 | 03a8dbdc | Iustin Pop | |
142 | b77acb3e | Iustin Pop | |
143 | 5a1c22fe | Iustin Pop | class PermissionError(ProtocolError): |
144 | 5a1c22fe | Iustin Pop | """Permission denied while connecting to the master socket.
|
145 | 5a1c22fe | Iustin Pop |
|
146 | 5a1c22fe | Iustin Pop | This means the user doesn't have the proper rights.
|
147 | 5a1c22fe | Iustin Pop |
|
148 | 5a1c22fe | Iustin Pop | """
|
149 | 5a1c22fe | Iustin Pop | |
150 | 5a1c22fe | Iustin Pop | |
151 | c2a03789 | Iustin Pop | class Transport: |
152 | c2a03789 | Iustin Pop | """Low-level transport class.
|
153 | c2a03789 | Iustin Pop |
|
154 | c2a03789 | Iustin Pop | This is used on the client side.
|
155 | c2a03789 | Iustin Pop |
|
156 | c2a03789 | Iustin Pop | This could be replace by any other class that provides the same
|
157 | c2a03789 | Iustin Pop | semantics to the Client. This means:
|
158 | c2a03789 | Iustin Pop | - can send messages and receive messages
|
159 | c2a03789 | Iustin Pop | - safe for multithreading
|
160 | c2a03789 | Iustin Pop |
|
161 | c2a03789 | Iustin Pop | """
|
162 | c2a03789 | Iustin Pop | |
163 | 25942a6c | Guido Trotter | def __init__(self, address, timeouts=None): |
164 | c2a03789 | Iustin Pop | """Constructor for the Client class.
|
165 | c2a03789 | Iustin Pop |
|
166 | c2a03789 | Iustin Pop | Arguments:
|
167 | c2a03789 | Iustin Pop | - address: a valid address the the used transport class
|
168 | c2a03789 | Iustin Pop | - timeout: a list of timeouts, to be used on connect and read/write
|
169 | c2a03789 | Iustin Pop |
|
170 | c2a03789 | Iustin Pop | There are two timeouts used since we might want to wait for a long
|
171 | c2a03789 | Iustin Pop | time for a response, but the connect timeout should be lower.
|
172 | c2a03789 | Iustin Pop |
|
173 | c2a03789 | Iustin Pop | If not passed, we use a default of 10 and respectively 60 seconds.
|
174 | c2a03789 | Iustin Pop |
|
175 | c2a03789 | Iustin Pop | Note that on reading data, since the timeout applies to an
|
176 | c2a03789 | Iustin Pop | invidual receive, it might be that the total duration is longer
|
177 | c2a03789 | Iustin Pop | than timeout value passed (we make a hard limit at twice the read
|
178 | c2a03789 | Iustin Pop | timeout).
|
179 | c2a03789 | Iustin Pop |
|
180 | c2a03789 | Iustin Pop | """
|
181 | c2a03789 | Iustin Pop | self.address = address
|
182 | c2a03789 | Iustin Pop | if timeouts is None: |
183 | c2a03789 | Iustin Pop | self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO |
184 | c2a03789 | Iustin Pop | else:
|
185 | c2a03789 | Iustin Pop | self._ctimeout, self._rwtimeout = timeouts |
186 | c2a03789 | Iustin Pop | |
187 | c2a03789 | Iustin Pop | self.socket = None |
188 | c2a03789 | Iustin Pop | self._buffer = "" |
189 | c2a03789 | Iustin Pop | self._msgs = collections.deque()
|
190 | c2a03789 | Iustin Pop | |
191 | c2a03789 | Iustin Pop | try:
|
192 | c2a03789 | Iustin Pop | self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
193 | cb462b06 | Michael Hanselmann | |
194 | cb462b06 | Michael Hanselmann | # Try to connect
|
195 | c2a03789 | Iustin Pop | try:
|
196 | cb462b06 | Michael Hanselmann | utils.Retry(self._Connect, 1.0, self._ctimeout, |
197 | cb462b06 | Michael Hanselmann | args=(self.socket, address, self._ctimeout)) |
198 | cb462b06 | Michael Hanselmann | except utils.RetryTimeout:
|
199 | cb462b06 | Michael Hanselmann | raise TimeoutError("Connect timed out") |
200 | cb462b06 | Michael Hanselmann | |
201 | c2a03789 | Iustin Pop | self.socket.settimeout(self._rwtimeout) |
202 | 03a8dbdc | Iustin Pop | except (socket.error, NoMasterError):
|
203 | c2a03789 | Iustin Pop | if self.socket is not None: |
204 | c2a03789 | Iustin Pop | self.socket.close()
|
205 | c2a03789 | Iustin Pop | self.socket = None |
206 | c2a03789 | Iustin Pop | raise
|
207 | c2a03789 | Iustin Pop | |
208 | cb462b06 | Michael Hanselmann | @staticmethod
|
209 | cb462b06 | Michael Hanselmann | def _Connect(sock, address, timeout): |
210 | cb462b06 | Michael Hanselmann | sock.settimeout(timeout) |
211 | cb462b06 | Michael Hanselmann | try:
|
212 | cb462b06 | Michael Hanselmann | sock.connect(address) |
213 | cb462b06 | Michael Hanselmann | except socket.timeout, err:
|
214 | cb462b06 | Michael Hanselmann | raise TimeoutError("Connect timed out: %s" % str(err)) |
215 | cb462b06 | Michael Hanselmann | except socket.error, err:
|
216 | 5a1c22fe | Iustin Pop | error_code = err.args[0]
|
217 | 5a1c22fe | Iustin Pop | if error_code in (errno.ENOENT, errno.ECONNREFUSED): |
218 | cb462b06 | Michael Hanselmann | raise NoMasterError(address)
|
219 | 5a1c22fe | Iustin Pop | elif error_code in (errno.EPERM, errno.EACCES): |
220 | 5a1c22fe | Iustin Pop | raise PermissionError(address)
|
221 | 5a1c22fe | Iustin Pop | elif error_code == errno.EAGAIN:
|
222 | cb462b06 | Michael Hanselmann | # Server's socket backlog is full at the moment
|
223 | cb462b06 | Michael Hanselmann | raise utils.RetryAgain()
|
224 | cb462b06 | Michael Hanselmann | raise
|
225 | cb462b06 | Michael Hanselmann | |
226 | c2a03789 | Iustin Pop | def _CheckSocket(self): |
227 | c2a03789 | Iustin Pop | """Make sure we are connected.
|
228 | c2a03789 | Iustin Pop |
|
229 | c2a03789 | Iustin Pop | """
|
230 | c2a03789 | Iustin Pop | if self.socket is None: |
231 | c2a03789 | Iustin Pop | raise ProtocolError("Connection is closed") |
232 | c2a03789 | Iustin Pop | |
233 | c2a03789 | Iustin Pop | def Send(self, msg): |
234 | c2a03789 | Iustin Pop | """Send a message.
|
235 | c2a03789 | Iustin Pop |
|
236 | c2a03789 | Iustin Pop | This just sends a message and doesn't wait for the response.
|
237 | c2a03789 | Iustin Pop |
|
238 | c2a03789 | Iustin Pop | """
|
239 | 25942a6c | Guido Trotter | if constants.LUXI_EOM in msg: |
240 | 797506fc | Michael Hanselmann | raise ProtocolError("Message terminator found in payload") |
241 | 797506fc | Michael Hanselmann | |
242 | c2a03789 | Iustin Pop | self._CheckSocket()
|
243 | c2a03789 | Iustin Pop | try:
|
244 | 6096ee13 | Michael Hanselmann | # TODO: sendall is not guaranteed to send everything
|
245 | 25942a6c | Guido Trotter | self.socket.sendall(msg + constants.LUXI_EOM)
|
246 | c2a03789 | Iustin Pop | except socket.timeout, err:
|
247 | c2a03789 | Iustin Pop | raise TimeoutError("Sending timeout: %s" % str(err)) |
248 | c2a03789 | Iustin Pop | |
249 | c2a03789 | Iustin Pop | def Recv(self): |
250 | 5bbd3f7f | Michael Hanselmann | """Try to receive a message from the socket.
|
251 | c2a03789 | Iustin Pop |
|
252 | c2a03789 | Iustin Pop | In case we already have messages queued, we just return from the
|
253 | c2a03789 | Iustin Pop | queue. Otherwise, we try to read data with a _rwtimeout network
|
254 | c2a03789 | Iustin Pop | timeout, and making sure we don't go over 2x_rwtimeout as a global
|
255 | c2a03789 | Iustin Pop | limit.
|
256 | c2a03789 | Iustin Pop |
|
257 | c2a03789 | Iustin Pop | """
|
258 | c2a03789 | Iustin Pop | self._CheckSocket()
|
259 | c2a03789 | Iustin Pop | etime = time.time() + self._rwtimeout
|
260 | c2a03789 | Iustin Pop | while not self._msgs: |
261 | c2a03789 | Iustin Pop | if time.time() > etime:
|
262 | c2a03789 | Iustin Pop | raise TimeoutError("Extended receive timeout") |
263 | 6096ee13 | Michael Hanselmann | while True: |
264 | 6096ee13 | Michael Hanselmann | try:
|
265 | 6096ee13 | Michael Hanselmann | data = self.socket.recv(4096) |
266 | 28e3e216 | Michael Hanselmann | except socket.timeout, err:
|
267 | 28e3e216 | Michael Hanselmann | raise TimeoutError("Receive timeout: %s" % str(err)) |
268 | 6096ee13 | Michael Hanselmann | except socket.error, err:
|
269 | 6096ee13 | Michael Hanselmann | if err.args and err.args[0] == errno.EAGAIN: |
270 | 6096ee13 | Michael Hanselmann | continue
|
271 | 6096ee13 | Michael Hanselmann | raise
|
272 | 6096ee13 | Michael Hanselmann | break
|
273 | c2a03789 | Iustin Pop | if not data: |
274 | c2a03789 | Iustin Pop | raise ConnectionClosedError("Connection closed while reading") |
275 | 25942a6c | Guido Trotter | new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
|
276 | c2a03789 | Iustin Pop | self._buffer = new_msgs.pop()
|
277 | c2a03789 | Iustin Pop | self._msgs.extend(new_msgs)
|
278 | c2a03789 | Iustin Pop | return self._msgs.popleft() |
279 | c2a03789 | Iustin Pop | |
280 | c2a03789 | Iustin Pop | def Call(self, msg): |
281 | c2a03789 | Iustin Pop | """Send a message and wait for the response.
|
282 | c2a03789 | Iustin Pop |
|
283 | c2a03789 | Iustin Pop | This is just a wrapper over Send and Recv.
|
284 | c2a03789 | Iustin Pop |
|
285 | c2a03789 | Iustin Pop | """
|
286 | c2a03789 | Iustin Pop | self.Send(msg)
|
287 | c2a03789 | Iustin Pop | return self.Recv() |
288 | c2a03789 | Iustin Pop | |
289 | c2a03789 | Iustin Pop | def Close(self): |
290 | c2a03789 | Iustin Pop | """Close the socket"""
|
291 | c2a03789 | Iustin Pop | if self.socket is not None: |
292 | c2a03789 | Iustin Pop | self.socket.close()
|
293 | c2a03789 | Iustin Pop | self.socket = None |
294 | c2a03789 | Iustin Pop | |
295 | c2a03789 | Iustin Pop | |
296 | 231db3a5 | Michael Hanselmann | def ParseRequest(msg): |
297 | 231db3a5 | Michael Hanselmann | """Parses a LUXI request message.
|
298 | 231db3a5 | Michael Hanselmann |
|
299 | 231db3a5 | Michael Hanselmann | """
|
300 | 231db3a5 | Michael Hanselmann | try:
|
301 | 231db3a5 | Michael Hanselmann | request = serializer.LoadJson(msg) |
302 | 231db3a5 | Michael Hanselmann | except ValueError, err: |
303 | 231db3a5 | Michael Hanselmann | raise ProtocolError("Invalid LUXI request (parsing error): %s" % err) |
304 | 231db3a5 | Michael Hanselmann | |
305 | 231db3a5 | Michael Hanselmann | logging.debug("LUXI request: %s", request)
|
306 | 231db3a5 | Michael Hanselmann | |
307 | 231db3a5 | Michael Hanselmann | if not isinstance(request, dict): |
308 | 231db3a5 | Michael Hanselmann | logging.error("LUXI request not a dict: %r", msg)
|
309 | 231db3a5 | Michael Hanselmann | raise ProtocolError("Invalid LUXI request (not a dict)") |
310 | 231db3a5 | Michael Hanselmann | |
311 | b459a848 | Andrea Spadaccini | method = request.get(KEY_METHOD, None) # pylint: disable=E1103 |
312 | b459a848 | Andrea Spadaccini | args = request.get(KEY_ARGS, None) # pylint: disable=E1103 |
313 | b459a848 | Andrea Spadaccini | version = request.get(KEY_VERSION, None) # pylint: disable=E1103 |
314 | e7a25b08 | Guido Trotter | |
315 | 231db3a5 | Michael Hanselmann | if method is None or args is None: |
316 | 231db3a5 | Michael Hanselmann | logging.error("LUXI request missing method or arguments: %r", msg)
|
317 | 231db3a5 | Michael Hanselmann | raise ProtocolError(("Invalid LUXI request (no method or arguments" |
318 | 231db3a5 | Michael Hanselmann | " in request): %r") % msg)
|
319 | 231db3a5 | Michael Hanselmann | |
320 | e986f20c | Michael Hanselmann | return (method, args, version)
|
321 | 231db3a5 | Michael Hanselmann | |
322 | 231db3a5 | Michael Hanselmann | |
323 | 231db3a5 | Michael Hanselmann | def ParseResponse(msg): |
324 | 231db3a5 | Michael Hanselmann | """Parses a LUXI response message.
|
325 | 231db3a5 | Michael Hanselmann |
|
326 | 231db3a5 | Michael Hanselmann | """
|
327 | 231db3a5 | Michael Hanselmann | # Parse the result
|
328 | 231db3a5 | Michael Hanselmann | try:
|
329 | 231db3a5 | Michael Hanselmann | data = serializer.LoadJson(msg) |
330 | d143f2c6 | Iustin Pop | except KeyboardInterrupt: |
331 | d143f2c6 | Iustin Pop | raise
|
332 | 231db3a5 | Michael Hanselmann | except Exception, err: |
333 | 231db3a5 | Michael Hanselmann | raise ProtocolError("Error while deserializing response: %s" % str(err)) |
334 | 231db3a5 | Michael Hanselmann | |
335 | 231db3a5 | Michael Hanselmann | # Validate response
|
336 | 231db3a5 | Michael Hanselmann | if not (isinstance(data, dict) and |
337 | 231db3a5 | Michael Hanselmann | KEY_SUCCESS in data and |
338 | 231db3a5 | Michael Hanselmann | KEY_RESULT in data):
|
339 | 231db3a5 | Michael Hanselmann | raise ProtocolError("Invalid response from server: %r" % data) |
340 | 231db3a5 | Michael Hanselmann | |
341 | 2317945a | Guido Trotter | return (data[KEY_SUCCESS], data[KEY_RESULT],
|
342 | b459a848 | Andrea Spadaccini | data.get(KEY_VERSION, None)) # pylint: disable=E1103 |
343 | 231db3a5 | Michael Hanselmann | |
344 | 231db3a5 | Michael Hanselmann | |
345 | e986f20c | Michael Hanselmann | def FormatResponse(success, result, version=None): |
346 | 231db3a5 | Michael Hanselmann | """Formats a LUXI response message.
|
347 | 231db3a5 | Michael Hanselmann |
|
348 | 231db3a5 | Michael Hanselmann | """
|
349 | 231db3a5 | Michael Hanselmann | response = { |
350 | 231db3a5 | Michael Hanselmann | KEY_SUCCESS: success, |
351 | 231db3a5 | Michael Hanselmann | KEY_RESULT: result, |
352 | 231db3a5 | Michael Hanselmann | } |
353 | 231db3a5 | Michael Hanselmann | |
354 | e986f20c | Michael Hanselmann | if version is not None: |
355 | e986f20c | Michael Hanselmann | response[KEY_VERSION] = version |
356 | e986f20c | Michael Hanselmann | |
357 | 231db3a5 | Michael Hanselmann | logging.debug("LUXI response: %s", response)
|
358 | 231db3a5 | Michael Hanselmann | |
359 | 231db3a5 | Michael Hanselmann | return serializer.DumpJson(response)
|
360 | 231db3a5 | Michael Hanselmann | |
361 | 231db3a5 | Michael Hanselmann | |
362 | e986f20c | Michael Hanselmann | def FormatRequest(method, args, version=None): |
363 | 231db3a5 | Michael Hanselmann | """Formats a LUXI request message.
|
364 | 231db3a5 | Michael Hanselmann |
|
365 | 231db3a5 | Michael Hanselmann | """
|
366 | 231db3a5 | Michael Hanselmann | # Build request
|
367 | 231db3a5 | Michael Hanselmann | request = { |
368 | 231db3a5 | Michael Hanselmann | KEY_METHOD: method, |
369 | 231db3a5 | Michael Hanselmann | KEY_ARGS: args, |
370 | 231db3a5 | Michael Hanselmann | } |
371 | 231db3a5 | Michael Hanselmann | |
372 | e986f20c | Michael Hanselmann | if version is not None: |
373 | e986f20c | Michael Hanselmann | request[KEY_VERSION] = version |
374 | e986f20c | Michael Hanselmann | |
375 | 231db3a5 | Michael Hanselmann | # Serialize the request
|
376 | a182a3ed | Michael Hanselmann | return serializer.DumpJson(request)
|
377 | 231db3a5 | Michael Hanselmann | |
378 | 231db3a5 | Michael Hanselmann | |
379 | e986f20c | Michael Hanselmann | def CallLuxiMethod(transport_cb, method, args, version=None): |
380 | 231db3a5 | Michael Hanselmann | """Send a LUXI request via a transport and return the response.
|
381 | 231db3a5 | Michael Hanselmann |
|
382 | 231db3a5 | Michael Hanselmann | """
|
383 | 231db3a5 | Michael Hanselmann | assert callable(transport_cb) |
384 | 231db3a5 | Michael Hanselmann | |
385 | e986f20c | Michael Hanselmann | request_msg = FormatRequest(method, args, version=version) |
386 | 231db3a5 | Michael Hanselmann | |
387 | 231db3a5 | Michael Hanselmann | # Send request and wait for response
|
388 | 231db3a5 | Michael Hanselmann | response_msg = transport_cb(request_msg) |
389 | 231db3a5 | Michael Hanselmann | |
390 | e986f20c | Michael Hanselmann | (success, result, resp_version) = ParseResponse(response_msg) |
391 | e986f20c | Michael Hanselmann | |
392 | e986f20c | Michael Hanselmann | # Verify version if there was one in the response
|
393 | e986f20c | Michael Hanselmann | if resp_version is not None and resp_version != version: |
394 | e986f20c | Michael Hanselmann | raise errors.LuxiError("LUXI version mismatch, client %s, response %s" % |
395 | e986f20c | Michael Hanselmann | (version, resp_version)) |
396 | 231db3a5 | Michael Hanselmann | |
397 | 231db3a5 | Michael Hanselmann | if success:
|
398 | 231db3a5 | Michael Hanselmann | return result
|
399 | 231db3a5 | Michael Hanselmann | |
400 | 231db3a5 | Michael Hanselmann | errors.MaybeRaise(result) |
401 | 231db3a5 | Michael Hanselmann | raise RequestError(result)
|
402 | 231db3a5 | Michael Hanselmann | |
403 | 231db3a5 | Michael Hanselmann | |
404 | c2a03789 | Iustin Pop | class Client(object): |
405 | c2a03789 | Iustin Pop | """High-level client implementation.
|
406 | c2a03789 | Iustin Pop |
|
407 | c2a03789 | Iustin Pop | This uses a backing Transport-like class on top of which it
|
408 | c2a03789 | Iustin Pop | implements data serialization/deserialization.
|
409 | c2a03789 | Iustin Pop |
|
410 | c2a03789 | Iustin Pop | """
|
411 | ceab32dd | Iustin Pop | def __init__(self, address=None, timeouts=None, transport=Transport): |
412 | c2a03789 | Iustin Pop | """Constructor for the Client class.
|
413 | c2a03789 | Iustin Pop |
|
414 | c2a03789 | Iustin Pop | Arguments:
|
415 | c2a03789 | Iustin Pop | - address: a valid address the the used transport class
|
416 | c2a03789 | Iustin Pop | - timeout: a list of timeouts, to be used on connect and read/write
|
417 | c2a03789 | Iustin Pop | - transport: a Transport-like class
|
418 | c2a03789 | Iustin Pop |
|
419 | c2a03789 | Iustin Pop |
|
420 | c2a03789 | Iustin Pop | If timeout is not passed, the default timeouts of the transport
|
421 | c2a03789 | Iustin Pop | class are used.
|
422 | c2a03789 | Iustin Pop |
|
423 | c2a03789 | Iustin Pop | """
|
424 | ceab32dd | Iustin Pop | if address is None: |
425 | b87ee98f | Michael Hanselmann | address = pathutils.MASTER_SOCKET |
426 | 8d5b316c | Iustin Pop | self.address = address
|
427 | 8d5b316c | Iustin Pop | self.timeouts = timeouts
|
428 | 8d5b316c | Iustin Pop | self.transport_class = transport
|
429 | 8d5b316c | Iustin Pop | self.transport = None |
430 | 8d5b316c | Iustin Pop | self._InitTransport()
|
431 | 8d5b316c | Iustin Pop | |
432 | 8d5b316c | Iustin Pop | def _InitTransport(self): |
433 | 8d5b316c | Iustin Pop | """(Re)initialize the transport if needed.
|
434 | 8d5b316c | Iustin Pop |
|
435 | 8d5b316c | Iustin Pop | """
|
436 | 8d5b316c | Iustin Pop | if self.transport is None: |
437 | 8d5b316c | Iustin Pop | self.transport = self.transport_class(self.address, |
438 | 8d5b316c | Iustin Pop | timeouts=self.timeouts)
|
439 | 8d5b316c | Iustin Pop | |
440 | 8d5b316c | Iustin Pop | def _CloseTransport(self): |
441 | 8d5b316c | Iustin Pop | """Close the transport, ignoring errors.
|
442 | 8d5b316c | Iustin Pop |
|
443 | 8d5b316c | Iustin Pop | """
|
444 | 8d5b316c | Iustin Pop | if self.transport is None: |
445 | 8d5b316c | Iustin Pop | return
|
446 | 8d5b316c | Iustin Pop | try:
|
447 | 8d5b316c | Iustin Pop | old_transp = self.transport
|
448 | 8d5b316c | Iustin Pop | self.transport = None |
449 | 8d5b316c | Iustin Pop | old_transp.Close() |
450 | b459a848 | Andrea Spadaccini | except Exception: # pylint: disable=W0703 |
451 | 8d5b316c | Iustin Pop | pass
|
452 | c2a03789 | Iustin Pop | |
453 | 231db3a5 | Michael Hanselmann | def _SendMethodCall(self, data): |
454 | 3d8548c4 | Michael Hanselmann | # Send request and wait for response
|
455 | 8d5b316c | Iustin Pop | try:
|
456 | 8d5b316c | Iustin Pop | self._InitTransport()
|
457 | 231db3a5 | Michael Hanselmann | return self.transport.Call(data) |
458 | 8d5b316c | Iustin Pop | except Exception: |
459 | 8d5b316c | Iustin Pop | self._CloseTransport()
|
460 | 8d5b316c | Iustin Pop | raise
|
461 | 8d5b316c | Iustin Pop | |
462 | 2a917701 | Michael Hanselmann | def Close(self): |
463 | 2a917701 | Michael Hanselmann | """Close the underlying connection.
|
464 | 2a917701 | Michael Hanselmann |
|
465 | 2a917701 | Michael Hanselmann | """
|
466 | 2a917701 | Michael Hanselmann | self._CloseTransport()
|
467 | 2a917701 | Michael Hanselmann | |
468 | 231db3a5 | Michael Hanselmann | def CallMethod(self, method, args): |
469 | 231db3a5 | Michael Hanselmann | """Send a generic request and return the response.
|
470 | 3d8548c4 | Michael Hanselmann |
|
471 | 231db3a5 | Michael Hanselmann | """
|
472 | a629ecb9 | Iustin Pop | if not isinstance(args, (list, tuple)): |
473 | a629ecb9 | Iustin Pop | raise errors.ProgrammerError("Invalid parameter passed to CallMethod:" |
474 | a629ecb9 | Iustin Pop | " expected list, got %s" % type(args)) |
475 | e986f20c | Michael Hanselmann | return CallLuxiMethod(self._SendMethodCall, method, args, |
476 | e986f20c | Michael Hanselmann | version=constants.LUXI_VERSION) |
477 | c2a03789 | Iustin Pop | |
478 | 3ccafd0e | Iustin Pop | def SetQueueDrainFlag(self, drain_flag): |
479 | 83c046a2 | Iustin Pop | return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, )) |
480 | 3ccafd0e | Iustin Pop | |
481 | 05e50653 | Michael Hanselmann | def SetWatcherPause(self, until): |
482 | a629ecb9 | Iustin Pop | return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, )) |
483 | 05e50653 | Michael Hanselmann | |
484 | d9d1e541 | Klaus Aehlig | def PickupJob(self, job): |
485 | d9d1e541 | Klaus Aehlig | return self.CallMethod(REQ_PICKUP_JOB, (job,)) |
486 | d9d1e541 | Klaus Aehlig | |
487 | 0bbe448c | Michael Hanselmann | def SubmitJob(self, ops): |
488 | 0bbe448c | Michael Hanselmann | ops_state = map(lambda op: op.__getstate__(), ops) |
489 | 734a2a7c | René Nussbaumer | return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, )) |
490 | 0bbe448c | Michael Hanselmann | |
491 | 346c3037 | Klaus Aehlig | def SubmitJobToDrainedQueue(self, ops): |
492 | 346c3037 | Klaus Aehlig | ops_state = map(lambda op: op.__getstate__(), ops) |
493 | 346c3037 | Klaus Aehlig | return self.CallMethod(REQ_SUBMIT_JOB_TO_DRAINED_QUEUE, (ops_state, )) |
494 | 346c3037 | Klaus Aehlig | |
495 | 2971c913 | Iustin Pop | def SubmitManyJobs(self, jobs): |
496 | 2971c913 | Iustin Pop | jobs_state = [] |
497 | 2971c913 | Iustin Pop | for ops in jobs: |
498 | 2971c913 | Iustin Pop | jobs_state.append([op.__getstate__() for op in ops]) |
499 | 734a2a7c | René Nussbaumer | return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, )) |
500 | 2971c913 | Iustin Pop | |
501 | 0bbe448c | Michael Hanselmann | def CancelJob(self, job_id): |
502 | a629ecb9 | Iustin Pop | return self.CallMethod(REQ_CANCEL_JOB, (job_id, )) |
503 | 0bbe448c | Michael Hanselmann | |
504 | 0bbe448c | Michael Hanselmann | def ArchiveJob(self, job_id): |
505 | a629ecb9 | Iustin Pop | return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, )) |
506 | 0bbe448c | Michael Hanselmann | |
507 | f63ffb37 | Michael Hanselmann | def ChangeJobPriority(self, job_id, priority): |
508 | f63ffb37 | Michael Hanselmann | return self.CallMethod(REQ_CHANGE_JOB_PRIORITY, (job_id, priority)) |
509 | f63ffb37 | Michael Hanselmann | |
510 | 07cd723a | Iustin Pop | def AutoArchiveJobs(self, age): |
511 | f8ad5591 | Michael Hanselmann | timeout = (DEF_RWTO - 1) / 2 |
512 | 83c046a2 | Iustin Pop | return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS, (age, timeout)) |
513 | 07cd723a | Iustin Pop | |
514 | f4484122 | Michael Hanselmann | def WaitForJobChangeOnce(self, job_id, fields, |
515 | 793a8f7c | Michael Hanselmann | prev_job_info, prev_log_serial, |
516 | 793a8f7c | Michael Hanselmann | timeout=WFJC_TIMEOUT): |
517 | 793a8f7c | Michael Hanselmann | """Waits for changes on a job.
|
518 | 793a8f7c | Michael Hanselmann |
|
519 | 793a8f7c | Michael Hanselmann | @param job_id: Job ID
|
520 | 793a8f7c | Michael Hanselmann | @type fields: list
|
521 | 793a8f7c | Michael Hanselmann | @param fields: List of field names to be observed
|
522 | 793a8f7c | Michael Hanselmann | @type prev_job_info: None or list
|
523 | 793a8f7c | Michael Hanselmann | @param prev_job_info: Previously received job information
|
524 | 793a8f7c | Michael Hanselmann | @type prev_log_serial: None or int/long
|
525 | 793a8f7c | Michael Hanselmann | @param prev_log_serial: Highest log serial number previously received
|
526 | 793a8f7c | Michael Hanselmann | @type timeout: int/float
|
527 | 793a8f7c | Michael Hanselmann | @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
|
528 | 793a8f7c | Michael Hanselmann | be capped to that value)
|
529 | 793a8f7c | Michael Hanselmann |
|
530 | 793a8f7c | Michael Hanselmann | """
|
531 | 793a8f7c | Michael Hanselmann | assert timeout >= 0, "Timeout can not be negative" |
532 | f4484122 | Michael Hanselmann | return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE, |
533 | f4484122 | Michael Hanselmann | (job_id, fields, prev_job_info, |
534 | 793a8f7c | Michael Hanselmann | prev_log_serial, |
535 | 793a8f7c | Michael Hanselmann | min(WFJC_TIMEOUT, timeout)))
|
536 | f4484122 | Michael Hanselmann | |
537 | f4484122 | Michael Hanselmann | def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial): |
538 | 5c735209 | Iustin Pop | while True: |
539 | f4484122 | Michael Hanselmann | result = self.WaitForJobChangeOnce(job_id, fields,
|
540 | f4484122 | Michael Hanselmann | prev_job_info, prev_log_serial) |
541 | 5c735209 | Iustin Pop | if result != constants.JOB_NOTCHANGED:
|
542 | 5c735209 | Iustin Pop | break
|
543 | 5c735209 | Iustin Pop | return result
|
544 | dfe57c22 | Michael Hanselmann | |
545 | 2e5c33db | Iustin Pop | def Query(self, what, fields, qfilter): |
546 | 28b71a76 | Michael Hanselmann | """Query for resources/items.
|
547 | 28b71a76 | Michael Hanselmann |
|
548 | abd66bf8 | Michael Hanselmann | @param what: One of L{constants.QR_VIA_LUXI}
|
549 | 28b71a76 | Michael Hanselmann | @type fields: List of strings
|
550 | 28b71a76 | Michael Hanselmann | @param fields: List of requested fields
|
551 | 2e5c33db | Iustin Pop | @type qfilter: None or list
|
552 | 2e5c33db | Iustin Pop | @param qfilter: Query filter
|
553 | 28b71a76 | Michael Hanselmann | @rtype: L{objects.QueryResponse}
|
554 | 28b71a76 | Michael Hanselmann |
|
555 | 28b71a76 | Michael Hanselmann | """
|
556 | a629ecb9 | Iustin Pop | result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
|
557 | 28b71a76 | Michael Hanselmann | return objects.QueryResponse.FromDict(result)
|
558 | 28b71a76 | Michael Hanselmann | |
559 | 28b71a76 | Michael Hanselmann | def QueryFields(self, what, fields): |
560 | 28b71a76 | Michael Hanselmann | """Query for available fields.
|
561 | 28b71a76 | Michael Hanselmann |
|
562 | abd66bf8 | Michael Hanselmann | @param what: One of L{constants.QR_VIA_LUXI}
|
563 | 28b71a76 | Michael Hanselmann | @type fields: None or list of strings
|
564 | 28b71a76 | Michael Hanselmann | @param fields: List of requested fields
|
565 | 28b71a76 | Michael Hanselmann | @rtype: L{objects.QueryFieldsResponse}
|
566 | 28b71a76 | Michael Hanselmann |
|
567 | 28b71a76 | Michael Hanselmann | """
|
568 | a629ecb9 | Iustin Pop | result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
|
569 | 28b71a76 | Michael Hanselmann | return objects.QueryFieldsResponse.FromDict(result)
|
570 | 28b71a76 | Michael Hanselmann | |
571 | 0bbe448c | Michael Hanselmann | def QueryJobs(self, job_ids, fields): |
572 | 0bbe448c | Michael Hanselmann | return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields)) |
573 | 3d8548c4 | Michael Hanselmann | |
574 | ec79568d | Iustin Pop | def QueryInstances(self, names, fields, use_locking): |
575 | ec79568d | Iustin Pop | return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking)) |
576 | ee6c7b94 | Michael Hanselmann | |
577 | ec79568d | Iustin Pop | def QueryNodes(self, names, fields, use_locking): |
578 | ec79568d | Iustin Pop | return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking)) |
579 | 02f7fe54 | Michael Hanselmann | |
580 | a79ef2a5 | Adeodato Simo | def QueryGroups(self, names, fields, use_locking): |
581 | a79ef2a5 | Adeodato Simo | return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking)) |
582 | a79ef2a5 | Adeodato Simo | |
583 | 306bed0e | Apollon Oikonomopoulos | def QueryNetworks(self, names, fields, use_locking): |
584 | 306bed0e | Apollon Oikonomopoulos | return self.CallMethod(REQ_QUERY_NETWORKS, (names, fields, use_locking)) |
585 | 306bed0e | Apollon Oikonomopoulos | |
586 | ec79568d | Iustin Pop | def QueryExports(self, nodes, use_locking): |
587 | ec79568d | Iustin Pop | return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking)) |
588 | 32f93223 | Michael Hanselmann | |
589 | 66baeccc | Iustin Pop | def QueryClusterInfo(self): |
590 | 66baeccc | Iustin Pop | return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ()) |
591 | 66baeccc | Iustin Pop | |
592 | ae5849b5 | Michael Hanselmann | def QueryConfigValues(self, fields): |
593 | a629ecb9 | Iustin Pop | return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, )) |
594 | ae5849b5 | Michael Hanselmann | |
595 | 7699c3af | Iustin Pop | def QueryTags(self, kind, name): |
596 | 7699c3af | Iustin Pop | return self.CallMethod(REQ_QUERY_TAGS, (kind, name)) |