Merge branch 'stable-2.9' into stable-2.10
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Inter-node RPC library.
23
24 """
25
26 # pylint: disable=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import logging
34 import zlib
35 import base64
36 import pycurl
37 import threading
38 import copy
39
40 from ganeti import utils
41 from ganeti import objects
42 from ganeti import http
43 from ganeti import serializer
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import netutils
47 from ganeti import ssconf
48 from ganeti import runtime
49 from ganeti import compat
50 from ganeti import rpc_defs
51 from ganeti import pathutils
52 from ganeti import vcluster
53
54 # Special module generated at build time
55 from ganeti import _generated_rpc
56
57 # pylint has a bug here, doesn't see this import
58 import ganeti.http.client  # pylint: disable=W0611
59
60
61 _RPC_CLIENT_HEADERS = [
62   "Content-type: %s" % http.HTTP_APP_JSON,
63   "Expect:",
64   ]
65
66 #: Special value to describe an offline host
67 _OFFLINE = object()
68
69
70 def Init():
71   """Initializes the module-global HTTP client manager.
72
73   Must be called before using any RPC function and while exactly one thread is
74   running.
75
76   """
77   # curl_global_init(3) and curl_global_cleanup(3) must be called with only
78   # one thread running. This check is just a safety measure -- it doesn't
79   # cover all cases.
80   assert threading.activeCount() == 1, \
81          "Found more than one active thread when initializing pycURL"
82
83   logging.info("Using PycURL %s", pycurl.version)
84
85   pycurl.global_init(pycurl.GLOBAL_ALL)
86
87
88 def Shutdown():
89   """Stops the module-global HTTP client manager.
90
91   Must be called before quitting the program and while exactly one thread is
92   running.
93
94   """
95   pycurl.global_cleanup()
96
97
98 def _ConfigRpcCurl(curl):
99   noded_cert = str(pathutils.NODED_CERT_FILE)
100
101   curl.setopt(pycurl.FOLLOWLOCATION, False)
102   curl.setopt(pycurl.CAINFO, noded_cert)
103   curl.setopt(pycurl.SSL_VERIFYHOST, 0)
104   curl.setopt(pycurl.SSL_VERIFYPEER, True)
105   curl.setopt(pycurl.SSLCERTTYPE, "PEM")
106   curl.setopt(pycurl.SSLCERT, noded_cert)
107   curl.setopt(pycurl.SSLKEYTYPE, "PEM")
108   curl.setopt(pycurl.SSLKEY, noded_cert)
109   curl.setopt(pycurl.CONNECTTIMEOUT, constants.RPC_CONNECT_TIMEOUT)
110
111
112 def RunWithRPC(fn):
113   """RPC-wrapper decorator.
114
115   When applied to a function, it runs it with the RPC system
116   initialized, and it shutsdown the system afterwards. This means the
117   function must be called without RPC being initialized.
118
119   """
120   def wrapper(*args, **kwargs):
121     Init()
122     try:
123       return fn(*args, **kwargs)
124     finally:
125       Shutdown()
126   return wrapper
127
128
129 def _Compress(_, data):
130   """Compresses a string for transport over RPC.
131
132   Small amounts of data are not compressed.
133
134   @type data: str
135   @param data: Data
136   @rtype: tuple
137   @return: Encoded data to send
138
139   """
140   # Small amounts of data are not compressed
141   if len(data) < 512:
142     return (constants.RPC_ENCODING_NONE, data)
143
144   # Compress with zlib and encode in base64
145   return (constants.RPC_ENCODING_ZLIB_BASE64,
146           base64.b64encode(zlib.compress(data, 3)))
147
148
149 class RpcResult(object):
150   """RPC Result class.
151
152   This class holds an RPC result. It is needed since in multi-node
153   calls we can't raise an exception just because one out of many
154   failed, and therefore we use this class to encapsulate the result.
155
156   @ivar data: the data payload, for successful results, or None
157   @ivar call: the name of the RPC call
158   @ivar node: the name of the node to which we made the call
159   @ivar offline: whether the operation failed because the node was
160       offline, as opposed to actual failure; offline=True will always
161       imply failed=True, in order to allow simpler checking if
162       the user doesn't care about the exact failure mode
163   @ivar fail_msg: the error message if the call failed
164
165   """
166   def __init__(self, data=None, failed=False, offline=False,
167                call=None, node=None):
168     self.offline = offline
169     self.call = call
170     self.node = node
171
172     if offline:
173       self.fail_msg = "Node is marked offline"
174       self.data = self.payload = None
175     elif failed:
176       self.fail_msg = self._EnsureErr(data)
177       self.data = self.payload = None
178     else:
179       self.data = data
180       if not isinstance(self.data, (tuple, list)):
181         self.fail_msg = ("RPC layer error: invalid result type (%s)" %
182                          type(self.data))
183         self.payload = None
184       elif len(data) != 2:
185         self.fail_msg = ("RPC layer error: invalid result length (%d), "
186                          "expected 2" % len(self.data))
187         self.payload = None
188       elif not self.data[0]:
189         self.fail_msg = self._EnsureErr(self.data[1])
190         self.payload = None
191       else:
192         # finally success
193         self.fail_msg = None
194         self.payload = data[1]
195
196     for attr_name in ["call", "data", "fail_msg",
197                       "node", "offline", "payload"]:
198       assert hasattr(self, attr_name), "Missing attribute %s" % attr_name
199
200   @staticmethod
201   def _EnsureErr(val):
202     """Helper to ensure we return a 'True' value for error."""
203     if val:
204       return val
205     else:
206       return "No error information"
207
208   def Raise(self, msg, prereq=False, ecode=None):
209     """If the result has failed, raise an OpExecError.
210
211     This is used so that LU code doesn't have to check for each
212     result, but instead can call this function.
213
214     """
215     if not self.fail_msg:
216       return
217
218     if not msg: # one could pass None for default message
219       msg = ("Call '%s' to node '%s' has failed: %s" %
220              (self.call, self.node, self.fail_msg))
221     else:
222       msg = "%s: %s" % (msg, self.fail_msg)
223     if prereq:
224       ec = errors.OpPrereqError
225     else:
226       ec = errors.OpExecError
227     if ecode is not None:
228       args = (msg, ecode)
229     else:
230       args = (msg, )
231     raise ec(*args) # pylint: disable=W0142
232
233   def Warn(self, msg, feedback_fn):
234     """If the result has failed, call the feedback_fn.
235
236     This is used to in cases were LU wants to warn the
237     user about a failure, but continue anyway.
238
239     """
240     if not self.fail_msg:
241       return
242
243     msg = "%s: %s" % (msg, self.fail_msg)
244     feedback_fn(msg)
245
246
247 def _SsconfResolver(ssconf_ips, node_list, _,
248                     ssc=ssconf.SimpleStore,
249                     nslookup_fn=netutils.Hostname.GetIP):
250   """Return addresses for given node names.
251
252   @type ssconf_ips: bool
253   @param ssconf_ips: Use the ssconf IPs
254   @type node_list: list
255   @param node_list: List of node names
256   @type ssc: class
257   @param ssc: SimpleStore class that is used to obtain node->ip mappings
258   @type nslookup_fn: callable
259   @param nslookup_fn: function use to do NS lookup
260   @rtype: list of tuple; (string, string)
261   @return: List of tuples containing node name and IP address
262
263   """
264   ss = ssc()
265   family = ss.GetPrimaryIPFamily()
266
267   if ssconf_ips:
268     iplist = ss.GetNodePrimaryIPList()
269     ipmap = dict(entry.split() for entry in iplist)
270   else:
271     ipmap = {}
272
273   result = []
274   for node in node_list:
275     ip = ipmap.get(node)
276     if ip is None:
277       ip = nslookup_fn(node, family=family)
278     result.append((node, ip, node))
279
280   return result
281
282
283 class _StaticResolver:
284   def __init__(self, addresses):
285     """Initializes this class.
286
287     """
288     self._addresses = addresses
289
290   def __call__(self, hosts, _):
291     """Returns static addresses for hosts.
292
293     """
294     assert len(hosts) == len(self._addresses)
295     return zip(hosts, self._addresses, hosts)
296
297
298 def _CheckConfigNode(node_uuid_or_name, node, accept_offline_node):
299   """Checks if a node is online.
300
301   @type node_uuid_or_name: string
302   @param node_uuid_or_name: Node UUID
303   @type node: L{objects.Node} or None
304   @param node: Node object
305
306   """
307   if node is None:
308     # Assume that the passed parameter was actually a node name, so depend on
309     # DNS for name resolution
310     return (node_uuid_or_name, node_uuid_or_name, node_uuid_or_name)
311   else:
312     if node.offline and not accept_offline_node:
313       ip = _OFFLINE
314     else:
315       ip = node.primary_ip
316     return (node.name, ip, node_uuid_or_name)
317
318
319 def _NodeConfigResolver(single_node_fn, all_nodes_fn, node_uuids, opts):
320   """Calculate node addresses using configuration.
321
322   Note that strings in node_uuids are treated as node names if the UUID is not
323   found in the configuration.
324
325   """
326   accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)
327
328   assert accept_offline_node or opts is None, "Unknown option"
329
330   # Special case for single-host lookups
331   if len(node_uuids) == 1:
332     (uuid, ) = node_uuids
333     return [_CheckConfigNode(uuid, single_node_fn(uuid), accept_offline_node)]
334   else:
335     all_nodes = all_nodes_fn()
336     return [_CheckConfigNode(uuid, all_nodes.get(uuid, None),
337                              accept_offline_node)
338             for uuid in node_uuids]
339
340
341 class _RpcProcessor:
342   def __init__(self, resolver, port, lock_monitor_cb=None):
343     """Initializes this class.
344
345     @param resolver: callable accepting a list of node UUIDs or hostnames,
346       returning a list of tuples containing name, IP address and original name
347       of the resolved node. IP address can be the name or the special value
348       L{_OFFLINE} to mark offline machines.
349     @type port: int
350     @param port: TCP port
351     @param lock_monitor_cb: Callable for registering with lock monitor
352
353     """
354     self._resolver = resolver
355     self._port = port
356     self._lock_monitor_cb = lock_monitor_cb
357
358   @staticmethod
359   def _PrepareRequests(hosts, port, procedure, body, read_timeout):
360     """Prepares requests by sorting offline hosts into separate list.
361
362     @type body: dict
363     @param body: a dictionary with per-host body data
364
365     """
366     results = {}
367     requests = {}
368
369     assert isinstance(body, dict)
370     assert len(body) == len(hosts)
371     assert compat.all(isinstance(v, str) for v in body.values())
372     assert frozenset(map(lambda x: x[2], hosts)) == frozenset(body.keys()), \
373         "%s != %s" % (hosts, body.keys())
374
375     for (name, ip, original_name) in hosts:
376       if ip is _OFFLINE:
377         # Node is marked as offline
378         results[original_name] = RpcResult(node=name,
379                                            offline=True,
380                                            call=procedure)
381       else:
382         requests[original_name] = \
383           http.client.HttpClientRequest(str(ip), port,
384                                         http.HTTP_POST, str("/%s" % procedure),
385                                         headers=_RPC_CLIENT_HEADERS,
386                                         post_data=body[original_name],
387                                         read_timeout=read_timeout,
388                                         nicename="%s/%s" % (name, procedure),
389                                         curl_config_fn=_ConfigRpcCurl)
390
391     return (results, requests)
392
393   @staticmethod
394   def _CombineResults(results, requests, procedure):
395     """Combines pre-computed results for offline hosts with actual call results.
396
397     """
398     for name, req in requests.items():
399       if req.success and req.resp_status_code == http.HTTP_OK:
400         host_result = RpcResult(data=serializer.LoadJson(req.resp_body),
401                                 node=name, call=procedure)
402       else:
403         # TODO: Better error reporting
404         if req.error:
405           msg = req.error
406         else:
407           msg = req.resp_body
408
409         logging.error("RPC error in %s on node %s: %s", procedure, name, msg)
410         host_result = RpcResult(data=msg, failed=True, node=name,
411                                 call=procedure)
412
413       results[name] = host_result
414
415     return results
416
417   def __call__(self, nodes, procedure, body, read_timeout, resolver_opts,
418                _req_process_fn=None):
419     """Makes an RPC request to a number of nodes.
420
421     @type nodes: sequence
422     @param nodes: node UUIDs or Hostnames
423     @type procedure: string
424     @param procedure: Request path
425     @type body: dictionary
426     @param body: dictionary with request bodies per host
427     @type read_timeout: int or None
428     @param read_timeout: Read timeout for request
429     @rtype: dictionary
430     @return: a dictionary mapping host names to rpc.RpcResult objects
431
432     """
433     assert read_timeout is not None, \
434       "Missing RPC read timeout for procedure '%s'" % procedure
435
436     if _req_process_fn is None:
437       _req_process_fn = http.client.ProcessRequests
438
439     (results, requests) = \
440       self._PrepareRequests(self._resolver(nodes, resolver_opts), self._port,
441                             procedure, body, read_timeout)
442
443     _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
444
445     assert not frozenset(results).intersection(requests)
446
447     return self._CombineResults(results, requests, procedure)
448
449
450 class _RpcClientBase:
451   def __init__(self, resolver, encoder_fn, lock_monitor_cb=None,
452                _req_process_fn=None):
453     """Initializes this class.
454
455     """
456     proc = _RpcProcessor(resolver,
457                          netutils.GetDaemonPort(constants.NODED),
458                          lock_monitor_cb=lock_monitor_cb)
459     self._proc = compat.partial(proc, _req_process_fn=_req_process_fn)
460     self._encoder = compat.partial(self._EncodeArg, encoder_fn)
461
462   @staticmethod
463   def _EncodeArg(encoder_fn, node, (argkind, value)):
464     """Encode argument.
465
466     """
467     if argkind is None:
468       return value
469     else:
470       return encoder_fn(argkind)(node, value)
471
472   def _Call(self, cdef, node_list, args):
473     """Entry point for automatically generated RPC wrappers.
474
475     """
476     (procedure, _, resolver_opts, timeout, argdefs,
477      prep_fn, postproc_fn, _) = cdef
478
479     if callable(timeout):
480       read_timeout = timeout(args)
481     else:
482       read_timeout = timeout
483
484     if callable(resolver_opts):
485       req_resolver_opts = resolver_opts(args)
486     else:
487       req_resolver_opts = resolver_opts
488
489     if len(args) != len(argdefs):
490       raise errors.ProgrammerError("Number of passed arguments doesn't match")
491
492     if prep_fn is None:
493       prep_fn = lambda _, args: args
494     assert callable(prep_fn)
495
496     # encode the arguments for each node individually, pass them and the node
497     # name to the prep_fn, and serialise its return value
498     encode_args_fn = lambda node: map(compat.partial(self._encoder, node),
499                                       zip(map(compat.snd, argdefs), args))
500     pnbody = dict((n, serializer.DumpJson(prep_fn(n, encode_args_fn(n))))
501                   for n in node_list)
502
503     result = self._proc(node_list, procedure, pnbody, read_timeout,
504                         req_resolver_opts)
505
506     if postproc_fn:
507       return dict(map(lambda (key, value): (key, postproc_fn(value)),
508                       result.items()))
509     else:
510       return result
511
512
513 def _ObjectToDict(_, value):
514   """Converts an object to a dictionary.
515
516   @note: See L{objects}.
517
518   """
519   return value.ToDict()
520
521
522 def _ObjectListToDict(node, value):
523   """Converts a list of L{objects} to dictionaries.
524
525   """
526   return map(compat.partial(_ObjectToDict, node), value)
527
528
529 def _PrepareFileUpload(getents_fn, node, filename):
530   """Loads a file and prepares it for an upload to nodes.
531
532   """
533   statcb = utils.FileStatHelper()
534   data = _Compress(node, utils.ReadFile(filename, preread=statcb))
535   st = statcb.st
536
537   if getents_fn is None:
538     getents_fn = runtime.GetEnts
539
540   getents = getents_fn()
541
542   virt_filename = vcluster.MakeVirtualPath(filename)
543
544   return [virt_filename, data, st.st_mode, getents.LookupUid(st.st_uid),
545           getents.LookupGid(st.st_gid), st.st_atime, st.st_mtime]
546
547
548 def _PrepareFinalizeExportDisks(_, snap_disks):
549   """Encodes disks for finalizing export.
550
551   """
552   flat_disks = []
553
554   for disk in snap_disks:
555     if isinstance(disk, bool):
556       flat_disks.append(disk)
557     else:
558       flat_disks.append(disk.ToDict())
559
560   return flat_disks
561
562
563 def _EncodeBlockdevRename(_, value):
564   """Encodes information for renaming block devices.
565
566   """
567   return [(d.ToDict(), uid) for d, uid in value]
568
569
570 def _AddSpindlesToLegacyNodeInfo(result, space_info):
571   """Extracts the spindle information from the space info and adds
572   it to the result dictionary.
573
574   @type result: dict of strings
575   @param result: dictionary holding the result of the legacy node info
576   @type space_info: list of dicts of strings
577   @param space_info: list, each row holding space information of one storage
578     unit
579   @rtype: None
580   @return: does not return anything, manipulates the C{result} variable
581
582   """
583   lvm_pv_info = utils.storage.LookupSpaceInfoByStorageType(
584       space_info, constants.ST_LVM_PV)
585   if lvm_pv_info:
586     result["spindles_free"] = lvm_pv_info["storage_free"]
587     result["spindles_total"] = lvm_pv_info["storage_size"]
588   else:
589     result["spindles_free"] = 0
590     result["spindles_total"] = 0
591
592
593 def _AddStorageInfoToLegacyNodeInfoByTemplate(
594     result, space_info, disk_template):
595   """Extracts the storage space information of the disk template from
596   the space info and adds it to the result dictionary.
597
598   @see: C{_AddSpindlesToLegacyNodeInfo} for parameter information.
599
600   """
601   if utils.storage.DiskTemplateSupportsSpaceReporting(disk_template):
602     disk_info = utils.storage.LookupSpaceInfoByDiskTemplate(
603         space_info, disk_template)
604     result["name"] = disk_info["name"]
605     result["storage_free"] = disk_info["storage_free"]
606     result["storage_size"] = disk_info["storage_size"]
607   else:
608     # FIXME: consider displaying '-' in this case
609     result["storage_free"] = 0
610     result["storage_size"] = 0
611
612
613 def MakeLegacyNodeInfo(data, disk_template):
614   """Formats the data returned by L{rpc.RpcRunner.call_node_info}.
615
616   Converts the data into a single dictionary. This is fine for most use cases,
617   but some require information from more than one volume group or hypervisor.
618
619   """
620   (bootid, space_info, (hv_info, )) = data
621
622   ret = utils.JoinDisjointDicts(hv_info, {"bootid": bootid})
623
624   _AddSpindlesToLegacyNodeInfo(ret, space_info)
625   _AddStorageInfoToLegacyNodeInfoByTemplate(ret, space_info, disk_template)
626
627   return ret
628
629
630 def _AnnotateDParamsDRBD(disk, (drbd_params, data_params, meta_params)):
631   """Annotates just DRBD disks layouts.
632
633   """
634   assert disk.dev_type == constants.DT_DRBD8
635
636   disk.params = objects.FillDict(drbd_params, disk.params)
637   (dev_data, dev_meta) = disk.children
638   dev_data.params = objects.FillDict(data_params, dev_data.params)
639   dev_meta.params = objects.FillDict(meta_params, dev_meta.params)
640
641   return disk
642
643
644 def _AnnotateDParamsGeneric(disk, (params, )):
645   """Generic disk parameter annotation routine.
646
647   """
648   assert disk.dev_type != constants.DT_DRBD8
649
650   disk.params = objects.FillDict(params, disk.params)
651
652   return disk
653
654
655 def AnnotateDiskParams(disks, disk_params):
656   """Annotates the disk objects with the disk parameters.
657
658   @param disks: The list of disks objects to annotate
659   @param disk_params: The disk parameters for annotation
660   @returns: A list of disk objects annotated
661
662   """
663   def AnnotateDisk(disk):
664     if disk.dev_type == constants.DT_DISKLESS:
665       return disk
666
667     ld_params = objects.Disk.ComputeLDParams(disk.dev_type, disk_params)
668
669     if disk.dev_type == constants.DT_DRBD8:
670       return _AnnotateDParamsDRBD(disk, ld_params)
671     else:
672       return _AnnotateDParamsGeneric(disk, ld_params)
673
674   return [AnnotateDisk(disk.Copy()) for disk in disks]
675
676
677 def _GetExclusiveStorageFlag(cfg, node_uuid):
678   ni = cfg.GetNodeInfo(node_uuid)
679   if ni is None:
680     raise errors.OpPrereqError("Invalid node name %s" % node_uuid,
681                                errors.ECODE_NOENT)
682   return cfg.GetNdParams(ni)[constants.ND_EXCLUSIVE_STORAGE]
683
684
685 def _AddExclusiveStorageFlagToLvmStorageUnits(storage_units, es_flag):
686   """Adds the exclusive storage flag to lvm units.
687
688   This function creates a copy of the storage_units lists, with the
689   es_flag being added to all lvm storage units.
690
691   @type storage_units: list of pairs (string, string)
692   @param storage_units: list of 'raw' storage units, consisting only of
693     (storage_type, storage_key)
694   @type es_flag: boolean
695   @param es_flag: exclusive storage flag
696   @rtype: list of tuples (string, string, list)
697   @return: list of storage units (storage_type, storage_key, params) with
698     the params containing the es_flag for lvm-vg storage units
699
700   """
701   result = []
702   for (storage_type, storage_key) in storage_units:
703     if storage_type in [constants.ST_LVM_VG]:
704       result.append((storage_type, storage_key, [es_flag]))
705       if es_flag:
706         result.append((constants.ST_LVM_PV, storage_key, [es_flag]))
707     else:
708       result.append((storage_type, storage_key, []))
709   return result
710
711
712 def GetExclusiveStorageForNodes(cfg, node_uuids):
713   """Return the exclusive storage flag for all the given nodes.
714
715   @type cfg: L{config.ConfigWriter}
716   @param cfg: cluster configuration
717   @type node_uuids: list or tuple
718   @param node_uuids: node UUIDs for which to read the flag
719   @rtype: dict
720   @return: mapping from node uuids to exclusive storage flags
721   @raise errors.OpPrereqError: if any given node name has no corresponding
722   node
723
724   """
725   getflag = lambda n: _GetExclusiveStorageFlag(cfg, n)
726   flags = map(getflag, node_uuids)
727   return dict(zip(node_uuids, flags))
728
729
730 def PrepareStorageUnitsForNodes(cfg, storage_units, node_uuids):
731   """Return the lvm storage unit for all the given nodes.
732
733   Main purpose of this function is to map the exclusive storage flag, which
734   can be different for each node, to the default LVM storage unit.
735
736   @type cfg: L{config.ConfigWriter}
737   @param cfg: cluster configuration
738   @type storage_units: list of pairs (string, string)
739   @param storage_units: list of 'raw' storage units, e.g. pairs of
740     (storage_type, storage_key)
741   @type node_uuids: list or tuple
742   @param node_uuids: node UUIDs for which to read the flag
743   @rtype: dict
744   @return: mapping from node uuids to a list of storage units which include
745     the exclusive storage flag for lvm storage
746   @raise errors.OpPrereqError: if any given node name has no corresponding
747   node
748
749   """
750   getunit = lambda n: _AddExclusiveStorageFlagToLvmStorageUnits(
751       storage_units, _GetExclusiveStorageFlag(cfg, n))
752   flags = map(getunit, node_uuids)
753   return dict(zip(node_uuids, flags))
754
755
756 #: Generic encoders
757 _ENCODERS = {
758   rpc_defs.ED_OBJECT_DICT: _ObjectToDict,
759   rpc_defs.ED_OBJECT_DICT_LIST: _ObjectListToDict,
760   rpc_defs.ED_COMPRESS: _Compress,
761   rpc_defs.ED_FINALIZE_EXPORT_DISKS: _PrepareFinalizeExportDisks,
762   rpc_defs.ED_BLOCKDEV_RENAME: _EncodeBlockdevRename,
763   }
764
765
766 class RpcRunner(_RpcClientBase,
767                 _generated_rpc.RpcClientDefault,
768                 _generated_rpc.RpcClientBootstrap,
769                 _generated_rpc.RpcClientDnsOnly,
770                 _generated_rpc.RpcClientConfig):
771   """RPC runner class.
772
773   """
774   def __init__(self, cfg, lock_monitor_cb, _req_process_fn=None, _getents=None):
775     """Initialized the RPC runner.
776
777     @type cfg: L{config.ConfigWriter}
778     @param cfg: Configuration
779     @type lock_monitor_cb: callable
780     @param lock_monitor_cb: Lock monitor callback
781
782     """
783     self._cfg = cfg
784
785     encoders = _ENCODERS.copy()
786
787     encoders.update({
788       # Encoders requiring configuration object
789       rpc_defs.ED_INST_DICT: self._InstDict,
790       rpc_defs.ED_INST_DICT_HVP_BEP_DP: self._InstDictHvpBepDp,
791       rpc_defs.ED_INST_DICT_OSP_DP: self._InstDictOspDp,
792       rpc_defs.ED_NIC_DICT: self._NicDict,
793       rpc_defs.ED_DEVICE_DICT: self._DeviceDict,
794
795       # Encoders annotating disk parameters
796       rpc_defs.ED_DISKS_DICT_DP: self._DisksDictDP,
797       rpc_defs.ED_MULTI_DISKS_DICT_DP: self._MultiDiskDictDP,
798       rpc_defs.ED_SINGLE_DISK_DICT_DP: self._SingleDiskDictDP,
799       rpc_defs.ED_NODE_TO_DISK_DICT_DP: self._EncodeNodeToDiskDictDP,
800
801       # Encoders with special requirements
802       rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
803
804       rpc_defs.ED_IMPEXP_IO: self._EncodeImportExportIO,
805       })
806
807     # Resolver using configuration
808     resolver = compat.partial(_NodeConfigResolver, cfg.GetNodeInfo,
809                               cfg.GetAllNodesInfo)
810
811     # Pylint doesn't recognize multiple inheritance properly, see
812     # <http://www.logilab.org/ticket/36586> and
813     # <http://www.logilab.org/ticket/35642>
814     # pylint: disable=W0233
815     _RpcClientBase.__init__(self, resolver, encoders.get,
816                             lock_monitor_cb=lock_monitor_cb,
817                             _req_process_fn=_req_process_fn)
818     _generated_rpc.RpcClientConfig.__init__(self)
819     _generated_rpc.RpcClientBootstrap.__init__(self)
820     _generated_rpc.RpcClientDnsOnly.__init__(self)
821     _generated_rpc.RpcClientDefault.__init__(self)
822
823   def _NicDict(self, _, nic):
824     """Convert the given nic to a dict and encapsulate netinfo
825
826     """
827     n = copy.deepcopy(nic)
828     if n.network:
829       net_uuid = self._cfg.LookupNetwork(n.network)
830       if net_uuid:
831         nobj = self._cfg.GetNetwork(net_uuid)
832         n.netinfo = objects.Network.ToDict(nobj)
833     return n.ToDict()
834
835   def _DeviceDict(self, _, (device, instance)):
836     if isinstance(device, objects.NIC):
837       return self._NicDict(None, device)
838     elif isinstance(device, objects.Disk):
839       return self._SingleDiskDictDP(None, (device, instance))
840
841   def _InstDict(self, node, instance, hvp=None, bep=None, osp=None):
842     """Convert the given instance to a dict.
843
844     This is done via the instance's ToDict() method and additionally
845     we fill the hvparams with the cluster defaults.
846
847     @type instance: L{objects.Instance}
848     @param instance: an Instance object
849     @type hvp: dict or None
850     @param hvp: a dictionary with overridden hypervisor parameters
851     @type bep: dict or None
852     @param bep: a dictionary with overridden backend parameters
853     @type osp: dict or None
854     @param osp: a dictionary with overridden os parameters
855     @rtype: dict
856     @return: the instance dict, with the hvparams filled with the
857         cluster defaults
858
859     """
860     idict = instance.ToDict()
861     cluster = self._cfg.GetClusterInfo()
862     idict["hvparams"] = cluster.FillHV(instance)
863     if hvp is not None:
864       idict["hvparams"].update(hvp)
865     idict["beparams"] = cluster.FillBE(instance)
866     if bep is not None:
867       idict["beparams"].update(bep)
868     idict["osparams"] = cluster.SimpleFillOS(instance.os, instance.osparams)
869     if osp is not None:
870       idict["osparams"].update(osp)
871     idict["disks"] = self._DisksDictDP(node, (instance.disks, instance))
872     for nic in idict["nics"]:
873       nic["nicparams"] = objects.FillDict(
874         cluster.nicparams[constants.PP_DEFAULT],
875         nic["nicparams"])
876       network = nic.get("network", None)
877       if network:
878         net_uuid = self._cfg.LookupNetwork(network)
879         if net_uuid:
880           nobj = self._cfg.GetNetwork(net_uuid)
881           nic["netinfo"] = objects.Network.ToDict(nobj)
882     return idict
883
884   def _InstDictHvpBepDp(self, node, (instance, hvp, bep)):
885     """Wrapper for L{_InstDict}.
886
887     """
888     return self._InstDict(node, instance, hvp=hvp, bep=bep)
889
890   def _InstDictOspDp(self, node, (instance, osparams)):
891     """Wrapper for L{_InstDict}.
892
893     """
894     return self._InstDict(node, instance, osp=osparams)
895
896   def _DisksDictDP(self, node, (disks, instance)):
897     """Wrapper for L{AnnotateDiskParams}.
898
899     """
900     diskparams = self._cfg.GetInstanceDiskParams(instance)
901     ret = []
902     for disk in AnnotateDiskParams(disks, diskparams):
903       disk_node_uuids = disk.GetNodes(instance.primary_node)
904       node_ips = dict((uuid, node.secondary_ip) for (uuid, node)
905                       in self._cfg.GetMultiNodeInfo(disk_node_uuids))
906
907       disk.UpdateDynamicDiskParams(node, node_ips)
908
909       ret.append(disk.ToDict(include_dynamic_params=True))
910
911     return ret
912
913   def _MultiDiskDictDP(self, node, disks_insts):
914     """Wrapper for L{AnnotateDiskParams}.
915
916     Supports a list of (disk, instance) tuples.
917     """
918     return [disk for disk_inst in disks_insts
919             for disk in self._DisksDictDP(node, disk_inst)]
920
921   def _SingleDiskDictDP(self, node, (disk, instance)):
922     """Wrapper for L{AnnotateDiskParams}.
923
924     """
925     (anno_disk,) = self._DisksDictDP(node, ([disk], instance))
926     return anno_disk
927
928   def _EncodeNodeToDiskDictDP(self, node, value):
929     """Encode dict of node name -> list of (disk, instance) tuples as values.
930
931     """
932     return dict((name, [self._SingleDiskDictDP(node, disk) for disk in disks])
933                 for name, disks in value.items())
934
935   def _EncodeImportExportIO(self, node, (ieio, ieioargs)):
936     """Encodes import/export I/O information.
937
938     """
939     if ieio == constants.IEIO_RAW_DISK:
940       assert len(ieioargs) == 1
941       return (ieio, (self._SingleDiskDictDP(node, ieioargs[0]), ))
942
943     if ieio == constants.IEIO_SCRIPT:
944       assert len(ieioargs) == 2
945       return (ieio, (self._SingleDiskDictDP(node, ieioargs[0]), ieioargs[1]))
946
947     return (ieio, ieioargs)
948
949
950 class JobQueueRunner(_RpcClientBase, _generated_rpc.RpcClientJobQueue):
951   """RPC wrappers for job queue.
952
953   """
954   def __init__(self, context, address_list):
955     """Initializes this class.
956
957     """
958     if address_list is None:
959       resolver = compat.partial(_SsconfResolver, True)
960     else:
961       # Caller provided an address list
962       resolver = _StaticResolver(address_list)
963
964     _RpcClientBase.__init__(self, resolver, _ENCODERS.get,
965                             lock_monitor_cb=context.glm.AddToLockMonitor)
966     _generated_rpc.RpcClientJobQueue.__init__(self)
967
968
969 class BootstrapRunner(_RpcClientBase,
970                       _generated_rpc.RpcClientBootstrap,
971                       _generated_rpc.RpcClientDnsOnly):
972   """RPC wrappers for bootstrapping.
973
974   """
975   def __init__(self):
976     """Initializes this class.
977
978     """
979     # Pylint doesn't recognize multiple inheritance properly, see
980     # <http://www.logilab.org/ticket/36586> and
981     # <http://www.logilab.org/ticket/35642>
982     # pylint: disable=W0233
983     _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, True),
984                             _ENCODERS.get)
985     _generated_rpc.RpcClientBootstrap.__init__(self)
986     _generated_rpc.RpcClientDnsOnly.__init__(self)
987
988
989 class DnsOnlyRunner(_RpcClientBase, _generated_rpc.RpcClientDnsOnly):
990   """RPC wrappers for calls using only DNS.
991
992   """
993   def __init__(self):
994     """Initialize this class.
995
996     """
997     _RpcClientBase.__init__(self, compat.partial(_SsconfResolver, False),
998                             _ENCODERS.get)
999     _generated_rpc.RpcClientDnsOnly.__init__(self)
1000
1001
1002 class ConfigRunner(_RpcClientBase, _generated_rpc.RpcClientConfig):
1003   """RPC wrappers for L{config}.
1004
1005   """
1006   def __init__(self, context, address_list, _req_process_fn=None,
1007                _getents=None):
1008     """Initializes this class.
1009
1010     """
1011     if context:
1012       lock_monitor_cb = context.glm.AddToLockMonitor
1013     else:
1014       lock_monitor_cb = None
1015
1016     if address_list is None:
1017       resolver = compat.partial(_SsconfResolver, True)
1018     else:
1019       # Caller provided an address list
1020       resolver = _StaticResolver(address_list)
1021
1022     encoders = _ENCODERS.copy()
1023
1024     encoders.update({
1025       rpc_defs.ED_FILE_DETAILS: compat.partial(_PrepareFileUpload, _getents),
1026       })
1027
1028     _RpcClientBase.__init__(self, resolver, encoders.get,
1029                             lock_monitor_cb=lock_monitor_cb,
1030                             _req_process_fn=_req_process_fn)
1031     _generated_rpc.RpcClientConfig.__init__(self)