# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Module for the unix socket protocol

This module implements the local unix socket protocol. You only need
this module and the opcodes module in the client program in order to
communicate with the master.

The module is also used by the master daemon.

"""
import collections
import errno
import logging
import socket
import time

from ganeti import serializer
from ganeti import constants
from ganeti import errors
from ganeti import utils
from ganeti import objects
# Keys of the JSON objects exchanged over the LUXI socket.
# NOTE(review): KEY_METHOD, KEY_ARGS and KEY_RESULT are used by the
# request/response helpers in this module; their values follow the naming
# pattern of the neighbouring keys -- confirm against the server side.
KEY_METHOD = "method"
KEY_ARGS = "args"
KEY_SUCCESS = "success"
KEY_RESULT = "result"
KEY_VERSION = "version"

# Names of the methods that can be requested from the master daemon.
REQ_SUBMIT_JOB = "SubmitJob"
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
# NOTE(review): REQ_QUERY is used by Client.Query; value follows the
# pattern of the other request names -- confirm against the server side.
REQ_QUERY = "Query"
REQ_QUERY_FIELDS = "QueryFields"
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_GROUPS = "QueryGroups"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
REQ_QUERY_TAGS = "QueryTags"
REQ_QUERY_LOCKS = "QueryLocks"
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"

# Default connect and read/write timeouts, in seconds (the transport
# constructor documents them as "10 and respectively 60 seconds").
DEF_CTMO = 10
DEF_RWTO = 60

# WaitForJobChange timeout; floor division keeps this an integer number of
# seconds regardless of the interpreter's "/" semantics.
WFJC_TIMEOUT = (DEF_RWTO - 1) // 2
class ProtocolError(errors.LuxiError):
    """Denotes an error in the LUXI protocol.

    Base class of all LUXI-specific exceptions in this module; it is also
    raised directly for malformed requests or responses and for operations
    on a closed connection.

    """
class ConnectionClosedError(ProtocolError):
    """Connection closed error.

    Raised when the connection is found closed while reading from it.

    """
# NOTE: on Python 3 this name shadows the builtin TimeoutError inside this
# module; it is kept because callers refer to it by this name.
class TimeoutError(ProtocolError):
    """Operation timeout error.

    Raised when connecting, sending or receiving exceeds its timeout.

    """
class RequestError(ProtocolError):
    """Error while handling a request.

    This signifies an error in the request format or request handling,
    but not (e.g.) an error in starting up an instance.

    Some common conditions that can trigger this exception:
      - job submission failed because the job data was wrong
      - query failed because required fields were missing

    """
class NoMasterError(ProtocolError):
    """The master cannot be reached.

    This means that the master daemon is not running or its socket is not
    available: the exception is raised when connecting fails with ENOENT
    (no such socket file) or ECONNREFUSED (nobody is listening on it).

    """
# NOTE: on Python 3 this name shadows the builtin PermissionError inside
# this module; it is kept because callers refer to it by this name.
class PermissionError(ProtocolError):
    """Permission denied while connecting to the master socket.

    This means the user doesn't have the proper rights; the exception is
    raised when connecting fails with EPERM or EACCES.

    """
class Transport(object):
    """Low-level transport class.

    This is used on the client side.

    This could be replaced by any other class that provides the same
    semantics to the Client. This means:
      - can send messages and receive messages
      - safe for multithreading

    """

    def __init__(self, address, timeouts=None):
        """Constructor for the Transport class.

        Arguments:
          - address: a valid address for the used transport class
          - timeouts: a pair of timeouts (connect, read/write), in seconds

        There are two timeouts used since we might want to wait for a long
        time for a response, but the connect timeout should be lower.

        If not passed, we use a default of 10 and respectively 60 seconds.

        Note that on reading data, since the timeout applies to an
        individual receive, it might be that the total duration is longer
        than the timeout value passed (we make a hard limit at twice the
        read timeout).

        """
        self.address = address
        if timeouts is None:
            self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
        else:
            self._ctimeout, self._rwtimeout = timeouts

        self.socket = None
        # Data received after the last complete message
        self._buffer = ""
        # Complete messages received but not yet handed out by Recv
        self._msgs = collections.deque()

        try:
            self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

            # Try to connect, retrying while the server asks us to
            try:
                utils.Retry(self._Connect, 1.0, self._ctimeout,
                            args=(self.socket, address, self._ctimeout))
            except utils.RetryTimeout:
                raise TimeoutError("Connect timed out")

            self.socket.settimeout(self._rwtimeout)
        except Exception:
            # Whatever went wrong (including timeouts and permission
            # errors), do not leak the socket; the error is re-raised
            # unchanged for the caller.
            if self.socket is not None:
                self.socket.close()
            self.socket = None
            raise

    @staticmethod
    def _Connect(sock, address, timeout):
        """Connect the socket, translating low-level errors.

        @raise TimeoutError: if the connect attempt timed out
        @raise NoMasterError: on ENOENT or ECONNREFUSED
        @raise PermissionError: on EPERM or EACCES
        @raise utils.RetryAgain: on EAGAIN, to make utils.Retry try again

        """
        sock.settimeout(timeout)
        try:
            sock.connect(address)
        except socket.timeout as err:
            raise TimeoutError("Connect timed out: %s" % str(err))
        except socket.error as err:
            error_code = err.args[0]
            if error_code in (errno.ENOENT, errno.ECONNREFUSED):
                raise NoMasterError(address)
            elif error_code in (errno.EPERM, errno.EACCES):
                raise PermissionError(address)
            elif error_code == errno.EAGAIN:
                # Server's socket backlog is full at the moment
                raise utils.RetryAgain()
            # Anything else is unexpected; let it propagate
            raise

    def _CheckSocket(self):
        """Make sure we are connected.

        @raise ProtocolError: if the socket has been closed

        """
        if self.socket is None:
            raise ProtocolError("Connection is closed")

    def Send(self, msg):
        """Send a message.

        This just sends a message and doesn't wait for the response.

        @raise ProtocolError: if the message contains the message
            terminator or the connection is closed
        @raise TimeoutError: if sending timed out

        """
        if constants.LUXI_EOM in msg:
            raise ProtocolError("Message terminator found in payload")

        self._CheckSocket()
        try:
            # NOTE: if sendall fails there is no way to tell how much of
            # the message was actually transmitted
            self.socket.sendall(msg + constants.LUXI_EOM)
        except socket.timeout as err:
            raise TimeoutError("Sending timeout: %s" % str(err))

    def Recv(self):
        """Try to receive a message from the socket.

        In case we already have messages queued, we just return from the
        queue. Otherwise, we try to read data with a _rwtimeout network
        timeout, and making sure we don't go over 2x_rwtimeout as a global
        limit.

        @raise TimeoutError: if a receive or the global limit timed out
        @raise ConnectionClosedError: if the peer closed the connection

        """
        self._CheckSocket()
        etime = time.time() + self._rwtimeout
        while not self._msgs:
            if time.time() > etime:
                raise TimeoutError("Extended receive timeout")
            try:
                data = self.socket.recv(4096)
            except socket.timeout as err:
                raise TimeoutError("Receive timeout: %s" % str(err))
            except socket.error as err:
                if err.args and err.args[0] == errno.EAGAIN:
                    # Transient condition: retry, but go through the loop
                    # header so the global deadline is still enforced
                    continue
                raise
            if not data:
                raise ConnectionClosedError("Connection closed while reading")
            # Everything up to the last terminator is complete messages;
            # the remainder is kept for the next read
            new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
            self._buffer = new_msgs.pop()
            self._msgs.extend(new_msgs)
        return self._msgs.popleft()

    def Call(self, msg):
        """Send a message and wait for the response.

        This is just a wrapper over Send and Recv.

        """
        self.Send(msg)
        return self.Recv()

    def Close(self):
        """Close the socket"""
        if self.socket is not None:
            self.socket.close()
            self.socket = None
def ParseRequest(msg):
    """Parses a LUXI request message.

    @param msg: Serialized request
    @rtype: tuple
    @return: (method, args, version); version is None if the request did
        not carry one
    @raise ProtocolError: if the message cannot be parsed, is not a
        dictionary or lacks the method or the arguments

    """
    try:
        request = serializer.LoadJson(msg)
    except ValueError as err:
        raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)

    logging.debug("LUXI request: %s", request)

    if not isinstance(request, dict):
        logging.error("LUXI request not a dict: %r", msg)
        raise ProtocolError("Invalid LUXI request (not a dict)")

    method = request.get(KEY_METHOD, None)  # pylint: disable-msg=E1103
    args = request.get(KEY_ARGS, None)  # pylint: disable-msg=E1103
    version = request.get(KEY_VERSION, None)  # pylint: disable-msg=E1103

    if method is None or args is None:
        logging.error("LUXI request missing method or arguments: %r", msg)
        raise ProtocolError(("Invalid LUXI request (no method or arguments"
                             " in request): %r") % msg)

    return (method, args, version)
def ParseResponse(msg):
    """Parses a LUXI response message.

    @param msg: Serialized response
    @rtype: tuple
    @return: (success, result, version); version is None if the response
        did not carry one
    @raise ProtocolError: if the message cannot be deserialized or is not
        a dictionary holding both the success flag and the result

    """
    # Parse the result
    try:
        data = serializer.LoadJson(msg)
    except Exception as err:
        raise ProtocolError("Error while deserializing response: %s" % str(err))

    # Validate response
    if not (isinstance(data, dict) and
            KEY_SUCCESS in data and
            KEY_RESULT in data):
        raise ProtocolError("Invalid response from server: %r" % data)

    return (data[KEY_SUCCESS], data[KEY_RESULT],
            data.get(KEY_VERSION, None))  # pylint: disable-msg=E1103
def FormatResponse(success, result, version=None):
    """Formats a LUXI response message.

    @param success: Whether the request succeeded
    @param result: Result of the request (or error information)
    @param version: Protocol version to include, omitted if None
    @return: The serialized response

    """
    response = {
        KEY_SUCCESS: success,
        KEY_RESULT: result,
        }

    if version is not None:
        response[KEY_VERSION] = version

    logging.debug("LUXI response: %s", response)

    return serializer.DumpJson(response)
def FormatRequest(method, args, version=None):
    """Formats a LUXI request message.

    @param method: Name of the method to call
    @param args: Arguments for the method
    @param version: Protocol version to include, omitted if None
    @return: The serialized request

    """
    # Build request
    request = {
        KEY_METHOD: method,
        KEY_ARGS: args,
        }

    if version is not None:
        request[KEY_VERSION] = version

    # Serialize the request
    return serializer.DumpJson(request, indent=False)
def CallLuxiMethod(transport_cb, method, args, version=None):
    """Send a LUXI request via a transport and return the response.

    @param transport_cb: Callable taking the serialized request and
        returning the serialized response
    @param method: Name of the method to call
    @param args: Arguments for the method
    @param version: Protocol version sent with the request; a version
        found in the response must be equal to it
    @return: The result carried by a successful response
    @raise errors.LuxiError: on a version mismatch
    @raise RequestError: if the response reports a failure that
        errors.MaybeRaise did not turn into an exception

    """
    assert callable(transport_cb)

    request_msg = FormatRequest(method, args, version=version)

    # Send request and wait for response
    response_msg = transport_cb(request_msg)

    (success, result, resp_version) = ParseResponse(response_msg)

    # Verify version if there was one in the response
    if resp_version is not None and resp_version != version:
        raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
                               (version, resp_version))

    if success:
        return result

    # Failure: give errors.MaybeRaise the chance to raise from the result,
    # otherwise fall back to the generic request error
    errors.MaybeRaise(result)
    raise RequestError(result)
class Client(object):
    """High-level client implementation.

    This uses a backing Transport-like class on top of which it
    implements data serialization/deserialization.

    """

    def __init__(self, address=None, timeouts=None, transport=Transport):
        """Constructor for the Client class.

        Arguments:
          - address: a valid address for the used transport class
          - timeouts: a list of timeouts, to be used on connect and read/write
          - transport: a Transport-like class

        If address is not passed, constants.MASTER_SOCKET is used. If
        timeouts is not passed, the default timeouts of the transport
        class are used.

        """
        if address is None:
            address = constants.MASTER_SOCKET
        self.address = address
        self.timeouts = timeouts
        self.transport_class = transport
        self.transport = None
        self._InitTransport()

    def _InitTransport(self):
        """(Re)initialize the transport if needed.

        """
        if self.transport is None:
            self.transport = self.transport_class(self.address,
                                                  timeouts=self.timeouts)

    def _CloseTransport(self):
        """Close the transport, ignoring errors.

        """
        if self.transport is None:
            return

        # Detach the transport first so that the client is left without
        # one even if closing fails
        old_transp = self.transport
        self.transport = None
        try:
            old_transp.Close()
        except Exception:  # pylint: disable-msg=W0703
            pass

    def _SendMethodCall(self, data):
        """Send a serialized request and return the serialized response.

        On any error the transport is closed, so that the next call
        starts with a fresh one, and the error is re-raised.

        """
        # Send request and wait for response
        try:
            self._InitTransport()
            return self.transport.Call(data)
        except Exception:
            self._CloseTransport()
            raise

    def CallMethod(self, method, args):
        """Send a generic request and return the response.

        """
        return CallLuxiMethod(self._SendMethodCall, method, args,
                              version=constants.LUXI_VERSION)

    def SetQueueDrainFlag(self, drain_flag):
        return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)

    def SetWatcherPause(self, until):
        return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])

    def SubmitJob(self, ops):
        # A real list (not a lazy iterator) is handed to the serializer
        ops_state = [op.__getstate__() for op in ops]
        return self.CallMethod(REQ_SUBMIT_JOB, ops_state)

    def SubmitManyJobs(self, jobs):
        jobs_state = []
        for ops in jobs:
            jobs_state.append([op.__getstate__() for op in ops])
        return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)

    def CancelJob(self, job_id):
        return self.CallMethod(REQ_CANCEL_JOB, job_id)

    def ArchiveJob(self, job_id):
        return self.CallMethod(REQ_ARCHIVE_JOB, job_id)

    def AutoArchiveJobs(self, age):
        # Floor division keeps this an integer number of seconds
        timeout = (DEF_RWTO - 1) // 2
        return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))

    def WaitForJobChangeOnce(self, job_id, fields,
                             prev_job_info, prev_log_serial,
                             timeout=WFJC_TIMEOUT):
        """Waits for changes on a job.

        @param job_id: Job ID
        @type fields: list
        @param fields: List of field names to be observed
        @type prev_job_info: None or list
        @param prev_job_info: Previously received job information
        @type prev_log_serial: None or int/long
        @param prev_log_serial: Highest log serial number previously received
        @type timeout: int/float
        @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT}
            will be capped to that value)

        """
        assert timeout >= 0, "Timeout can not be negative"
        return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
                               (job_id, fields, prev_job_info,
                                prev_log_serial,
                                min(WFJC_TIMEOUT, timeout)))

    def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
        """Waits until a job changes.

        Calls L{WaitForJobChangeOnce} repeatedly until it returns something
        other than constants.JOB_NOTCHANGED and returns that result.

        """
        while True:
            result = self.WaitForJobChangeOnce(job_id, fields,
                                               prev_job_info, prev_log_serial)
            if result != constants.JOB_NOTCHANGED:
                break
        return result

    def Query(self, what, fields, filter_):
        """Query for resources/items.

        @param what: One of L{constants.QR_OP_LUXI}
        @type fields: List of strings
        @param fields: List of requested fields
        @type filter_: None or list
        @param filter_: Query filter
        @rtype: L{objects.QueryResponse}

        """
        req = objects.QueryRequest(what=what, fields=fields, filter=filter_)
        result = self.CallMethod(REQ_QUERY, req.ToDict())
        return objects.QueryResponse.FromDict(result)

    def QueryFields(self, what, fields):
        """Query for available fields.

        @param what: One of L{constants.QR_OP_LUXI}
        @type fields: None or list of strings
        @param fields: List of requested fields
        @rtype: L{objects.QueryFieldsResponse}

        """
        req = objects.QueryFieldsRequest(what=what, fields=fields)
        result = self.CallMethod(REQ_QUERY_FIELDS, req.ToDict())
        return objects.QueryFieldsResponse.FromDict(result)

    def QueryJobs(self, job_ids, fields):
        return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))

    def QueryInstances(self, names, fields, use_locking):
        return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))

    def QueryNodes(self, names, fields, use_locking):
        return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))

    def QueryGroups(self, names, fields, use_locking):
        return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))

    def QueryExports(self, nodes, use_locking):
        return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))

    def QueryClusterInfo(self):
        return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())

    def QueryConfigValues(self, fields):
        return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)

    def QueryTags(self, kind, name):
        return self.CallMethod(REQ_QUERY_TAGS, (kind, name))

    def QueryLocks(self, fields, sync):
        return self.CallMethod(REQ_QUERY_LOCKS, (fields, sync))