4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Inter-node RPC library.
26 # pylint: disable=C0103,R0201,R0904
27 # C0103: Invalid name, since call_* names are not valid
28 # R0201: Method could be a function; we keep all RPCs as instance methods
29 # so as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
39 from ganeti import utils
40 from ganeti import objects
41 from ganeti import http
42 from ganeti import serializer
43 from ganeti import constants
44 from ganeti import errors
45 from ganeti import netutils
46 from ganeti import ssconf
47 from ganeti import runtime
48 from ganeti import compat
49 from ganeti import rpc_defs
51 # Special module generated at build time
52 from ganeti import _generated_rpc
54 # pylint has a bug here, doesn't see this import
55 import ganeti.http.client # pylint: disable=W0611
58 # Timeout for connecting to nodes (seconds)
# Used as the pycurl CONNECTTIMEOUT option in _ConfigRpcCurl.
59 _RPC_CONNECT_TIMEOUT = 5
# HTTP headers sent with every RPC request (passed as "headers" to
# http.client.HttpClientRequest in _PrepareRequests); the JSON content type
# matches the serializer.DumpJson request bodies.
# NOTE(review): the remaining entries and the closing bracket of this list
# are not visible in this view -- confirm against the full file.
61 _RPC_CLIENT_HEADERS = [
62 "Content-type: %s" % http.HTTP_APP_JSON,
66 # Various time constants for the timeout table
# NOTE(review): none of the _TMO_* values is referenced in the visible code;
# presumably they are consumed by the RPC call definitions -- confirm.
67 _TMO_URGENT = 60 # one minute
68 _TMO_FAST = 5 * 60 # five minutes
69 _TMO_NORMAL = 15 * 60 # 15 minutes
70 _TMO_SLOW = 3600 # one hour
74 #: Special value to describe an offline host
# NOTE(review): the assignment of this sentinel is not visible in this view;
# the _RpcProcessor docstring refers to it as L{_OFFLINE}.
# Global pycURL/libcurl initialisation for the RPC layer.
# NOTE(review): the "def" line of this function is not visible in this view.
79 """Initializes the module-global HTTP client manager.
81 Must be called before using any RPC function and while exactly one thread is
85 # curl_global_init(3) and curl_global_cleanup(3) must be called with only
86 # one thread running. This check is just a safety measure -- it doesn't
88 assert threading.activeCount() == 1, \
89 "Found more than one active thread when initializing pycURL"
# Record the PycURL version in use, for diagnostics.
91 logging.info("Using PycURL %s", pycurl.version)
93 pycurl.global_init(pycurl.GLOBAL_ALL)
# Global pycURL/libcurl teardown for the RPC layer (pycurl.global_cleanup).
# NOTE(review): the "def" line of this function is not visible in this view.
97 """Stops the module-global HTTP client manager.
99 Must be called before quitting the program and while exactly one thread is
103 pycurl.global_cleanup()
def _ConfigRpcCurl(curl):
  """Configures a pycURL handle for talking to a node daemon.

  The node daemon certificate file is used as CA bundle, client
  certificate and client key. The peer certificate is verified, the
  host name in it is not, and redirects are not followed.

  @param curl: pycURL handle to configure in place

  """
  cert_path = str(constants.NODED_CERT_FILE)

  # (option, value) pairs, applied in this exact order
  settings = [
    (pycurl.FOLLOWLOCATION, False),
    (pycurl.CAINFO, cert_path),
    (pycurl.SSL_VERIFYHOST, 0),
    (pycurl.SSL_VERIFYPEER, True),
    (pycurl.SSLCERTTYPE, "PEM"),
    (pycurl.SSLCERT, cert_path),
    (pycurl.SSLKEYTYPE, "PEM"),
    (pycurl.SSLKEY, cert_path),
    (pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT),
    ]

  for (option, value) in settings:
    curl.setopt(option, value)
# Decorator: runs the wrapped function with the RPC system initialized and
# shuts the system down afterwards (per the docstring below).
# NOTE(review): the decorator's own "def" line and the statements around the
# call to fn (initialisation, try/finally, shutdown, "return wrapper") are
# not visible in this view -- confirm against the full file.
121 """RPC-wrapper decorator.
123 When applied to a function, it runs it with the RPC system
124 initialized, and it shuts down the system afterwards. This means the
125 function must be called without RPC being initialized.
# Forwards all positional and keyword arguments and returns fn's result.
128 def wrapper(*args, **kwargs):
131 return fn(*args, **kwargs)
# Encodes a payload for RPC transport and returns an (encoding, data) pair.
# Small payloads are sent unchanged with constants.RPC_ENCODING_NONE. All
# others are zlib-compressed at level 3 and base64-encoded, tagged with
# constants.RPC_ENCODING_ZLIB_BASE64.
# NOTE(review): the "def" line and the size check choosing between the two
# return statements are not visible in this view.
138 """Compresses a string for transport over RPC.
140 Small amounts of data are not compressed.
145 @return: Tuple of encoding type and encoded data to send
148 # Small amounts of data are not compressed
150 return (constants.RPC_ENCODING_NONE, data)
152 # Compress with zlib and encode in base64
# (level 3 -- presumably chosen to favour speed over ratio)
153 return (constants.RPC_ENCODING_ZLIB_BASE64,
154 base64.b64encode(zlib.compress(data, 3)))
# Container for the outcome of one RPC call to one node. Callers inspect
# fail_msg (or call Raise) instead of catching exceptions, so that a
# multi-node call can report failures per node.
# NOTE(review): several lines of this class are not visible in this view:
# the docstring opening, the assignments of call/node, the if/elif/else
# lines of __init__, and the decorator/"def" line of the error helper
# ("Helper to ensure ..." -- presumably _EnsureErr, which __init__ calls).
# Confirm them against the full file.
157 class RpcResult(object):
160 This class holds an RPC result. It is needed since in multi-node
161 calls we can't raise an exception just because one out of many
162 failed, and therefore we use this class to encapsulate the result.
164 @ivar data: the data payload, for successful results, or None
165 @ivar call: the name of the RPC call
166 @ivar node: the name of the node to which we made the call
167 @ivar offline: whether the operation failed because the node was
168 offline, as opposed to actual failure; offline=True will always
169 imply failed=True, in order to allow simpler checking if
170 the user doesn't care about the exact failure mode
171 @ivar fail_msg: the error message if the call failed
@ivar payload: on success, the second element of the result (C{data[1]});
None when the node was offline or the call was marked as failed
174 def __init__(self, data=None, failed=False, offline=False,
175 call=None, node=None):
176 self.offline = offline
181 self.fail_msg = "Node is marked offline"
182 self.data = self.payload = None
184 self.fail_msg = self._EnsureErr(data)
185 self.data = self.payload = None
# A well-formed answer is a 2-element tuple/list: a success flag followed
# by either the payload or the error information.
188 if not isinstance(self.data, (tuple, list)):
189 self.fail_msg = ("RPC layer error: invalid result type (%s)" %
193 self.fail_msg = ("RPC layer error: invalid result length (%d), "
194 "expected 2" % len(self.data))
196 elif not self.data[0]:
197 self.fail_msg = self._EnsureErr(self.data[1])
202 self.payload = data[1]
# Sanity check: every branch above must have set all public attributes.
204 for attr_name in ["call", "data", "fail_msg",
205 "node", "offline", "payload"]:
206 assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
210 """Helper to ensure we return a 'True' value for error."""
214 return "No error information"
216 def Raise(self, msg, prereq=False, ecode=None):
217 """If the result has failed, raise an OpExecError.
219 This is used so that LU code doesn't have to check for each
220 result, but instead can call this function.
Either errors.OpExecError or errors.OpPrereqError is raised (the latter,
presumably, when C{prereq} is set); C{ecode}, when given, is presumably
added to the exception arguments.
223 if not self.fail_msg:
226 if not msg: # one could pass None for default message
227 msg = ("Call '%s' to node '%s' has failed: %s" %
228 (self.call, self.node, self.fail_msg))
230 msg = "%s: %s" % (msg, self.fail_msg)
232 ec = errors.OpPrereqError
234 ec = errors.OpExecError
235 if ecode is not None:
239 raise ec(*args) # pylint: disable=W0142
# Resolver returning (node name, address) pairs for node_list. It uses the
# ssconf primary IP list and/or nslookup_fn with the cluster's primary IP
# family. The third positional argument receives the resolver options that
# _RpcProcessor passes in, and is ignored.
# NOTE(review): several lines are not visible in this view: the creation of
# the store "ss", the initialisation and return of "result", and the
# conditions choosing between the ssconf map and nslookup_fn. Confirm them.
242 def _SsconfResolver(ssconf_ips, node_list, _,
243 ssc=ssconf.SimpleStore,
244 nslookup_fn=netutils.Hostname.GetIP):
245 """Return addresses for given node names.
247 @type ssconf_ips: bool
248 @param ssconf_ips: Use the ssconf IPs
249 @type node_list: list
250 @param node_list: List of node names
@param _: resolver options (unused)
252 @param ssc: SimpleStore class that is used to obtain node->ip mappings
253 @type nslookup_fn: callable
254 @param nslookup_fn: function used to do NS lookup
255 @rtype: list of tuple; (string, string)
256 @return: List of tuples containing node name and IP address
260 family = ss.GetPrimaryIPFamily()
263 iplist = ss.GetNodePrimaryIPList()
# Each ssconf entry splits on whitespace into exactly two fields,
# presumably (node name, ip).
264 ipmap = dict(entry.split() for entry in iplist)
269 for node in node_list:
272 ip = nslookup_fn(node, family=family)
273 result.append((node, ip))
278 class _StaticResolver:
279 def __init__(self, addresses):
280 """Initializes this class.
283 self._addresses = addresses
285 def __call__(self, hosts, _):
286 """Returns static addresses for hosts.
289 assert len(hosts) == len(self._addresses)
290 return zip(hosts, self._addresses)
# Maps one configuration node to a (name, address) pair for _RpcProcessor.
# A node missing from the configuration presumably falls back to DNS
# (see the comment below). An offline node takes a separate branch unless
# accept_offline_node is set.
# NOTE(review): most conditions and all return statements of this function
# are not visible in this view -- confirm against the full file.
293 def _CheckConfigNode(name, node, accept_offline_node):
294 """Checks if a node is online.
297 @param name: Node name
298 @type node: L{objects.Node} or None
299 @param node: Node object
@type accept_offline_node: bool
@param accept_offline_node: whether an offline node is acceptable
303 # Depend on DNS for name resolution
305 elif node.offline and not accept_offline_node:
# Resolver backed by the cluster configuration.
#   - single_node_fn(name) looks up one node.
#   - all_nodes_fn() returns a name -> node mapping; it is queried with
#     .get, so unknown names yield None.
#   - opts must be None or rpc_defs.ACCEPT_OFFLINE_NODE.
# Every host is passed through _CheckConfigNode.
# NOTE(review): the single-host condition, the binding of "name" and the
# end of the final list comprehension are not visible in this view.
312 def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts, opts):
313 """Calculate node addresses using configuration.
316 accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)
318 assert accept_offline_node or opts is None, "Unknown option"
320 # Special case for single-host lookups
323 return [_CheckConfigNode(name, single_node_fn(name), accept_offline_node)]
325 all_nodes = all_nodes_fn()
326 return [_CheckConfigNode(name, all_nodes.get(name, None),
# Stores the resolver, node daemon port and lock monitor callback that
# __call__ uses.
# NOTE(review): the class header and the assignment of self._port (which
# __call__ reads) are not visible in this view.
332 def __init__(self, resolver, port, lock_monitor_cb=None):
333 """Initializes this class.
335 @param resolver: callable accepting a list of hostnames and resolver options, returning a list
336 of tuples containing name and IP address (IP address can be the name or
337 the special value L{_OFFLINE} to mark offline machines)
339 @param port: TCP port
340 @param lock_monitor_cb: Callable for registering with lock monitor
343 self._resolver = resolver
345 self._lock_monitor_cb = lock_monitor_cb
# Splits the resolved hosts into two groups:
#   - offline hosts get a pre-computed result (RpcResult with offline=True);
#   - every other host gets one HTTP POST request to "/<procedure>",
#     carrying that host's entry from "body".
# Returns (results, requests). Both are keyed by host name: results is
# indexed by name here, and _CombineResults iterates requests.items().
# NOTE(review): this method takes no "self", so it is presumably a
# staticmethod. The decorator, the initialisation of results/requests, the
# offline check and the "requests[name] =" target are not visible here.
348 def _PrepareRequests(hosts, port, procedure, body, read_timeout):
349 """Prepares requests by sorting offline hosts into separate list.
352 @param body: a dictionary with per-host body data
358 assert isinstance(body, dict)
359 assert len(body) == len(hosts)
360 assert compat.all(isinstance(v, str) for v in body.values())
361 assert frozenset(map(compat.fst, hosts)) == frozenset(body.keys()), \
362 "%s != %s" % (hosts, body.keys())
364 for (name, ip) in hosts:
366 # Node is marked as offline
367 results[name] = RpcResult(node=name, offline=True, call=procedure)
370 http.client.HttpClientRequest(str(ip), port,
371 http.HTTP_POST, str("/%s" % procedure),
372 headers=_RPC_CLIENT_HEADERS,
373 post_data=body[name],
374 read_timeout=read_timeout,
375 nicename="%s/%s" % (name, procedure),
376 curl_config_fn=_ConfigRpcCurl)
378 return (results, requests)
# Merges the pre-computed (offline) results with the executed requests:
#   - a successful request with HTTP 200 becomes an RpcResult built from
#     the JSON-decoded response body;
#   - anything else is logged and recorded as a failed RpcResult.
# Entries are written into "results" in place.
# NOTE(review): the derivation of "msg", the remaining RpcResult arguments
# and the return statement are not visible in this view.
381 def _CombineResults(results, requests, procedure):
382 """Combines pre-computed results for offline hosts with actual call results.
385 for name, req in requests.items():
386 if req.success and req.resp_status_code == http.HTTP_OK:
387 host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
388 node=name, call=procedure)
390 # TODO: Better error reporting
396 logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
397 host_result = RpcResult(data=msg, failed=True, node=name,
400 results[name] = host_result
def __call__(self, hosts, procedure, body, read_timeout, resolver_opts,
             _req_process_fn=None):
  """Makes an RPC request to a number of nodes.

  The hosts are resolved to addresses first. Offline hosts get a
  pre-computed result, requests for all other hosts are executed, and
  the two sets of results are merged.

  @type hosts: sequence
  @param hosts: Hostnames
  @type procedure: string
  @param procedure: Request path
  @type body: dictionary
  @param body: dictionary with request bodies per host
  @type read_timeout: int
  @param read_timeout: Read timeout for request; must not be None
  @param resolver_opts: options handed to the resolver together with the
    host names
  @type _req_process_fn: callable or None
  @param _req_process_fn: function executing the prepared requests;
    C{http.client.ProcessRequests} is used when None
  @return: the combined per-host results, as returned by
    L{_CombineResults}

  """
  assert read_timeout is not None, \
    "Missing RPC read timeout for procedure '%s'" % procedure

  if _req_process_fn is None:
    process_fn = http.client.ProcessRequests
  else:
    process_fn = _req_process_fn

  addresses = self._resolver(hosts, resolver_opts)
  (offline_results, pending) = \
    self._PrepareRequests(addresses, self._port, procedure, body,
                          read_timeout)

  process_fn(pending.values(), lock_monitor_cb=self._lock_monitor_cb)

  # A host must never have both a pre-computed result and a request
  assert not frozenset(offline_results).intersection(pending)

  return self._CombineResults(offline_results, pending, procedure)
# Common base of the RPC client classes.
#   - __init__ binds an _RpcProcessor to the node daemon port and builds an
#     argument encoder from encoder_fn.
#   - _Call unpacks a call definition tuple (cdef) and evaluates callable
#     timeouts/resolver options against the arguments.
#   - It then encodes each argument according to its kind (the second
#     element of each argdefs entry) and serialises the per-node request
#     bodies with serializer.DumpJson.
#   - Finally it runs the call and maps postproc_fn over the per-node
#     results (presumably only when a postproc_fn is given).
# NOTE(review): several lines of this class are not visible in this view:
#   - _EncodeArg takes no "self" (presumably a staticmethod);
#   - its None-kind branch is missing;
#   - the else/"if prep_fn is None" lines of _Call are missing;
#   - the tail of the final return expression is missing.
435 class _RpcClientBase:
436 def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
437 _req_process_fn=None):
438 """Initializes this class.
441 proc = _RpcProcessor(resolver,
442 netutils.GetDaemonPort(constants.NODED),
443 lock_monitor_cb=lock_monitor_cb)
444 self._proc = compat.partial(proc, _req_process_fn=_req_process_fn)
445 self._encoder = compat.partial(self._EncodeArg, encoder_fn)
# Encodes one (argument kind, value) pair using the encoder looked up
# for that kind.
448 def _EncodeArg(encoder_fn, (argkind, value)):
455 return encoder_fn(argkind)(value)
457 def _Call(self, cdef, node_list, args):
458 """Entry point for automatically generated RPC wrappers.
461 (procedure, _, resolver_opts, timeout, argdefs,
462 prep_fn, postproc_fn, _) = cdef
464 if callable(timeout):
465 read_timeout = timeout(args)
467 read_timeout = timeout
469 if callable(resolver_opts):
470 req_resolver_opts = resolver_opts(args)
472 req_resolver_opts = resolver_opts
474 if len(args) != len(argdefs):
475 raise errors.ProgrammerError("Number of passed arguments doesn't match")
477 enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args))
479 # for a no-op prep_fn, we serialise the body once, and then we
480 # reuse it in the dictionary values
481 body = serializer.DumpJson(enc_args)
482 pnbody = dict((n, body) for n in node_list)
484 # for a custom prep_fn, we pass the encoded arguments and the
485 # node name to the prep_fn, and we serialise its return value
486 assert callable(prep_fn)
487 pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
490 result = self._proc(node_list, procedure, pnbody, read_timeout,
494 return dict(map(lambda (key, value): (key, postproc_fn(value)),
500 def _ObjectToDict(value):
501 """Converts an object to a dictionary.
503 @note: See L{objects}.
506 return value.ToDict()
509 def _ObjectListToDict(value):
510 """Converts a list of L{objects} to dictionaries.
513 return map(_ObjectToDict, value)
def _EncodeNodeToDiskDict(value):
  """Encodes a node-name-to-disks mapping for transport.

  @type value: dict
  @param value: maps node names to lists of disk objects
  @rtype: dict
  @return: the same keys, each mapped to the list of the disks' dictionary
    representations (via L{_ObjectListToDict})

  """
  encoded = {}
  for (node_name, node_disks) in value.items():
    encoded[node_name] = _ObjectListToDict(node_disks)
  return encoded
# Encoder for file uploads. It reads "filename" (recording stat data via
# utils.FileStatHelper) and compresses the contents with _Compress.
# It returns [filename, compressed data, mode, user, group, atime, mtime],
# where user/group come from the getents object's LookupUid/LookupGid.
# getents_fn defaults to runtime.GetEnts when None.
# NOTE(review): "st" is used below but its assignment is not visible in this
# view; presumably it is the stat result captured by statcb -- confirm.
524 def _PrepareFileUpload(getents_fn, filename):
525 """Loads a file and prepares it for an upload to nodes.
528 statcb = utils.FileStatHelper()
529 data = _Compress(utils.ReadFile(filename, preread=statcb))
532 if getents_fn is None:
533 getents_fn = runtime.GetEnts
535 getents = getents_fn()
537 return [filename, data, st.st_mode, getents.LookupUid(st.st_uid),
538 getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
# Encodes a list of snapshot disks. Boolean entries are passed through
# unchanged (presumably markers for disks without a snapshot -- confirm);
# every other entry is converted with ToDict().
# NOTE(review): the initialisation of flat_disks, the "else:" and the return
# statement are not visible in this view.
541 def _PrepareFinalizeExportDisks(snap_disks):
542 """Encodes disks for finalizing export.
547 for disk in snap_disks:
548 if isinstance(disk, bool):
549 flat_disks.append(disk)
551 flat_disks.append(disk.ToDict())
def _EncodeImportExportIO(ieio_info):
  """Encodes import/export I/O information.

  For raw-disk and script I/O, the disk object in the first argument
  is converted to a dictionary. Any other I/O type is passed through
  unchanged.

  @type ieio_info: tuple
  @param ieio_info: (I/O type, arguments) pair
  @rtype: tuple
  @return: (I/O type, encoded arguments) pair

  """
  (ieio, ieioargs) = ieio_info

  if ieio == constants.IEIO_RAW_DISK:
    assert len(ieioargs) == 1
    encoded = (ieioargs[0].ToDict(), )
  elif ieio == constants.IEIO_SCRIPT:
    assert len(ieioargs) == 2
    encoded = (ieioargs[0].ToDict(), ieioargs[1])
  else:
    encoded = ieioargs

  return (ieio, encoded)
571 def _EncodeBlockdevRename(value):
572 """Encodes information for renaming block devices.
575 return [(d.ToDict(), uid) for d, uid in value]
# Fills parameters into a DRBD8 disk and its two children (data and
# metadata device) in place, via objects.FillDict. The first argument of
# each FillDict call is the corresponding entry of the parameter triple.
# NOTE(review): presumably FillDict treats its first argument as defaults
# overridden by the second. The return statement is not visible in this
# view; AnnotateDiskParams appends whatever is returned.
578 def _AnnotateDParamsDRBD(disk, (drbd_params, data_params, meta_params)):
579 """Annotates just DRBD disks layouts.
582 assert disk.dev_type == constants.LD_DRBD8
584 disk.params = objects.FillDict(drbd_params, disk.params)
585 (dev_data, dev_meta) = disk.children
586 dev_data.params = objects.FillDict(data_params, dev_data.params)
587 dev_meta.params = objects.FillDict(meta_params, dev_meta.params)
# Fills parameters into a non-DRBD8 disk in place via objects.FillDict.
# The second argument is a one-element tuple holding the parameter dict.
# NOTE(review): the return statement is not visible in this view;
# AnnotateDiskParams appends whatever is returned.
592 def _AnnotateDParamsGeneric(disk, (params, )):
593 """Generic disk parameter annotation routine.
596 assert disk.dev_type != constants.LD_DRBD8
598 disk.params = objects.FillDict(params, disk.params)
# Computes the logical-disk parameters for the template. It then applies the
# matching annotation function to a copy of every disk (disk.Copy()), so
# the input disk objects are not modified:
#   - DRBD8 uses _AnnotateDParamsDRBD;
#   - diskless disks are returned unchanged;
#   - every other template uses _AnnotateDParamsGeneric.
# NOTE(review): the "else:" line, the loop header and the initialisation and
# return of new_disks are not visible in this view.
603 def AnnotateDiskParams(template, disks, disk_params):
604 """Annotates the disk objects with the disk parameters.
606 @param template: The disk template used
607 @param disks: The list of disk objects to annotate
608 @param disk_params: The disk parameters for annotation
609 @return: A list of annotated disk objects
612 ld_params = objects.Disk.ComputeLDParams(template, disk_params)
614 if template == constants.DT_DRBD8:
615 annotation_fn = _AnnotateDParamsDRBD
616 elif template == constants.DT_DISKLESS:
617 annotation_fn = lambda disk, _: disk
619 annotation_fn = _AnnotateDParamsGeneric
623 new_disks.append(annotation_fn(disk.Copy(), ld_params))
# Encoder table mapping rpc_defs.ED_* argument kinds to encoding functions.
# The runner classes look encoders up with _ENCODERS.get, or extend a copy.
# NOTE(review): the "_ENCODERS = {" opening line and the closing brace are
# not visible in this view.
630 rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
631 rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
632 rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
633 rpc_defs.ED_COMPRESS: _Compress,
634 rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
635 rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
636 rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
# Main RPC client. It combines the Default, Bootstrap, DnsOnly and Config
# generated client mixins and resolves nodes through the configuration
# (_NodeConfigResolver with cfg.GetNodeInfo). It also adds encoders that
# need the configuration: instance dicts with filled parameters and disks
# annotated with their disk parameters.
# NOTE(review): some lines of this class are not visible in this view:
#   - the assignment of self._cfg;
#   - the call that adds the extra encoders to the copied table;
#   - the guards around the .update() calls in _InstDict;
#   - the returns of _InstDict and _SingleDiskDictDP.
640 class RpcRunner(_RpcClientBase,
641 _generated_rpc.RpcClientDefault,
642 _generated_rpc.RpcClientBootstrap,
643 _generated_rpc.RpcClientDnsOnly,
644 _generated_rpc.RpcClientConfig):
648 def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
649 """Initializes the RPC runner.
651 @type cfg: L{config.ConfigWriter}
652 @param cfg: Configuration
653 @type lock_monitor_cb: callable
654 @param lock_monitor_cb: Lock monitor callback
@type _req_process_fn: callable or None
@param _req_process_fn: passed on to L{_RpcClientBase}
@param _getents: passed to L{_PrepareFileUpload} for C{ED_FILE_DETAILS}
659 encoders = _ENCODERS.copy()
662 # Encoders requiring configuration object
663 rpc_defs.ED_INST_DICT: self._InstDict,
664 rpc_defs.ED_INST_DICT_HVP_BEP_DP: self._InstDictHvpBepDp,
665 rpc_defs.ED_INST_DICT_OSP_DP: self._InstDictOspDp,
667 # Encoders annotating disk parameters
668 rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
669 rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,
671 # Encoders with special requirements
672 rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
675 # Resolver using configuration
676 resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
679 # Pylint doesn't recognize multiple inheritance properly, see
680 # <http://www.logilab.org/ticket/36586> and
681 # <http://www.logilab.org/ticket/35642>
682 # pylint: disable=W0233
683 _RpcClientBase.__init__(self, resolver, encoders.get,
684 lock_monitor_cb=lock_monitor_cb,
685 _req_process_fn=_req_process_fn)
686 _generated_rpc.RpcClientConfig.__init__(self)
687 _generated_rpc.RpcClientBootstrap.__init__(self)
688 _generated_rpc.RpcClientDnsOnly.__init__(self)
689 _generated_rpc.RpcClientDefault.__init__(self)
691 def _InstDict(self, instance, hvp=None, bep=None, osp=None):
692 """Convert the given instance to a dict.
694 This is done via the instance's ToDict() method and additionally
695 we fill the hvparams with the cluster defaults.
Backend, OS and NIC parameters are filled in the same way, and the
disks are encoded with their disk parameters annotated.
697 @type instance: L{objects.Instance}
698 @param instance: an Instance object
699 @type hvp: dict or None
700 @param hvp: a dictionary with overridden hypervisor parameters
701 @type bep: dict or None
702 @param bep: a dictionary with overridden backend parameters
703 @type osp: dict or None
704 @param osp: a dictionary with overridden os parameters
706 @return: the instance dict, with the hvparams filled with the
710 idict = instance.ToDict()
711 cluster = self._cfg.GetClusterInfo()
712 idict["hvparams"] = cluster.FillHV(instance)
714 idict["hvparams"].update(hvp)
715 idict["beparams"] = cluster.FillBE(instance)
717 idict["beparams"].update(bep)
718 idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
720 idict["osparams"].update(osp)
721 for nic in idict["nics"]:
722 nic["nicparams"] = objects.FillDict(
723 cluster.nicparams[constants.PP_DEFAULT],
725 idict["disks"] = self._DisksDictDP((instance.disks, instance))
728 def _InstDictHvpBepDp(self, (instance, hvp, bep)):
729 """Wrapper for L{_InstDict}.
732 return self._InstDict(instance, hvp=hvp, bep=bep)
734 def _InstDictOspDp(self, (instance, osparams)):
735 """Wrapper for L{_InstDict}.
738 return self._InstDict(instance, osp=osparams)
740 def _DisksDictDP(self, (disks, instance)):
741 """Wrapper for L{AnnotateDiskParams}.
744 diskparams = self._cfg.GetInstanceDiskParams(instance)
745 return [disk.ToDict()
746 for disk in AnnotateDiskParams(instance.disk_template,
749 def _SingleDiskDictDP(self, (disk, instance)):
750 """Wrapper for L{AnnotateDiskParams}.
753 (anno_disk,) = self._DisksDictDP(([disk], instance))
# RPC client for the job queue calls (_generated_rpc.RpcClientJobQueue).
# Without an explicit address list, node addresses come from ssconf
# (_SsconfResolver with ssconf_ips=True); otherwise the given addresses are
# used through _StaticResolver. Lock monitor registration goes through
# context.glm.AddToLockMonitor.
# NOTE(review): an "else:" between the two resolver assignments is not
# visible in this view -- confirm against the full file.
757 class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
758 """RPC wrappers for job queue.
761 def __init__(self, context, address_list):
762 """Initializes this class.
765 if address_list is None:
766 resolver = compat.partial(_SsconfResolver, True)
768 # Caller provided an address list
769 resolver = _StaticResolver(address_list)
771 _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
772 lock_monitor_cb=context.glm.AddToLockMonitor)
773 _generated_rpc.RpcClientJobQueue.__init__(self)
# RPC client for the bootstrap and DNS-only calls; node addresses come from
# ssconf (_SsconfResolver with ssconf_ips=True).
# NOTE(review): the "def __init__" line and the remaining arguments of the
# _RpcClientBase.__init__ call are not visible in this view.
776 class BootstrapRunner(_RpcClientBase,
777 _generated_rpc.RpcClientBootstrap,
778 _generated_rpc.RpcClientDnsOnly):
779 """RPC wrappers for bootstrapping.
783 """Initializes this class.
786 # Pylint doesn't recognize multiple inheritance properly, see
787 # <http://www.logilab.org/ticket/36586> and
788 # <http://www.logilab.org/ticket/35642>
789 # pylint: disable=W0233
790 _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, True),
792 _generated_rpc.RpcClientBootstrap.__init__(self)
793 _generated_rpc.RpcClientDnsOnly.__init__(self)
# RPC client for the DNS-only calls. It uses _SsconfResolver with
# ssconf_ips=False, i.e. without the ssconf IP list (presumably addresses
# then come from nslookup_fn).
# NOTE(review): the "def __init__" line and the remaining arguments of the
# _RpcClientBase.__init__ call are not visible in this view.
796 class DnsOnlyRunner(_RpcClientBase, _generated_rpc.RpcClientDnsOnly):
797 """RPC wrappers for calls using only DNS.
801 """Initialize this class.
804 _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, False),
806 _generated_rpc.RpcClientDnsOnly.__init__(self)
# RPC client for the configuration calls (_generated_rpc.RpcClientConfig).
#   - Addresses come from ssconf unless an explicit address list is given
#     (then _StaticResolver is used).
#   - The lock monitor callback comes from context.glm when available and
#     is None otherwise (presumably when no context is passed).
#   - The ED_FILE_DETAILS encoder is bound to _getents.
# NOTE(review): the "_getents" parameter line of __init__, the if/else
# lines and the call that adds the extra encoder to the copied table are
# not visible in this view.
809 class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
810 """RPC wrappers for L{config}.
813 def __init__(self, context, address_list, _req_process_fn=None,
815 """Initializes this class.
819 lock_monitor_cb = context.glm.AddToLockMonitor
821 lock_monitor_cb = None
823 if address_list is None:
824 resolver = compat.partial(_SsconfResolver, True)
826 # Caller provided an address list
827 resolver = _StaticResolver(address_list)
829 encoders = _ENCODERS.copy()
832 rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
835 _RpcClientBase.__init__(self, resolver, encoders.get,
836 lock_monitor_cb=lock_monitor_cb,
837 _req_process_fn=_req_process_fn)
838 _generated_rpc.RpcClientConfig.__init__(self)