91e6864b2d4596388a4a9cb3e178c5f70e509a66
[ganeti-local] / lib / cmdlib / __init__.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable=W0201,C0302
25
26 # W0201 since most LU attributes are defined in CheckPrereq or similar
27 # functions
28
29 # C0302: since we have waaaay too many lines in this module
30
31 import time
32 import logging
33
34 from ganeti import utils
35 from ganeti import errors
36 from ganeti import locking
37 from ganeti import constants
38 from ganeti import compat
39 from ganeti import query
40 from ganeti import qlang
41
42 from ganeti.cmdlib.base import ResultWithJobs, LogicalUnit, NoHooksLU, \
43   Tasklet, _QueryBase
44 from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_ONLINE, \
45   INSTANCE_NOT_RUNNING, CAN_CHANGE_INSTANCE_OFFLINE, \
46   _ExpandInstanceName, _ExpandItemName, \
47   _ExpandNodeName, _ShareAll, _CheckNodeGroupInstances, _GetWantedNodes, \
48   _GetWantedInstances, _RunPostHook, _RedistributeAncillaryFiles, \
49   _MergeAndVerifyHvState, _MergeAndVerifyDiskState, _GetUpdatedIPolicy, \
50   _ComputeNewInstanceViolations, _GetUpdatedParams, _CheckOSParams, \
51   _CheckHVParams, _AdjustCandidatePool, _CheckNodePVs, \
52   _ComputeIPolicyInstanceViolation, _AnnotateDiskParams, _SupportsOob, \
53   _ComputeIPolicySpecViolation, _GetDefaultIAllocator, \
54   _CheckInstancesNodeGroups, _LoadNodeEvacResult, _MapInstanceDisksToNodes, \
55   _CheckInstanceNodeGroups, _CheckParamsNotGlobal, \
56   _IsExclusiveStorageEnabledNode, _CheckInstanceState, \
57   _CheckIAllocatorOrNode, _FindFaultyInstanceDisks, _CheckNodeOnline
58 from ganeti.cmdlib.instance_utils import _AssembleInstanceDisks, \
59   _BuildInstanceHookEnvByObject, _GetClusterDomainSecret, \
60   _CheckNodeNotDrained, _RemoveDisks, _ShutdownInstanceDisks, \
61   _StartInstanceDisks, _RemoveInstance
62
63 from ganeti.cmdlib.cluster import LUClusterActivateMasterIp, \
64   LUClusterDeactivateMasterIp, LUClusterConfigQuery, LUClusterDestroy, \
65   LUClusterPostInit, _ClusterQuery, LUClusterQuery, LUClusterRedistConf, \
66   LUClusterRename, LUClusterRepairDiskSizes, LUClusterSetParams, \
67   LUClusterVerify, LUClusterVerifyConfig, LUClusterVerifyGroup, \
68   LUClusterVerifyDisks
69 from ganeti.cmdlib.group import LUGroupAdd, LUGroupAssignNodes, \
70   _GroupQuery, LUGroupQuery, LUGroupSetParams, LUGroupRemove, \
71   LUGroupRename, LUGroupEvacuate, LUGroupVerifyDisks
72 from ganeti.cmdlib.node import LUNodeAdd, LUNodeSetParams, \
73   LUNodePowercycle, LUNodeEvacuate, LUNodeMigrate, LUNodeModifyStorage, \
74   _NodeQuery, LUNodeQuery, LUNodeQueryvols, LUNodeQueryStorage, \
75   LUNodeRemove, LURepairNodeStorage
76 from ganeti.cmdlib.instance import LUInstanceCreate, LUInstanceRename, \
77   LUInstanceRemove, LUInstanceMove, _InstanceQuery, LUInstanceQuery, \
78   LUInstanceQueryData, LUInstanceRecreateDisks, LUInstanceGrowDisk, \
79   LUInstanceReplaceDisks, LUInstanceActivateDisks, \
80   LUInstanceDeactivateDisks, LUInstanceStartup, LUInstanceShutdown, \
81   LUInstanceReinstall, LUInstanceReboot, LUInstanceConsole, \
82   LUInstanceFailover, LUInstanceMigrate, LUInstanceMultiAlloc, \
83   LUInstanceSetParams, LUInstanceChangeGroup
84 from ganeti.cmdlib.backup import _ExportQuery, LUBackupQuery, \
85   LUBackupPrepare, LUBackupExport, LUBackupRemove
86 from ganeti.cmdlib.tags import LUTagsGet, LUTagsSearch, LUTagsSet, LUTagsDel
87 from ganeti.cmdlib.network import LUNetworkAdd, LUNetworkRemove, \
88   LUNetworkSetParams, _NetworkQuery, LUNetworkQuery, LUNetworkConnect, \
89   LUNetworkDisconnect
90 from ganeti.cmdlib.test import LUTestDelay, LUTestJqueue, LUTestAllocator
91
92
93 class LUOobCommand(NoHooksLU):
94   """Logical unit for OOB handling.
95
96   """
97   REQ_BGL = False
98   _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)
99
100   def ExpandNames(self):
101     """Gather locks we need.
102
103     """
104     if self.op.node_names:
105       self.op.node_names = _GetWantedNodes(self, self.op.node_names)
106       lock_names = self.op.node_names
107     else:
108       lock_names = locking.ALL_SET
109
110     self.needed_locks = {
111       locking.LEVEL_NODE: lock_names,
112       }
113
114     self.share_locks[locking.LEVEL_NODE_ALLOC] = 1
115
116     if not self.op.node_names:
117       # Acquire node allocation lock only if all nodes are affected
118       self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
119
120   def CheckPrereq(self):
121     """Check prerequisites.
122
123     This checks:
124      - the node exists in the configuration
125      - OOB is supported
126
127     Any errors are signaled by raising errors.OpPrereqError.
128
129     """
130     self.nodes = []
131     self.master_node = self.cfg.GetMasterNode()
132
133     assert self.op.power_delay >= 0.0
134
135     if self.op.node_names:
136       if (self.op.command in self._SKIP_MASTER and
137           self.master_node in self.op.node_names):
138         master_node_obj = self.cfg.GetNodeInfo(self.master_node)
139         master_oob_handler = _SupportsOob(self.cfg, master_node_obj)
140
141         if master_oob_handler:
142           additional_text = ("run '%s %s %s' if you want to operate on the"
143                              " master regardless") % (master_oob_handler,
144                                                       self.op.command,
145                                                       self.master_node)
146         else:
147           additional_text = "it does not support out-of-band operations"
148
149         raise errors.OpPrereqError(("Operating on the master node %s is not"
150                                     " allowed for %s; %s") %
151                                    (self.master_node, self.op.command,
152                                     additional_text), errors.ECODE_INVAL)
153     else:
154       self.op.node_names = self.cfg.GetNodeList()
155       if self.op.command in self._SKIP_MASTER:
156         self.op.node_names.remove(self.master_node)
157
158     if self.op.command in self._SKIP_MASTER:
159       assert self.master_node not in self.op.node_names
160
161     for (node_name, node) in self.cfg.GetMultiNodeInfo(self.op.node_names):
162       if node is None:
163         raise errors.OpPrereqError("Node %s not found" % node_name,
164                                    errors.ECODE_NOENT)
165       else:
166         self.nodes.append(node)
167
168       if (not self.op.ignore_status and
169           (self.op.command == constants.OOB_POWER_OFF and not node.offline)):
170         raise errors.OpPrereqError(("Cannot power off node %s because it is"
171                                     " not marked offline") % node_name,
172                                    errors.ECODE_STATE)
173
174   def Exec(self, feedback_fn):
175     """Execute OOB and return result if we expect any.
176
177     """
178     master_node = self.master_node
179     ret = []
180
181     for idx, node in enumerate(utils.NiceSort(self.nodes,
182                                               key=lambda node: node.name)):
183       node_entry = [(constants.RS_NORMAL, node.name)]
184       ret.append(node_entry)
185
186       oob_program = _SupportsOob(self.cfg, node)
187
188       if not oob_program:
189         node_entry.append((constants.RS_UNAVAIL, None))
190         continue
191
192       logging.info("Executing out-of-band command '%s' using '%s' on %s",
193                    self.op.command, oob_program, node.name)
194       result = self.rpc.call_run_oob(master_node, oob_program,
195                                      self.op.command, node.name,
196                                      self.op.timeout)
197
198       if result.fail_msg:
199         self.LogWarning("Out-of-band RPC failed on node '%s': %s",
200                         node.name, result.fail_msg)
201         node_entry.append((constants.RS_NODATA, None))
202       else:
203         try:
204           self._CheckPayload(result)
205         except errors.OpExecError, err:
206           self.LogWarning("Payload returned by node '%s' is not valid: %s",
207                           node.name, err)
208           node_entry.append((constants.RS_NODATA, None))
209         else:
210           if self.op.command == constants.OOB_HEALTH:
211             # For health we should log important events
212             for item, status in result.payload:
213               if status in [constants.OOB_STATUS_WARNING,
214                             constants.OOB_STATUS_CRITICAL]:
215                 self.LogWarning("Item '%s' on node '%s' has status '%s'",
216                                 item, node.name, status)
217
218           if self.op.command == constants.OOB_POWER_ON:
219             node.powered = True
220           elif self.op.command == constants.OOB_POWER_OFF:
221             node.powered = False
222           elif self.op.command == constants.OOB_POWER_STATUS:
223             powered = result.payload[constants.OOB_POWER_STATUS_POWERED]
224             if powered != node.powered:
225               logging.warning(("Recorded power state (%s) of node '%s' does not"
226                                " match actual power state (%s)"), node.powered,
227                               node.name, powered)
228
229           # For configuration changing commands we should update the node
230           if self.op.command in (constants.OOB_POWER_ON,
231                                  constants.OOB_POWER_OFF):
232             self.cfg.Update(node, feedback_fn)
233
234           node_entry.append((constants.RS_NORMAL, result.payload))
235
236           if (self.op.command == constants.OOB_POWER_ON and
237               idx < len(self.nodes) - 1):
238             time.sleep(self.op.power_delay)
239
240     return ret
241
242   def _CheckPayload(self, result):
243     """Checks if the payload is valid.
244
245     @param result: RPC result
246     @raises errors.OpExecError: If payload is not valid
247
248     """
249     errs = []
250     if self.op.command == constants.OOB_HEALTH:
251       if not isinstance(result.payload, list):
252         errs.append("command 'health' is expected to return a list but got %s" %
253                     type(result.payload))
254       else:
255         for item, status in result.payload:
256           if status not in constants.OOB_STATUSES:
257             errs.append("health item '%s' has invalid status '%s'" %
258                         (item, status))
259
260     if self.op.command == constants.OOB_POWER_STATUS:
261       if not isinstance(result.payload, dict):
262         errs.append("power-status is expected to return a dict but got %s" %
263                     type(result.payload))
264
265     if self.op.command in [
266       constants.OOB_POWER_ON,
267       constants.OOB_POWER_OFF,
268       constants.OOB_POWER_CYCLE,
269       ]:
270       if result.payload is not None:
271         errs.append("%s is expected to not return payload but got '%s'" %
272                     (self.op.command, result.payload))
273
274     if errs:
275       raise errors.OpExecError("Check of out-of-band payload failed due to %s" %
276                                utils.CommaJoin(errs))
277
278
279 class _OsQuery(_QueryBase):
280   FIELDS = query.OS_FIELDS
281
282   def ExpandNames(self, lu):
283     # Lock all nodes in shared mode
284     # Temporary removal of locks, should be reverted later
285     # TODO: reintroduce locks when they are lighter-weight
286     lu.needed_locks = {}
287     #self.share_locks[locking.LEVEL_NODE] = 1
288     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
289
290     # The following variables interact with _QueryBase._GetNames
291     if self.names:
292       self.wanted = self.names
293     else:
294       self.wanted = locking.ALL_SET
295
296     self.do_locking = self.use_locking
297
298   def DeclareLocks(self, lu, level):
299     pass
300
301   @staticmethod
302   def _DiagnoseByOS(rlist):
303     """Remaps a per-node return list into an a per-os per-node dictionary
304
305     @param rlist: a map with node names as keys and OS objects as values
306
307     @rtype: dict
308     @return: a dictionary with osnames as keys and as value another
309         map, with nodes as keys and tuples of (path, status, diagnose,
310         variants, parameters, api_versions) as values, eg::
311
312           {"debian-etch": {"node1": [(/usr/lib/..., True, "", [], []),
313                                      (/srv/..., False, "invalid api")],
314                            "node2": [(/srv/..., True, "", [], [])]}
315           }
316
317     """
318     all_os = {}
319     # we build here the list of nodes that didn't fail the RPC (at RPC
320     # level), so that nodes with a non-responding node daemon don't
321     # make all OSes invalid
322     good_nodes = [node_name for node_name in rlist
323                   if not rlist[node_name].fail_msg]
324     for node_name, nr in rlist.items():
325       if nr.fail_msg or not nr.payload:
326         continue
327       for (name, path, status, diagnose, variants,
328            params, api_versions) in nr.payload:
329         if name not in all_os:
330           # build a list of nodes for this os containing empty lists
331           # for each node in node_list
332           all_os[name] = {}
333           for nname in good_nodes:
334             all_os[name][nname] = []
335         # convert params from [name, help] to (name, help)
336         params = [tuple(v) for v in params]
337         all_os[name][node_name].append((path, status, diagnose,
338                                         variants, params, api_versions))
339     return all_os
340
341   def _GetQueryData(self, lu):
342     """Computes the list of nodes and their attributes.
343
344     """
345     # Locking is not used
346     assert not (compat.any(lu.glm.is_owned(level)
347                            for level in locking.LEVELS
348                            if level != locking.LEVEL_CLUSTER) or
349                 self.do_locking or self.use_locking)
350
351     valid_nodes = [node.name
352                    for node in lu.cfg.GetAllNodesInfo().values()
353                    if not node.offline and node.vm_capable]
354     pol = self._DiagnoseByOS(lu.rpc.call_os_diagnose(valid_nodes))
355     cluster = lu.cfg.GetClusterInfo()
356
357     data = {}
358
359     for (os_name, os_data) in pol.items():
360       info = query.OsInfo(name=os_name, valid=True, node_status=os_data,
361                           hidden=(os_name in cluster.hidden_os),
362                           blacklisted=(os_name in cluster.blacklisted_os))
363
364       variants = set()
365       parameters = set()
366       api_versions = set()
367
368       for idx, osl in enumerate(os_data.values()):
369         info.valid = bool(info.valid and osl and osl[0][1])
370         if not info.valid:
371           break
372
373         (node_variants, node_params, node_api) = osl[0][3:6]
374         if idx == 0:
375           # First entry
376           variants.update(node_variants)
377           parameters.update(node_params)
378           api_versions.update(node_api)
379         else:
380           # Filter out inconsistent values
381           variants.intersection_update(node_variants)
382           parameters.intersection_update(node_params)
383           api_versions.intersection_update(node_api)
384
385       info.variants = list(variants)
386       info.parameters = list(parameters)
387       info.api_versions = list(api_versions)
388
389       data[os_name] = info
390
391     # Prepare data in requested order
392     return [data[name] for name in self._GetNames(lu, pol.keys(), None)
393             if name in data]
394
395
396 class LUOsDiagnose(NoHooksLU):
397   """Logical unit for OS diagnose/query.
398
399   """
400   REQ_BGL = False
401
402   @staticmethod
403   def _BuildFilter(fields, names):
404     """Builds a filter for querying OSes.
405
406     """
407     name_filter = qlang.MakeSimpleFilter("name", names)
408
409     # Legacy behaviour: Hide hidden, blacklisted or invalid OSes if the
410     # respective field is not requested
411     status_filter = [[qlang.OP_NOT, [qlang.OP_TRUE, fname]]
412                      for fname in ["hidden", "blacklisted"]
413                      if fname not in fields]
414     if "valid" not in fields:
415       status_filter.append([qlang.OP_TRUE, "valid"])
416
417     if status_filter:
418       status_filter.insert(0, qlang.OP_AND)
419     else:
420       status_filter = None
421
422     if name_filter and status_filter:
423       return [qlang.OP_AND, name_filter, status_filter]
424     elif name_filter:
425       return name_filter
426     else:
427       return status_filter
428
429   def CheckArguments(self):
430     self.oq = _OsQuery(self._BuildFilter(self.op.output_fields, self.op.names),
431                        self.op.output_fields, False)
432
433   def ExpandNames(self):
434     self.oq.ExpandNames(self)
435
436   def Exec(self, feedback_fn):
437     return self.oq.OldStyleQuery(self)
438
439
440 class _ExtStorageQuery(_QueryBase):
441   FIELDS = query.EXTSTORAGE_FIELDS
442
443   def ExpandNames(self, lu):
444     # Lock all nodes in shared mode
445     # Temporary removal of locks, should be reverted later
446     # TODO: reintroduce locks when they are lighter-weight
447     lu.needed_locks = {}
448     #self.share_locks[locking.LEVEL_NODE] = 1
449     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
450
451     # The following variables interact with _QueryBase._GetNames
452     if self.names:
453       self.wanted = self.names
454     else:
455       self.wanted = locking.ALL_SET
456
457     self.do_locking = self.use_locking
458
459   def DeclareLocks(self, lu, level):
460     pass
461
462   @staticmethod
463   def _DiagnoseByProvider(rlist):
464     """Remaps a per-node return list into an a per-provider per-node dictionary
465
466     @param rlist: a map with node names as keys and ExtStorage objects as values
467
468     @rtype: dict
469     @return: a dictionary with extstorage providers as keys and as
470         value another map, with nodes as keys and tuples of
471         (path, status, diagnose, parameters) as values, eg::
472
473           {"provider1": {"node1": [(/usr/lib/..., True, "", [])]
474                          "node2": [(/srv/..., False, "missing file")]
475                          "node3": [(/srv/..., True, "", [])]
476           }
477
478     """
479     all_es = {}
480     # we build here the list of nodes that didn't fail the RPC (at RPC
481     # level), so that nodes with a non-responding node daemon don't
482     # make all OSes invalid
483     good_nodes = [node_name for node_name in rlist
484                   if not rlist[node_name].fail_msg]
485     for node_name, nr in rlist.items():
486       if nr.fail_msg or not nr.payload:
487         continue
488       for (name, path, status, diagnose, params) in nr.payload:
489         if name not in all_es:
490           # build a list of nodes for this os containing empty lists
491           # for each node in node_list
492           all_es[name] = {}
493           for nname in good_nodes:
494             all_es[name][nname] = []
495         # convert params from [name, help] to (name, help)
496         params = [tuple(v) for v in params]
497         all_es[name][node_name].append((path, status, diagnose, params))
498     return all_es
499
500   def _GetQueryData(self, lu):
501     """Computes the list of nodes and their attributes.
502
503     """
504     # Locking is not used
505     assert not (compat.any(lu.glm.is_owned(level)
506                            for level in locking.LEVELS
507                            if level != locking.LEVEL_CLUSTER) or
508                 self.do_locking or self.use_locking)
509
510     valid_nodes = [node.name
511                    for node in lu.cfg.GetAllNodesInfo().values()
512                    if not node.offline and node.vm_capable]
513     pol = self._DiagnoseByProvider(lu.rpc.call_extstorage_diagnose(valid_nodes))
514
515     data = {}
516
517     nodegroup_list = lu.cfg.GetNodeGroupList()
518
519     for (es_name, es_data) in pol.items():
520       # For every provider compute the nodegroup validity.
521       # To do this we need to check the validity of each node in es_data
522       # and then construct the corresponding nodegroup dict:
523       #      { nodegroup1: status
524       #        nodegroup2: status
525       #      }
526       ndgrp_data = {}
527       for nodegroup in nodegroup_list:
528         ndgrp = lu.cfg.GetNodeGroup(nodegroup)
529
530         nodegroup_nodes = ndgrp.members
531         nodegroup_name = ndgrp.name
532         node_statuses = []
533
534         for node in nodegroup_nodes:
535           if node in valid_nodes:
536             if es_data[node] != []:
537               node_status = es_data[node][0][1]
538               node_statuses.append(node_status)
539             else:
540               node_statuses.append(False)
541
542         if False in node_statuses:
543           ndgrp_data[nodegroup_name] = False
544         else:
545           ndgrp_data[nodegroup_name] = True
546
547       # Compute the provider's parameters
548       parameters = set()
549       for idx, esl in enumerate(es_data.values()):
550         valid = bool(esl and esl[0][1])
551         if not valid:
552           break
553
554         node_params = esl[0][3]
555         if idx == 0:
556           # First entry
557           parameters.update(node_params)
558         else:
559           # Filter out inconsistent values
560           parameters.intersection_update(node_params)
561
562       params = list(parameters)
563
564       # Now fill all the info for this provider
565       info = query.ExtStorageInfo(name=es_name, node_status=es_data,
566                                   nodegroup_status=ndgrp_data,
567                                   parameters=params)
568
569       data[es_name] = info
570
571     # Prepare data in requested order
572     return [data[name] for name in self._GetNames(lu, pol.keys(), None)
573             if name in data]
574
575
576 class LUExtStorageDiagnose(NoHooksLU):
577   """Logical unit for ExtStorage diagnose/query.
578
579   """
580   REQ_BGL = False
581
582   def CheckArguments(self):
583     self.eq = _ExtStorageQuery(qlang.MakeSimpleFilter("name", self.op.names),
584                                self.op.output_fields, False)
585
586   def ExpandNames(self):
587     self.eq.ExpandNames(self)
588
589   def Exec(self, feedback_fn):
590     return self.eq.OldStyleQuery(self)
591
592
593 class LUQuery(NoHooksLU):
594   """Query for resources/items of a certain kind.
595
596   """
597   # pylint: disable=W0142
598   REQ_BGL = False
599
600   def CheckArguments(self):
601     qcls = _GetQueryImplementation(self.op.what)
602
603     self.impl = qcls(self.op.qfilter, self.op.fields, self.op.use_locking)
604
605   def ExpandNames(self):
606     self.impl.ExpandNames(self)
607
608   def DeclareLocks(self, level):
609     self.impl.DeclareLocks(self, level)
610
611   def Exec(self, feedback_fn):
612     return self.impl.NewStyleQuery(self)
613
614
615 class LUQueryFields(NoHooksLU):
616   """Query for resources/items of a certain kind.
617
618   """
619   # pylint: disable=W0142
620   REQ_BGL = False
621
622   def CheckArguments(self):
623     self.qcls = _GetQueryImplementation(self.op.what)
624
625   def ExpandNames(self):
626     self.needed_locks = {}
627
628   def Exec(self, feedback_fn):
629     return query.QueryFields(self.qcls.FIELDS, self.op.fields)
630
631
632 class LURestrictedCommand(NoHooksLU):
633   """Logical unit for executing restricted commands.
634
635   """
636   REQ_BGL = False
637
638   def ExpandNames(self):
639     if self.op.nodes:
640       self.op.nodes = _GetWantedNodes(self, self.op.nodes)
641
642     self.needed_locks = {
643       locking.LEVEL_NODE: self.op.nodes,
644       }
645     self.share_locks = {
646       locking.LEVEL_NODE: not self.op.use_locking,
647       }
648
649   def CheckPrereq(self):
650     """Check prerequisites.
651
652     """
653
654   def Exec(self, feedback_fn):
655     """Execute restricted command and return output.
656
657     """
658     owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))
659
660     # Check if correct locks are held
661     assert set(self.op.nodes).issubset(owned_nodes)
662
663     rpcres = self.rpc.call_restricted_command(self.op.nodes, self.op.command)
664
665     result = []
666
667     for node_name in self.op.nodes:
668       nres = rpcres[node_name]
669       if nres.fail_msg:
670         msg = ("Command '%s' on node '%s' failed: %s" %
671                (self.op.command, node_name, nres.fail_msg))
672         result.append((False, msg))
673       else:
674         result.append((True, nres.payload))
675
676     return result
677
678
679 #: Query type implementations
680 _QUERY_IMPL = {
681   constants.QR_CLUSTER: _ClusterQuery,
682   constants.QR_INSTANCE: _InstanceQuery,
683   constants.QR_NODE: _NodeQuery,
684   constants.QR_GROUP: _GroupQuery,
685   constants.QR_NETWORK: _NetworkQuery,
686   constants.QR_OS: _OsQuery,
687   constants.QR_EXTSTORAGE: _ExtStorageQuery,
688   constants.QR_EXPORT: _ExportQuery,
689   }
690
691 assert set(_QUERY_IMPL.keys()) == constants.QR_VIA_OP
692
693
694 def _GetQueryImplementation(name):
695   """Returns the implemtnation for a query type.
696
697   @param name: Query type, must be one of L{constants.QR_VIA_OP}
698
699   """
700   try:
701     return _QUERY_IMPL[name]
702   except KeyError:
703     raise errors.OpPrereqError("Unknown query resource '%s'" % name,
704                                errors.ECODE_INVAL)