4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Inter-node RPC library.
26 # pylint: disable=C0103,R0201,R0904
27 # C0103: Invalid name, since call_* names are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
48 from ganeti import runtime
49 from ganeti import compat
51 # Special module generated at build time
52 from ganeti import _generated_rpc
54 # pylint has a bug here, doesn't see this import
55 import ganeti.http.client # pylint: disable=W0611
58 # Timeout for connecting to nodes (seconds)
59 _RPC_CONNECT_TIMEOUT = 5
# Extra HTTP headers sent with every RPC request; the request body is JSON
# (declared via http.HTTP_APP_JSON).
# NOTE(review): the remaining entries and the closing bracket of this list are
# not visible in this listing -- confirm its full contents.
61 _RPC_CLIENT_HEADERS = [
62 "Content-type: %s" % http.HTTP_APP_JSON,
# Read timeouts in seconds, from most to least urgent. They are not referenced
# by the visible code; presumably the RPC call definitions use them -- verify.
66 # Various time constants for the timeout table
67 _TMO_URGENT = 60 # one minute
68 _TMO_FAST = 5 * 60 # five minutes
69 _TMO_NORMAL = 15 * 60 # 15 minutes
70 _TMO_SLOW = 3600 # one hour
# NOTE(review): the definition this comment describes (the L{_OFFLINE} marker
# used by _RpcProcessor._PrepareRequests) is not visible in this listing.
74 #: Special value to describe an offline host
# Module initializer: logs the pycURL version and initializes libcurl's global
# state. (The enclosing def line is not visible in this listing.)
79 """Initializes the module-global HTTP client manager.
81 Must be called before using any RPC function and while exactly one thread is
85 # curl_global_init(3) and curl_global_cleanup(3) must be called with only
86 # one thread running. This check is just a safety measure -- it doesn't
# NOTE: assert statements are removed under "python -O", so this check is
# best-effort only.
88 assert threading.activeCount() == 1, \
89 "Found more than one active thread when initializing pycURL"
91 logging.info("Using PycURL %s", pycurl.version)
# GLOBAL_ALL asks libcurl to initialize all of its global subsystems.
93 pycurl.global_init(pycurl.GLOBAL_ALL)
# Module shutdown: releases libcurl's global state via pycurl.global_cleanup().
# (The enclosing def line is not visible in this listing.)
97 """Stops the module-global HTTP client manager.
99 Must be called before quitting the program and while exactly one thread is
# Counterpart of pycurl.global_init(); per curl_global_cleanup(3) it must run
# while only one thread is active.
103 pycurl.global_cleanup()
def _ConfigRpcCurl(curl):
  """Applies the node daemon TLS and connection settings to a cURL handle.

  A single PEM file (the node daemon certificate) serves as CA bundle,
  client certificate and client key. The peer certificate is verified
  against it, while host name checking is disabled.

  @param curl: pycURL handle to configure

  """
  cert_path = str(constants.NODED_CERT_FILE)

  # Applied in this exact order, one setopt() call per entry
  settings = [
    (pycurl.FOLLOWLOCATION, False),
    (pycurl.CAINFO, cert_path),
    (pycurl.SSL_VERIFYHOST, 0),
    (pycurl.SSL_VERIFYPEER, True),
    (pycurl.SSLCERTTYPE, "PEM"),
    (pycurl.SSLCERT, cert_path),
    (pycurl.SSLKEYTYPE, "PEM"),
    (pycurl.SSLKEY, cert_path),
    (pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT),
    ]

  for (option, value) in settings:
    curl.setopt(option, value)
# Decorator body: returns a wrapper that runs the decorated function with the
# RPC layer set up. (The enclosing def line is not visible in this listing.)
121 """RPC-wrapper decorator.
123 When applied to a function, it runs it with the RPC system
124 initialized, and it shuts down the system afterwards. This means the
125 function must be called without RPC being initialized.
# NOTE(review): the Init()/try/finally/Shutdown() lines around the call below
# are not visible here -- confirm Shutdown() runs in a finally clause.
128 def wrapper(*args, **kwargs):
131 return fn(*args, **kwargs)
# Encodes a payload for RPC transport and returns an (encoding, data) tuple.
# (The enclosing def line is not visible in this listing.)
138 """Compresses a string for transport over RPC.
140 Small amounts of data are not compressed.
145 @return: Encoded data to send, as a tuple of (encoding constant, payload)
148 # Small amounts of data are not compressed
# NOTE(review): the size check guarding this return is not visible in this
# listing -- confirm the threshold.
150 return (constants.RPC_ENCODING_NONE, data)
# zlib level 3 favours speed over ratio; base64 turns the compressed bytes into
# ASCII text so they can be embedded in the JSON request body.
152 # Compress with zlib and encode in base64
153 return (constants.RPC_ENCODING_ZLIB_BASE64,
154 base64.b64encode(zlib.compress(data, 3)))
# Per-node outcome of an RPC call: data/payload on success, fail_msg on failure.
157 class RpcResult(object):
160 This class holds an RPC result. It is needed since in multi-node
161 calls we can't raise an exception just because one out of many
162 failed, and therefore we use this class to encapsulate the result.
164 @ivar data: the data payload, for successful results, or None
165 @ivar call: the name of the RPC call
166 @ivar node: the name of the node to which we made the call
167 @ivar offline: whether the operation failed because the node was
168 offline, as opposed to actual failure; offline=True will always
169 imply failed=True, in order to allow simpler checking if
170 the user doesn't care about the exact failure mode
171 @ivar fail_msg: the error message if the call failed
# Constructor: normalizes a raw node reply into data/payload/fail_msg.
# Offline nodes and explicit failures get payload None and a non-empty fail_msg.
174 def __init__(self, data=None, failed=False, offline=False,
175 call=None, node=None):
176 self.offline = offline
181 self.fail_msg = "Node is marked offline"
182 self.data = self.payload = None
184 self.fail_msg = self._EnsureErr(data)
185 self.data = self.payload = None
# A reply from the node must be a 2-element (status, payload) sequence. A false
# status means the second element is the error description.
188 if not isinstance(self.data, (tuple, list)):
189 self.fail_msg = ("RPC layer error: invalid result type (%s)" %
193 self.fail_msg = ("RPC layer error: invalid result length (%d), "
194 "expected 2" % len(self.data))
196 elif not self.data[0]:
197 self.fail_msg = self._EnsureErr(self.data[1])
202 self.payload = data[1]
# Sanity check: every code path above must have set all public attributes.
204 for attr_name in ["call", "data", "fail_msg",
205 "node", "offline", "payload"]:
206 assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
# _EnsureErr: falls back to a placeholder message when the error value is
# falsy, so fail_msg is always truthy on failure. (Its def line and the
# truthy branch are not visible here.)
210 """Helper to ensure we return a 'True' value for error."""
214 return "No error information"
216 def Raise(self, msg, prereq=False, ecode=None):
217 """If the result has failed, raise OpExecError (OpPrereqError if prereq).
219 This is used so that LU code doesn't have to check for each
220 result, but instead can call this function.
# Nothing to do for successful results (fail_msg is empty).
223 if not self.fail_msg:
226 if not msg: # one could pass None for default message
227 msg = ("Call '%s' to node '%s' has failed: %s" %
228 (self.call, self.node, self.fail_msg))
230 msg = "%s: %s" % (msg, self.fail_msg)
232 ec = errors.OpPrereqError
234 ec = errors.OpExecError
# NOTE(review): how ecode is folded into "args" is not visible here; presumably
# it becomes a second exception argument.
235 if ecode is not None:
239 raise ec(*args) # pylint: disable=W0142
# Resolves node names to primary IPs using ssconf data, with DNS as fallback.
242 def _SsconfResolver(node_list,
243 ssc=ssconf.SimpleStore,
244 nslookup_fn=netutils.Hostname.GetIP):
245 """Return addresses for given node names.
247 @type node_list: list
248 @param node_list: List of node names
250 @param ssc: SimpleStore class that is used to obtain node->ip mappings
251 @type nslookup_fn: callable
252 @param nslookup_fn: function used to do NS lookup
253 @rtype: list of tuple; (string, string)
254 @return: List of tuples containing node name and IP address
# NOTE(review): "ss" is created from ssc on a line not visible in this listing.
258 iplist = ss.GetNodePrimaryIPList()
259 family = ss.GetPrimaryIPFamily()
# Each entry must split on whitespace into exactly two fields (name, ip).
# A malformed entry makes dict() raise ValueError.
260 ipmap = dict(entry.split() for entry in iplist)
# The DNS lookup below presumably only runs for nodes missing from ipmap; the
# guarding condition (and "result" initialization) are not visible here.
263 for node in node_list:
266 ip = nslookup_fn(node, family=family)
267 result.append((node, ip))
272 class _StaticResolver:
273 def __init__(self, addresses):
274 """Initializes this class.
277 self._addresses = addresses
279 def __call__(self, hosts):
280 """Returns static addresses for hosts.
283 assert len(hosts) == len(self._addresses)
284 return zip(hosts, self._addresses)
# Maps one configured node to a resolver entry.
# NOTE(review): the return statements are not visible in this listing. Since
# _NodeConfigResolver returns these values as resolver output, each should be a
# (name, address-or-_OFFLINE) tuple -- confirm.
287 def _CheckConfigNode(name, node):
288 """Checks if a node is online.
291 @param name: Node name
292 @type node: L{objects.Node} or None
293 @param node: Node object
# Presumably taken when the node is unknown (node is None): the name itself is
# returned as the address and resolved via DNS later -- verify.
297 # Depend on DNS for name resolution
# Resolver backed by the cluster configuration. single_node_fn(name) fetches
# one node; all_nodes_fn() returns a name -> node mapping (queried with .get).
# RpcRunner binds these to cfg.GetNodeInfo and cfg.GetAllNodesInfo.
306 def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts):
307 """Calculate node addresses using configuration.
# NOTE(review): the single-host condition guarding this fast path is not
# visible in this listing.
310 # Special case for single-host lookups
313 return [_CheckConfigNode(name, single_node_fn(name))]
# Unknown names map to None and are handled by _CheckConfigNode.
315 all_nodes = all_nodes_fn()
316 return [_CheckConfigNode(name, all_nodes.get(name, None))
# _RpcProcessor constructor: stores the resolver, port and optional lock monitor
# callback used by __call__.
321 def __init__(self, resolver, port, lock_monitor_cb=None):
322 """Initializes this class.
324 @param resolver: callable accepting a list of hostnames, returning a list
325 of tuples containing name and IP address (IP address can be the name or
326 the special value L{_OFFLINE} to mark offline machines)
328 @param port: TCP port
329 @param lock_monitor_cb: Callable for registering with lock monitor
# NOTE(review): __call__ reads self._port; its assignment is not visible in
# this listing -- confirm it is set here.
332 self._resolver = resolver
334 self._lock_monitor_cb = lock_monitor_cb
# Takes no "self" (called as self._PrepareRequests from __call__). Splits
# resolved (name, ip) pairs into ready-made offline results and pending HTTP
# requests.
337 def _PrepareRequests(hosts, port, procedure, body, read_timeout):
338 """Prepares requests by sorting offline hosts into separate list.
# Offline hosts get an RpcResult(offline=True) immediately. Every other host
# gets an HTTP PUT to "/<procedure>" with the shared RPC headers, the given
# read timeout and the TLS setup from _ConfigRpcCurl.
344 for (name, ip) in hosts:
346 # Node is marked as offline
347 results[name] = RpcResult(node=name, offline=True, call=procedure)
350 http.client.HttpClientRequest(str(ip), port,
351 http.HTTP_PUT, str("/%s" % procedure),
352 headers=_RPC_CLIENT_HEADERS,
354 read_timeout=read_timeout,
355 nicename="%s/%s" % (name, procedure),
356 curl_config_fn=_ConfigRpcCurl)
# results: name -> RpcResult (offline hosts); requests: name -> request.
# __call__ asserts that the two key sets are disjoint.
358 return (results, requests)
# Turns finished HTTP requests into RpcResult objects and stores them in the
# "results" dict passed in (mutated in place; the return line is not visible).
361 def _CombineResults(results, requests, procedure):
362 """Combines pre-computed results for offline hosts with actual call results.
# Only a successful transfer with HTTP 200 is decoded as JSON. Anything else
# becomes a failed RpcResult carrying "msg"; that message is built on lines
# not visible in this listing.
365 for name, req in requests.items():
366 if req.success and req.resp_status_code == http.HTTP_OK:
367 host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
368 node=name, call=procedure)
370 # TODO: Better error reporting
376 logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
377 host_result = RpcResult(data=msg, failed=True, node=name,
380 results[name] = host_result
# Runs one RPC procedure on several hosts. _req_process_fn is injectable
# (presumably for tests) and defaults to http.client.ProcessRequests.
384 def __call__(self, hosts, procedure, body, read_timeout=None,
385 _req_process_fn=http.client.ProcessRequests):
386 """Makes an RPC request to a number of nodes.
388 @type hosts: sequence
389 @param hosts: Hostnames
390 @type procedure: string
391 @param procedure: Request path
393 @param body: Request body
394 @type read_timeout: int or None
395 @param read_timeout: Read timeout for request
# Despite the None default, a read timeout is mandatory. The check is an
# assert, so it disappears under "python -O".
398 assert read_timeout is not None, \
399 "Missing RPC read timeout for procedure '%s'" % procedure
401 (results, requests) = \
402 self._PrepareRequests(self._resolver(hosts), self._port, procedure,
403 str(body), read_timeout)
# Performs the HTTP requests for all online hosts.
405 _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
# A host must be either offline (results) or contacted (requests), never both.
407 assert not frozenset(results).intersection(requests)
409 return self._CombineResults(results, requests, procedure)
# Main RPC client: combines the generated default, bootstrap and config
# wrappers, and resolves node addresses from the cluster configuration.
412 class RpcRunner(_generated_rpc.RpcClientDefault,
413 _generated_rpc.RpcClientBootstrap,
414 _generated_rpc.RpcClientConfig):
418 def __init__(self, context):
419 """Initializes the RPC runner.
421 @type context: C{masterd.GanetiContext}
422 @param context: Ganeti context
425 # Pylint doesn't recognize multiple inheritance properly, see
426 # <http://www.logilab.org/ticket/36586> and
427 # <http://www.logilab.org/ticket/35642>
428 # pylint: disable=W0233
429 _generated_rpc.RpcClientConfig.__init__(self)
430 _generated_rpc.RpcClientBootstrap.__init__(self)
431 _generated_rpc.RpcClientDefault.__init__(self)
# Addresses come from the configuration: GetNodeInfo for single-node calls and
# GetAllNodesInfo otherwise (see _NodeConfigResolver). Lock usage is reported
# through the global lock manager's monitor.
433 self._cfg = context.cfg
434 self._proc = _RpcProcessor(compat.partial(_NodeConfigResolver,
435 self._cfg.GetNodeInfo,
436 self._cfg.GetAllNodesInfo),
437 netutils.GetDaemonPort(constants.NODED),
438 lock_monitor_cb=context.glm.AddToLockMonitor)
# Serializes an instance for RPC, filling in cluster-level parameter defaults.
440 def _InstDict(self, instance, hvp=None, bep=None, osp=None):
441 """Convert the given instance to a dict.
443 This is done via the instance's ToDict() method and additionally
444 we fill the hvparams, beparams, osparams and nicparams with the cluster defaults.
446 @type instance: L{objects.Instance}
447 @param instance: an Instance object
448 @type hvp: dict or None
449 @param hvp: a dictionary with overridden hypervisor parameters
450 @type bep: dict or None
451 @param bep: a dictionary with overridden backend parameters
452 @type osp: dict or None
453 @param osp: a dictionary with overridden os parameters
455 @return: the instance dict, with the hvparams filled with the
# Each parameter group is first filled from the cluster, then the caller's
# overrides are applied on top.
# NOTE(review): the None guards before the update() calls are not visible in
# this listing.
459 idict = instance.ToDict()
460 cluster = self._cfg.GetClusterInfo()
461 idict["hvparams"] = cluster.FillHV(instance)
463 idict["hvparams"].update(hvp)
464 idict["beparams"] = cluster.FillBE(instance)
466 idict["beparams"].update(bep)
467 idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
469 idict["osparams"].update(osp)
# Each NIC's parameters start from the cluster's default nicparams (the second
# FillDict argument and the return are not visible here).
470 for nic in idict["nics"]:
471 nic['nicparams'] = objects.FillDict(
472 cluster.nicparams[constants.PP_DEFAULT],
476 def _InstDictHvpBep(self, (instance, hvp, bep)):
477 """Wrapper for L{_InstDict}.
480 return self._InstDict(instance, hvp=hvp, bep=bep)
482 def _InstDictOsp(self, (instance, osparams)):
483 """Wrapper for L{_InstDict}.
486 return self._InstDict(instance, osp=osparams)
def _Call(self, node_list, procedure, timeout, args):
  """Entry point for automatically generated RPC wrappers.

  @param node_list: names of the nodes to contact
  @param procedure: name of the remote procedure
  @param timeout: read timeout handed to the RPC processor
  @param args: call arguments, sent as a compact JSON request body
  @return: whatever the configured RPC processor returns

  """
  json_body = serializer.DumpJson(args, indent=False)
  return self._proc(node_list, procedure, json_body, read_timeout=timeout)
# Post-processor: on success with a non-None payload, replaces the raw payload
# in place with objects.MigrationStatus.FromDict(payload).
# NOTE(review): any decorator and the trailing return are not visible here.
497 def _MigrationStatusPostProc(result):
498 if not result.fail_msg and result.payload is not None:
499 result.payload = objects.MigrationStatus.FromDict(result.payload)
# Post-processor: on success with a non-None payload, replaces the raw payload
# in place with objects.BlockDevStatus.FromDict(payload).
# NOTE(review): any decorator and the trailing return are not visible here.
503 def _BlockdevFindPostProc(result):
504 if not result.fail_msg and result.payload is not None:
505 result.payload = objects.BlockDevStatus.FromDict(result.payload)
# Post-processor: on success, converts every payload entry with
# objects.BlockDevStatus.FromDict.
# NOTE(review): unlike the sibling post-processors there is no None check, so a
# successful call with a None payload would raise TypeError here.
509 def _BlockdevGetMirrorStatusPostProc(result):
510 if not result.fail_msg:
511 result.payload = [objects.BlockDevStatus.FromDict(i)
512 for i in result.payload]
# Multi-node post-processor: "result" maps node -> RpcResult. Each payload is
# a list of (success, status) pairs, and status is converted in place with
# objects.BlockDevStatus.FromDict.
# NOTE(review): the guards that skip failed nodes and entries are not visible
# in this listing -- confirm.
516 def _BlockdevGetMirrorStatusMultiPostProc(result):
517 for nres in result.values():
521 for idx, (success, status) in enumerate(nres.payload):
523 nres.payload[idx] = (success, objects.BlockDevStatus.FromDict(status))
# Post-processor: on success with a dict payload, replaces it in place with an
# objects.OS instance. Other payload types are left untouched.
528 def _OsGetPostProc(result):
529 if not result.fail_msg and isinstance(result.payload, dict):
530 result.payload = objects.OS.FromDict(result.payload)
# Serializes snapshot disks for transport. Boolean entries pass through
# unchanged; other entries are converted with ToDict().
# NOTE(review): flat_disks initialization, the else line and the return are not
# visible in this listing.
534 def _PrepareFinalizeExportDisks(snap_disks):
537 for disk in snap_disks:
538 if isinstance(disk, bool):
539 flat_disks.append(disk)
541 flat_disks.append(disk.ToDict())
# Post-processor: on success, rebuilds the payload as a new list of
# objects.ImportExportStatus entries.
546 def _ImpExpStatusPostProc(result):
547 """Post-processor for import/export status.
549 @rtype: Payload containing list of L{objects.ImportExportStatus} instances
550 @return: Returns a list of the state of each named import/export or None if
551 a status couldn't be retrieved
554 if not result.fail_msg:
# Per the docstring, entries without a status stay None; the branch handling
# that (and "decoded" initialization) is not visible in this listing.
557 for i in result.payload:
561 decoded.append(objects.ImportExportStatus.FromDict(i))
563 result.payload = decoded
# Serializes import/export I/O arguments. Raw-disk I/O carries one disk object;
# script I/O carries a disk object plus one extra argument.
# NOTE(review): the fallback for other I/O types is not visible in this listing.
568 def _EncodeImportExportIO(ieio, ieioargs):
569 """Encodes import/export I/O information.
572 if ieio == constants.IEIO_RAW_DISK:
573 assert len(ieioargs) == 1
574 return (ieioargs[0].ToDict(), )
576 if ieio == constants.IEIO_SCRIPT:
577 assert len(ieioargs) == 2
578 return (ieioargs[0].ToDict(), ieioargs[1])
def _PrepareFileUpload(filename):
  """Loads a file and prepares it for an upload to nodes.

  @type filename: string
  @param filename: path of the file to upload
  @rtype: list
  @return: C{[filename, encoded data, mode, owner, group, atime, mtime]}.
    The data is encoded by C{_Compress}, and owner and group are resolved
    through C{runtime.GetEnts()}.

  """
  encoded = _Compress(utils.ReadFile(filename))
  info = os.stat(filename)
  ents = runtime.GetEnts()
  owner = ents.LookupUid(info.st_uid)
  group = ents.LookupGid(info.st_gid)
  return [filename, encoded, info.st_mode, owner, group,
          info.st_atime, info.st_mtime]
def call_test_delay(self, node_list, duration, read_timeout=None):
  """Sleep for a fixed time on given node(s).

  This is a multi-node call. The read timeout is derived from the
  requested duration, so callers must not pass one.

  @type node_list: list
  @param node_list: nodes on which to sleep
  @param duration: number of seconds to sleep
  @param read_timeout: must be C{None}; kept only for signature
    compatibility with the generated wrapper

  """
  assert read_timeout is None
  # "self.call_test_delay" would resolve back to this very override and fail
  # the assertion above (or recurse forever under -O). Invoke the generated
  # implementation explicitly instead.
  return _generated_rpc.RpcClientDefault.call_test_delay(
    self, node_list, duration, read_timeout=int(duration + 5))
class JobQueueRunner(_generated_rpc.RpcClientJobQueue):
  """RPC wrappers for job queue.

  """
  _Compress = staticmethod(_Compress)

  def __init__(self, context, address_list):
    """Initializes this class.

    @param context: object providing C{glm.AddToLockMonitor}, which is
      registered as the lock monitor callback
    @type address_list: list or None
    @param address_list: explicit node addresses (paired with node names by
      position), or C{None} to resolve them through ssconf

    """
    _generated_rpc.RpcClientJobQueue.__init__(self)

    if address_list is not None:
      # Caller provided an address list
      resolver = _StaticResolver(address_list)
    else:
      resolver = _SsconfResolver

    self._proc = _RpcProcessor(resolver,
                               netutils.GetDaemonPort(constants.NODED),
                               lock_monitor_cb=context.glm.AddToLockMonitor)

  def _Call(self, node_list, procedure, timeout, args):
    """Entry point for automatically generated RPC wrappers.

    Serializes C{args} to compact JSON and dispatches it to C{node_list}.

    """
    json_body = serializer.DumpJson(args, indent=False)
    return self._proc(node_list, procedure, json_body, read_timeout=timeout)
# RPC client used for bootstrapping. Node addresses always come from ssconf
# and no lock monitor callback is registered.
# NOTE(review): the constructor's def line is not visible in this listing.
639 class BootstrapRunner(_generated_rpc.RpcClientBootstrap):
640 """RPC wrappers for bootstrapping.
644 """Initializes this class.
647 _generated_rpc.RpcClientBootstrap.__init__(self)
# Addresses are resolved via _SsconfResolver; lock_monitor_cb is left unset.
649 self._proc = _RpcProcessor(_SsconfResolver,
650 netutils.GetDaemonPort(constants.NODED))
652 def _Call(self, node_list, procedure, timeout, args):
653 """Entry point for automatically generated RPC wrappers.
# Arguments are sent as compact JSON (indent=False).
656 body = serializer.DumpJson(args, indent=False)
658 return self._proc(node_list, procedure, body, read_timeout=timeout)
class ConfigRunner(_generated_rpc.RpcClientConfig):
  """RPC wrappers for L{config}.

  """
  _PrepareFileUpload = \
    staticmethod(RpcRunner._PrepareFileUpload) # pylint: disable=W0212

  def __init__(self, address_list):
    """Initializes this class.

    @type address_list: list or None
    @param address_list: explicit node addresses (paired with node names by
      position), or C{None} to resolve them through ssconf

    """
    _generated_rpc.RpcClientConfig.__init__(self)

    if address_list is not None:
      # Caller provided an address list
      resolver = _StaticResolver(address_list)
    else:
      resolver = _SsconfResolver

    self._proc = _RpcProcessor(resolver,
                               netutils.GetDaemonPort(constants.NODED))

  def _Call(self, node_list, procedure, timeout, args):
    """Entry point for automatically generated RPC wrappers.

    Serializes C{args} to compact JSON and dispatches it to C{node_list}.

    """
    json_body = serializer.DumpJson(args, indent=False)
    return self._proc(node_list, procedure, json_body, read_timeout=timeout)