4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Inter-node RPC library.
26 # pylint: disable=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
48 from ganeti import runtime
49 from ganeti import compat
50 from ganeti import rpc_defs
52 # Special module generated at build time
53 from ganeti import _generated_rpc
55 # pylint has a bug here, doesn't see this import
56 import ganeti.http.client # pylint: disable=W0611
59 # Timeout for connecting to nodes (seconds)
60 _RPC_CONNECT_TIMEOUT = 5
62 _RPC_CLIENT_HEADERS = [
63 "Content-type: %s" % http.HTTP_APP_JSON,
67 # Various time constants for the timeout table
68 _TMO_URGENT = 60 # one minute
69 _TMO_FAST = 5 * 60 # five minutes
70 _TMO_NORMAL = 15 * 60 # 15 minutes
71 _TMO_SLOW = 3600 # one hour
75 #: Special value to describe an offline host
80 """Initializes the module-global HTTP client manager.
82 Must be called before using any RPC function and while exactly one thread is
86 # curl_global_init(3) and curl_global_cleanup(3) must be called with only
87 # one thread running. This check is just a safety measure -- it doesn't
89 assert threading.activeCount() == 1, \
90 "Found more than one active thread when initializing pycURL"
92 logging.info("Using PycURL %s", pycurl.version)
94 pycurl.global_init(pycurl.GLOBAL_ALL)
98 """Stops the module-global HTTP client manager.
100 Must be called before quitting the program and while exactly one thread is
104 pycurl.global_cleanup()
def _ConfigRpcCurl(curl):
  """Applies the RPC client settings to a cURL handle.

  The node daemon certificate file (L{constants.NODED_CERT_FILE}) is passed
  as C{CAINFO}, C{SSLCERT} and C{SSLKEY} alike, both certificate and key
  being declared as PEM. C{SSL_VERIFYPEER} is enabled, C{SSL_VERIFYHOST} is
  set to 0, C{FOLLOWLOCATION} is disabled and the connect timeout is taken
  from L{_RPC_CONNECT_TIMEOUT}.

  @param curl: object providing a C{setopt(option, value)} method

  """
  cert_path = str(constants.NODED_CERT_FILE)

  # Options are applied in the order listed
  settings = [
    (pycurl.FOLLOWLOCATION, False),
    (pycurl.CAINFO, cert_path),
    (pycurl.SSL_VERIFYHOST, 0),
    (pycurl.SSL_VERIFYPEER, True),
    (pycurl.SSLCERTTYPE, "PEM"),
    (pycurl.SSLCERT, cert_path),
    (pycurl.SSLKEYTYPE, "PEM"),
    (pycurl.SSLKEY, cert_path),
    (pycurl.CONNECTTIMEOUT, _RPC_CONNECT_TIMEOUT),
    ]

  for (option, value) in settings:
    curl.setopt(option, value)
122 """RPC-wrapper decorator.
124 When applied to a function, it runs it with the RPC system
125 initialized, and it shuts down the system afterwards. This means the
126 function must be called without RPC being initialized.
129 def wrapper(*args, **kwargs):
132 return fn(*args, **kwargs)
139 """Compresses a string for transport over RPC.
141 Small amounts of data are not compressed.
146 @return: Encoded data to send
149 # Small amounts of data are not compressed
151 return (constants.RPC_ENCODING_NONE, data)
153 # Compress with zlib and encode in base64
154 return (constants.RPC_ENCODING_ZLIB_BASE64,
155 base64.b64encode(zlib.compress(data, 3)))
class RpcResult(object):
  """Outcome of an RPC call to a single node.

  In multi-node calls an exception can't be raised just because one out of
  many nodes failed, therefore the result for every node is encapsulated in
  an instance of this class.

  @ivar data: the data payload, for successful results, or None
  @ivar payload: second element of a successful result, None otherwise
  @ivar call: the name of the RPC call
  @ivar node: the name of the node to which we made the call
  @ivar offline: whether the operation failed because the node was
      offline, as opposed to actual failure; an offline result always
      carries an error message, so callers not interested in the exact
      failure mode only need to check C{fail_msg}
  @ivar fail_msg: the error message if the call failed, None on success

  """
  def __init__(self, data=None, failed=False, offline=False,
               call=None, node=None):
    """Classifies the raw result of a call.

    @param data: raw result; on success a two-element tuple or list whose
        first element is true and whose second element is the payload; for
        C{failed=True} the error message
    @param failed: whether the call itself failed
    @param offline: whether the node is marked offline (takes precedence
        over C{failed})
    @param call: name of the RPC call
    @param node: name of the node

    """
    self.offline = offline
    self.call = call
    self.node = node

    if offline:
      self.data = self.payload = None
      self.fail_msg = "Node is marked offline"
    elif failed:
      self.data = self.payload = None
      self.fail_msg = self._EnsureErr(data)
    else:
      self.data = data
      # Only the last branch below yields a payload
      self.payload = None
      if not isinstance(data, (tuple, list)):
        self.fail_msg = ("RPC layer error: invalid result type (%s)" %
                         type(data))
      elif len(data) != 2:
        self.fail_msg = ("RPC layer error: invalid result length (%d), "
                         "expected 2" % len(data))
      elif not data[0]:
        self.fail_msg = self._EnsureErr(data[1])
      else:
        self.fail_msg = None
        self.payload = data[1]

    # Safety net: every branch above must have set all attributes
    for attr_name in ("call", "data", "fail_msg",
                      "node", "offline", "payload"):
      assert hasattr(self, attr_name), "Missing attribute %s" % attr_name

  @staticmethod
  def _EnsureErr(val):
    """Returns C{val} if it is true, a generic error text otherwise."""
    return val or "No error information"

  def Raise(self, msg, prereq=False, ecode=None):
    """Raises an exception if the result has failed.

    This is used so that LU code doesn't have to check each result, but
    can call this function instead. Nothing happens for successful results.

    @param msg: text prepended to the failure message; if false (e.g. None)
        a default message naming the call and the node is used
    @param prereq: raise L{errors.OpPrereqError} instead of
        L{errors.OpExecError}
    @param ecode: if not None, passed as second argument to the exception

    """
    if not self.fail_msg:
      return

    if msg:
      msg = "%s: %s" % (msg, self.fail_msg)
    else:
      # one could pass None for default message
      msg = ("Call '%s' to node '%s' has failed: %s" %
             (self.call, self.node, self.fail_msg))

    if prereq:
      exc_class = errors.OpPrereqError
    else:
      exc_class = errors.OpExecError

    if ecode is None:
      raise exc_class(msg)

    raise exc_class(msg, ecode)
243 def _SsconfResolver(ssconf_ips, node_list, _,
244 ssc=ssconf.SimpleStore,
245 nslookup_fn=netutils.Hostname.GetIP):
246 """Return addresses for given node names.
248 @type ssconf_ips: bool
249 @param ssconf_ips: Use the ssconf IPs
250 @type node_list: list
251 @param node_list: List of node names
253 @param ssc: SimpleStore class that is used to obtain node->ip mappings
254 @type nslookup_fn: callable
255 @param nslookup_fn: function used to do NS lookup
256 @rtype: list of tuple; (string, string)
257 @return: List of tuples containing node name and IP address
261 family = ss.GetPrimaryIPFamily()
264 iplist = ss.GetNodePrimaryIPList()
265 ipmap = dict(entry.split() for entry in iplist)
270 for node in node_list:
273 ip = nslookup_fn(node, family=family)
274 result.append((node, ip))
279 class _StaticResolver:
280 def __init__(self, addresses):
281 """Initializes this class.
284 self._addresses = addresses
286 def __call__(self, hosts, _):
287 """Returns static addresses for hosts.
290 assert len(hosts) == len(self._addresses)
291 return zip(hosts, self._addresses)
294 def _CheckConfigNode(name, node, accept_offline_node):
295 """Checks if a node is online.
298 @param name: Node name
299 @type node: L{objects.Node} or None
300 @param node: Node object
304 # Depend on DNS for name resolution
306 elif node.offline and not accept_offline_node:
313 def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts, opts):
314 """Calculate node addresses using configuration.
317 accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)
319 assert accept_offline_node or opts is None, "Unknown option"
321 # Special case for single-host lookups
324 return [_CheckConfigNode(name, single_node_fn(name), accept_offline_node)]
326 all_nodes = all_nodes_fn()
327 return [_CheckConfigNode(name, all_nodes.get(name, None),
333 def __init__(self, resolver, port, lock_monitor_cb=None):
334 """Initializes this class.
336 @param resolver: callable accepting a list of hostnames, returning a list
337 of tuples containing name and IP address (IP address can be the name or
338 the special value L{_OFFLINE} to mark offline machines)
340 @param port: TCP port
341 @param lock_monitor_cb: Callable for registering with lock monitor
344 self._resolver = resolver
346 self._lock_monitor_cb = lock_monitor_cb
349 def _PrepareRequests(hosts, port, procedure, body, read_timeout):
350 """Prepares requests by sorting offline hosts into separate list.
353 @param body: a dictionary with per-host body data
359 assert isinstance(body, dict)
360 assert len(body) == len(hosts)
361 assert compat.all(isinstance(v, str) for v in body.values())
362 assert frozenset(map(compat.fst, hosts)) == frozenset(body.keys()), \
363 "%s != %s" % (hosts, body.keys())
365 for (name, ip) in hosts:
367 # Node is marked as offline
368 results[name] = RpcResult(node=name, offline=True, call=procedure)
371 http.client.HttpClientRequest(str(ip), port,
372 http.HTTP_POST, str("/%s" % procedure),
373 headers=_RPC_CLIENT_HEADERS,
374 post_data=body[name],
375 read_timeout=read_timeout,
376 nicename="%s/%s" % (name, procedure),
377 curl_config_fn=_ConfigRpcCurl)
379 return (results, requests)
382 def _CombineResults(results, requests, procedure):
383 """Combines pre-computed results for offline hosts with actual call results.
386 for name, req in requests.items():
387 if req.success and req.resp_status_code == http.HTTP_OK:
388 host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
389 node=name, call=procedure)
391 # TODO: Better error reporting
397 logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
398 host_result = RpcResult(data=msg, failed=True, node=name,
401 results[name] = host_result
405 def __call__(self, hosts, procedure, body, read_timeout, resolver_opts,
406 _req_process_fn=None):
407 """Makes an RPC request to a number of nodes.
409 @type hosts: sequence
410 @param hosts: Hostnames
411 @type procedure: string
412 @param procedure: Request path
413 @type body: dictionary
414 @param body: dictionary with request bodies per host
415 @type read_timeout: int or None
416 @param read_timeout: Read timeout for request
419 assert read_timeout is not None, \
420 "Missing RPC read timeout for procedure '%s'" % procedure
422 if _req_process_fn is None:
423 _req_process_fn = http.client.ProcessRequests
425 (results, requests) = \
426 self._PrepareRequests(self._resolver(hosts, resolver_opts), self._port,
427 procedure, body, read_timeout)
429 _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
431 assert not frozenset(results).intersection(requests)
433 return self._CombineResults(results, requests, procedure)
436 class _RpcClientBase:
437 def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
438 _req_process_fn=None):
439 """Initializes this class.
442 proc = _RpcProcessor(resolver,
443 netutils.GetDaemonPort(constants.NODED),
444 lock_monitor_cb=lock_monitor_cb)
445 self._proc = compat.partial(proc, _req_process_fn=_req_process_fn)
446 self._encoder = compat.partial(self._EncodeArg, encoder_fn)
449 def _EncodeArg(encoder_fn, (argkind, value)):
456 return encoder_fn(argkind)(value)
458 def _Call(self, cdef, node_list, args):
459 """Entry point for automatically generated RPC wrappers.
462 (procedure, _, resolver_opts, timeout, argdefs,
463 prep_fn, postproc_fn, _) = cdef
465 if callable(timeout):
466 read_timeout = timeout(args)
468 read_timeout = timeout
470 if callable(resolver_opts):
471 req_resolver_opts = resolver_opts(args)
473 req_resolver_opts = resolver_opts
475 if len(args) != len(argdefs):
476 raise errors.ProgrammerError("Number of passed arguments doesn't match")
478 enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args))
480 # for a no-op prep_fn, we serialise the body once, and then we
481 # reuse it in the dictionary values
482 body = serializer.DumpJson(enc_args)
483 pnbody = dict((n, body) for n in node_list)
485 # for a custom prep_fn, we pass the encoded arguments and the
486 # node name to the prep_fn, and we serialise its return value
487 assert callable(prep_fn)
488 pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
491 result = self._proc(node_list, procedure, pnbody, read_timeout,
495 return dict(map(lambda (key, value): (key, postproc_fn(value)),
501 def _ObjectToDict(value):
502 """Converts an object to a dictionary.
504 @note: See L{objects}.
507 return value.ToDict()
510 def _ObjectListToDict(value):
511 """Converts a list of L{objects} to dictionaries.
514 return map(_ObjectToDict, value)
517 def _EncodeNodeToDiskDict(value):
518 """Encodes a dictionary with node name as key and disk objects as values.
521 return dict((name, _ObjectListToDict(disks))
522 for name, disks in value.items())
525 def _PrepareFileUpload(getents_fn, filename):
526 """Loads a file and prepares it for an upload to nodes.
529 statcb = utils.FileStatHelper()
530 data = _Compress(utils.ReadFile(filename, preread=statcb))
533 if getents_fn is None:
534 getents_fn = runtime.GetEnts
536 getents = getents_fn()
538 return [filename, data, st.st_mode, getents.LookupUid(st.st_uid),
539 getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
542 def _PrepareFinalizeExportDisks(snap_disks):
543 """Encodes disks for finalizing export.
548 for disk in snap_disks:
549 if isinstance(disk, bool):
550 flat_disks.append(disk)
552 flat_disks.append(disk.ToDict())
def _EncodeImportExportIO(ieio_spec):
  """Encodes import/export I/O information.

  The pair is unpacked inside the function instead of in the signature;
  callers still pass one positional two-element sequence.

  @type ieio_spec: tuple
  @param ieio_spec: pair of I/O type and its argument sequence
  @rtype: tuple
  @return: pair of I/O type and arguments; for L{constants.IEIO_RAW_DISK}
    and L{constants.IEIO_SCRIPT} the first argument is replaced by its
    C{ToDict()} result, for every other type the arguments are returned
    unchanged

  """
  (ieio, ieioargs) = ieio_spec

  if ieio == constants.IEIO_RAW_DISK:
    assert len(ieioargs) == 1
    return (ieio, (ieioargs[0].ToDict(), ))

  if ieio == constants.IEIO_SCRIPT:
    assert len(ieioargs) == 2
    return (ieio, (ieioargs[0].ToDict(), ieioargs[1]))

  return (ieio, ieioargs)
572 def _EncodeBlockdevRename(value):
573 """Encodes information for renaming block devices.
576 return [(d.ToDict(), uid) for d, uid in value]
def _AnnotateDParamsDRBD(disk, ld_params):
  """Annotates just DRBD disks layouts.

  The parameter triple is unpacked inside the function instead of in the
  signature; callers still pass it as the second positional argument.

  @param disk: disk object whose C{dev_type} is L{constants.LD_DRBD8} and
    which has exactly two children; modified in place
  @type ld_params: tuple
  @param ld_params: triple of parameters for the DRBD device itself, for
    its first child (data) and for its second child (metadata)
  @return: the C{disk} object passed in

  """
  (drbd_params, data_params, meta_params) = ld_params

  assert disk.dev_type == constants.LD_DRBD8

  disk.params = objects.FillDict(drbd_params, disk.params)
  (dev_data, dev_meta) = disk.children
  dev_data.params = objects.FillDict(data_params, dev_data.params)
  dev_meta.params = objects.FillDict(meta_params, dev_meta.params)

  return disk
def _AnnotateDParamsGeneric(disk, ld_params):
  """Generic disk parameter annotation routine.

  The one-element parameter tuple is unpacked inside the function instead
  of in the signature; callers still pass it as the second positional
  argument.

  @param disk: disk object whose C{dev_type} is not L{constants.LD_DRBD8};
    modified in place
  @type ld_params: tuple
  @param ld_params: tuple with exactly one element, the parameters used in
    the L{objects.FillDict} call
  @return: the C{disk} object passed in

  """
  (params, ) = ld_params

  assert disk.dev_type != constants.LD_DRBD8

  disk.params = objects.FillDict(params, disk.params)

  return disk
def AnnotateDiskParams(template, disks, disk_params):
  """Annotates the disk objects with the disk parameters.

  Every disk is copied via its C{Copy} method before being annotated, so
  the annotation functions never receive the objects passed in.

  @param template: The disk template used
  @param disks: The list of disk objects to annotate
  @param disk_params: The disk parameters for annotation
  @return: A list of annotated disk objects

  """
  def _Unchanged(disk, _):
    """Annotation used for diskless templates: returns the disk as is."""
    return disk

  ld_params = objects.Disk.ComputeLDParams(template, disk_params)

  if template == constants.DT_DRBD8:
    annotate = _AnnotateDParamsDRBD
  elif template == constants.DT_DISKLESS:
    annotate = _Unchanged
  else:
    annotate = _AnnotateDParamsGeneric

  return [annotate(disk.Copy(), ld_params) for disk in disks]
631 rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
632 rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
633 rpc_defs.ED_NODE_TO_DISK_DICT: _EncodeNodeToDiskDict,
634 rpc_defs.ED_COMPRESS: _Compress,
635 rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
636 rpc_defs.ED_IMPEXP_IO: _EncodeImportExportIO,
637 rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
641 class RpcRunner(_RpcClientBase,
642 _generated_rpc.RpcClientDefault,
643 _generated_rpc.RpcClientBootstrap,
644 _generated_rpc.RpcClientDnsOnly,
645 _generated_rpc.RpcClientConfig):
649 def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
650 """Initializes the RPC runner.
652 @type cfg: L{config.ConfigWriter}
653 @param cfg: Configuration
654 @type lock_monitor_cb: callable
655 @param lock_monitor_cb: Lock monitor callback
660 encoders = _ENCODERS.copy()
663 # Encoders requiring configuration object
664 rpc_defs.ED_INST_DICT: self._InstDict,
665 rpc_defs.ED_INST_DICT_HVP_BEP_DP: self._InstDictHvpBepDp,
666 rpc_defs.ED_INST_DICT_OSP_DP: self._InstDictOspDp,
667 rpc_defs.ED_NIC_DICT: self._NicDict,
669 # Encoders annotating disk parameters
670 rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
671 rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,
673 # Encoders with special requirements
674 rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
677 # Resolver using configuration
678 resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
681 # Pylint doesn't recognize multiple inheritance properly, see
682 # <http://www.logilab.org/ticket/36586> and
683 # <http://www.logilab.org/ticket/35642>
684 # pylint: disable=W0233
685 _RpcClientBase.__init__(self, resolver, encoders.get,
686 lock_monitor_cb=lock_monitor_cb,
687 _req_process_fn=_req_process_fn)
688 _generated_rpc.RpcClientConfig.__init__(self)
689 _generated_rpc.RpcClientBootstrap.__init__(self)
690 _generated_rpc.RpcClientDnsOnly.__init__(self)
691 _generated_rpc.RpcClientDefault.__init__(self)
693 def _NicDict(self, nic):
694 """Convert the given nic to a dict and encapsulate netinfo
697 n = copy.deepcopy(nic)
699 net_uuid = self._cfg.LookupNetwork(n.network)
701 nobj = self._cfg.GetNetwork(net_uuid)
702 n.netinfo = objects.Network.ToDict(nobj)
705 def _InstDict(self, instance, hvp=None, bep=None, osp=None):
706 """Convert the given instance to a dict.
708 This is done via the instance's ToDict() method and additionally
709 we fill the hvparams with the cluster defaults.
711 @type instance: L{objects.Instance}
712 @param instance: an Instance object
713 @type hvp: dict or None
714 @param hvp: a dictionary with overridden hypervisor parameters
715 @type bep: dict or None
716 @param bep: a dictionary with overridden backend parameters
717 @type osp: dict or None
718 @param osp: a dictionary with overridden os parameters
720 @return: the instance dict, with the hvparams filled with the
724 idict = instance.ToDict()
725 cluster = self._cfg.GetClusterInfo()
726 idict["hvparams"] = cluster.FillHV(instance)
728 idict["hvparams"].update(hvp)
729 idict["beparams"] = cluster.FillBE(instance)
731 idict["beparams"].update(bep)
732 idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
734 idict["osparams"].update(osp)
735 for nic in idict["nics"]:
736 nic["nicparams"] = objects.FillDict(
737 cluster.nicparams[constants.PP_DEFAULT],
739 network = nic.get("network", None)
741 net_uuid = self._cfg.LookupNetwork(network)
743 nobj = self._cfg.GetNetwork(net_uuid)
744 nic["netinfo"] = objects.Network.ToDict(nobj)
745 idict["disks"] = self._DisksDictDP((instance.disks, instance))
748 def _InstDictHvpBepDp(self, (instance, hvp, bep)):
749 """Wrapper for L{_InstDict}.
752 return self._InstDict(instance, hvp=hvp, bep=bep)
754 def _InstDictOspDp(self, (instance, osparams)):
755 """Wrapper for L{_InstDict}.
758 return self._InstDict(instance, osp=osparams)
760 def _DisksDictDP(self, (disks, instance)):
761 """Wrapper for L{AnnotateDiskParams}.
764 diskparams = self._cfg.GetInstanceDiskParams(instance)
765 return [disk.ToDict()
766 for disk in AnnotateDiskParams(instance.disk_template,
769 def _SingleDiskDictDP(self, (disk, instance)):
770 """Wrapper for L{AnnotateDiskParams}.
773 (anno_disk,) = self._DisksDictDP(([disk], instance))
class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
  """RPC wrappers for job queue.

  """
  def __init__(self, context, address_list):
    """Initializes this class.

    @param context: object whose C{glm.AddToLockMonitor} attribute is used
      as lock monitor callback
    @param address_list: addresses for a L{_StaticResolver}, or None to
      resolve through L{_SsconfResolver} with C{ssconf_ips} set to True

    """
    if address_list is not None:
      # Caller provided an address list
      resolver = _StaticResolver(address_list)
    else:
      resolver = compat.partial(_SsconfResolver, True)

    lock_monitor_cb = context.glm.AddToLockMonitor

    _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
                            lock_monitor_cb=lock_monitor_cb)
    _generated_rpc.RpcClientJobQueue.__init__(self)
796 class BootstrapRunner(_RpcClientBase,
797 _generated_rpc.RpcClientBootstrap,
798 _generated_rpc.RpcClientDnsOnly):
799 """RPC wrappers for bootstrapping.
803 """Initializes this class.
806 # Pylint doesn't recognize multiple inheritance properly, see
807 # <http://www.logilab.org/ticket/36586> and
808 # <http://www.logilab.org/ticket/35642>
809 # pylint: disable=W0233
810 _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, True),
812 _generated_rpc.RpcClientBootstrap.__init__(self)
813 _generated_rpc.RpcClientDnsOnly.__init__(self)
816 class DnsOnlyRunner(_RpcClientBase, _generated_rpc.RpcClientDnsOnly):
817 """RPC wrappers for calls using only DNS.
821 """Initialize this class.
824 _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, False),
826 _generated_rpc.RpcClientDnsOnly.__init__(self)
829 class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
830 """RPC wrappers for L{config}.
833 def __init__(self, context, address_list, _req_process_fn=None,
835 """Initializes this class.
839 lock_monitor_cb = context.glm.AddToLockMonitor
841 lock_monitor_cb = None
843 if address_list is None:
844 resolver = compat.partial(_SsconfResolver, True)
846 # Caller provided an address list
847 resolver = _StaticResolver(address_list)
849 encoders = _ENCODERS.copy()
852 rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
855 _RpcClientBase.__init__(self, resolver, encoders.get,
856 lock_monitor_cb=lock_monitor_cb,
857 _req_process_fn=_req_process_fn)
858 _generated_rpc.RpcClientConfig.__init__(self)