4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable=W0201,C0302
26 # W0201 since most LU attributes are defined in CheckPrereq or similar
29 # C0302: since we have waaaay too many lines in this module
34 from ganeti import utils
35 from ganeti import errors
36 from ganeti import locking
37 from ganeti import constants
38 from ganeti import compat
39 from ganeti import query
40 from ganeti import qlang
42 from ganeti.cmdlib.base import ResultWithJobs, LogicalUnit, NoHooksLU, \
44 from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_ONLINE, \
45 INSTANCE_NOT_RUNNING, CAN_CHANGE_INSTANCE_OFFLINE, \
46 _ExpandInstanceName, _ExpandItemName, \
47 _ExpandNodeName, _ShareAll, _CheckNodeGroupInstances, _GetWantedNodes, \
48 _GetWantedInstances, _RunPostHook, _RedistributeAncillaryFiles, \
49 _MergeAndVerifyHvState, _MergeAndVerifyDiskState, _GetUpdatedIPolicy, \
50 _ComputeNewInstanceViolations, _GetUpdatedParams, _CheckOSParams, \
51 _CheckHVParams, _AdjustCandidatePool, _CheckNodePVs, \
52 _ComputeIPolicyInstanceViolation, _AnnotateDiskParams, _SupportsOob, \
53 _ComputeIPolicySpecViolation, _GetDefaultIAllocator, \
54 _CheckInstancesNodeGroups, _LoadNodeEvacResult, _MapInstanceDisksToNodes, \
55 _CheckInstanceNodeGroups, _CheckParamsNotGlobal, \
56 _IsExclusiveStorageEnabledNode, _CheckInstanceState, \
57 _CheckIAllocatorOrNode, _FindFaultyInstanceDisks, _CheckNodeOnline
58 from ganeti.cmdlib.instance_utils import _AssembleInstanceDisks, \
59 _BuildInstanceHookEnvByObject, _GetClusterDomainSecret, \
60 _CheckNodeNotDrained, _RemoveDisks, _ShutdownInstanceDisks, \
61 _StartInstanceDisks, _RemoveInstance
63 from ganeti.cmdlib.cluster import LUClusterActivateMasterIp, \
64 LUClusterDeactivateMasterIp, LUClusterConfigQuery, LUClusterDestroy, \
65 LUClusterPostInit, _ClusterQuery, LUClusterQuery, LUClusterRedistConf, \
66 LUClusterRename, LUClusterRepairDiskSizes, LUClusterSetParams, \
67 LUClusterVerify, LUClusterVerifyConfig, LUClusterVerifyGroup, \
69 from ganeti.cmdlib.group import LUGroupAdd, LUGroupAssignNodes, \
70 _GroupQuery, LUGroupQuery, LUGroupSetParams, LUGroupRemove, \
71 LUGroupRename, LUGroupEvacuate, LUGroupVerifyDisks
72 from ganeti.cmdlib.node import LUNodeAdd, LUNodeSetParams, \
73 LUNodePowercycle, LUNodeEvacuate, LUNodeMigrate, LUNodeModifyStorage, \
74 _NodeQuery, LUNodeQuery, LUNodeQueryvols, LUNodeQueryStorage, \
75 LUNodeRemove, LURepairNodeStorage
76 from ganeti.cmdlib.instance import LUInstanceCreate, LUInstanceRename, \
77 LUInstanceRemove, LUInstanceMove, _InstanceQuery, LUInstanceQuery, \
78 LUInstanceQueryData, LUInstanceRecreateDisks, LUInstanceGrowDisk, \
79 LUInstanceReplaceDisks, LUInstanceActivateDisks, \
80 LUInstanceDeactivateDisks, LUInstanceStartup, LUInstanceShutdown, \
81 LUInstanceReinstall, LUInstanceReboot, LUInstanceConsole, \
82 LUInstanceFailover, LUInstanceMigrate, LUInstanceMultiAlloc, \
83 LUInstanceSetParams, LUInstanceChangeGroup
84 from ganeti.cmdlib.backup import _ExportQuery, LUBackupQuery, \
85 LUBackupPrepare, LUBackupExport, LUBackupRemove
86 from ganeti.cmdlib.tags import LUTagsGet, LUTagsSearch, LUTagsSet, LUTagsDel
87 from ganeti.cmdlib.network import LUNetworkAdd, LUNetworkRemove, \
88 LUNetworkSetParams, _NetworkQuery, LUNetworkQuery, LUNetworkConnect, \
90 from ganeti.cmdlib.test import LUTestDelay, LUTestJqueue, LUTestAllocator
93 class LUOobCommand(NoHooksLU):
94 """Logical unit for OOB handling.
# Runs out-of-band (OOB) commands -- power on/off/cycle, power-status,
# health -- on nodes through an OOB helper program resolved via _SupportsOob.
# Power off/cycle must never target the master node, hence _SKIP_MASTER.
98 _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)
100 def ExpandNames(self):
101 """Gather locks we need.
# With an explicit node list, expand the names and lock only those nodes;
# otherwise lock all nodes (locking.ALL_SET).
# NOTE(review): an else-branch line appears elided before the ALL_SET
# assignment in this dump.
104 if self.op.node_names:
105 self.op.node_names = _GetWantedNodes(self, self.op.node_names)
106 lock_names = self.op.node_names
108 lock_names = locking.ALL_SET
110 self.needed_locks = {
111 locking.LEVEL_NODE: lock_names,
# The node-allocation lock is shared, and only acquired at all when every
# node is affected (no explicit node list).
114 self.share_locks[locking.LEVEL_NODE_ALLOC] = 1
116 if not self.op.node_names:
117 # Acquire node allocation lock only if all nodes are affected
118 self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
120 def CheckPrereq(self):
121 """Check prerequisites.
124 - the node exists in the configuration
127 Any errors are signaled by raising errors.OpPrereqError.
131 self.master_node = self.cfg.GetMasterNode()
133 assert self.op.power_delay >= 0.0
# Explicit node list: refuse power-off/cycle on the master node.  When the
# master itself has an OOB helper, the error text tells the operator how to
# run it directly instead.
135 if self.op.node_names:
136 if (self.op.command in self._SKIP_MASTER and
137 self.master_node in self.op.node_names):
138 master_node_obj = self.cfg.GetNodeInfo(self.master_node)
139 master_oob_handler = _SupportsOob(self.cfg, master_node_obj)
141 if master_oob_handler:
142 additional_text = ("run '%s %s %s' if you want to operate on the"
143 " master regardless") % (master_oob_handler,
147 additional_text = "it does not support out-of-band operations"
149 raise errors.OpPrereqError(("Operating on the master node %s is not"
150 " allowed for %s; %s") %
151 (self.master_node, self.op.command,
152 additional_text), errors.ECODE_INVAL)
# No explicit node list: operate on every node, dropping the master for
# commands listed in _SKIP_MASTER.
154 self.op.node_names = self.cfg.GetNodeList()
155 if self.op.command in self._SKIP_MASTER:
156 self.op.node_names.remove(self.master_node)
158 if self.op.command in self._SKIP_MASTER:
159 assert self.master_node not in self.op.node_names
# Resolve each requested node to its configuration object; unknown names
# are a prerequisite error.
161 for (node_name, node) in self.cfg.GetMultiNodeInfo(self.op.node_names):
163 raise errors.OpPrereqError("Node %s not found" % node_name,
166 self.nodes.append(node)
# Powering off a node that is not marked offline could take down live
# instances, so that requires the explicit ignore_status override.
168 if (not self.op.ignore_status and
169 (self.op.command == constants.OOB_POWER_OFF and not node.offline)):
170 raise errors.OpPrereqError(("Cannot power off node %s because it is"
171 " not marked offline") % node_name,
174 def Exec(self, feedback_fn):
175 """Execute OOB and return result if we expect any.
178 master_node = self.master_node
# Process nodes in nice-sorted name order; each node contributes one row
# of (status, data) tuples to the returned list.
181 for idx, node in enumerate(utils.NiceSort(self.nodes,
182 key=lambda node: node.name)):
183 node_entry = [(constants.RS_NORMAL, node.name)]
184 ret.append(node_entry)
186 oob_program = _SupportsOob(self.cfg, node)
# Node without OOB support: record RS_UNAVAIL for it.
189 node_entry.append((constants.RS_UNAVAIL, None))
192 logging.info("Executing out-of-band command '%s' using '%s' on %s",
193 self.op.command, oob_program, node.name)
# The RPC is executed on the master node, which invokes the OOB helper
# for the target node.
194 result = self.rpc.call_run_oob(master_node, oob_program,
195 self.op.command, node.name,
# RPC-level failure: warn and record RS_NODATA for this node.
199 self.LogWarning("Out-of-band RPC failed on node '%s': %s",
200 node.name, result.fail_msg)
201 node_entry.append((constants.RS_NODATA, None))
# Validate the payload shape; an invalid payload is also reported as
# RS_NODATA rather than aborting the whole operation.
204 self._CheckPayload(result)
205 except errors.OpExecError, err:
206 self.LogWarning("Payload returned by node '%s' is not valid: %s",
208 node_entry.append((constants.RS_NODATA, None))
210 if self.op.command == constants.OOB_HEALTH:
211 # For health we should log important events
212 for item, status in result.payload:
213 if status in [constants.OOB_STATUS_WARNING,
214 constants.OOB_STATUS_CRITICAL]:
215 self.LogWarning("Item '%s' on node '%s' has status '%s'",
216 item, node.name, status)
# For power commands the node's recorded "powered" flag is adjusted
# (assignments elided in this dump); for power-status, warn when the
# recorded state disagrees with the state the helper reports.
218 if self.op.command == constants.OOB_POWER_ON:
220 elif self.op.command == constants.OOB_POWER_OFF:
222 elif self.op.command == constants.OOB_POWER_STATUS:
223 powered = result.payload[constants.OOB_POWER_STATUS_POWERED]
224 if powered != node.powered:
225 logging.warning(("Recorded power state (%s) of node '%s' does not"
226 " match actual power state (%s)"), node.powered,
229 # For configuration changing commands we should update the node
230 if self.op.command in (constants.OOB_POWER_ON,
231 constants.OOB_POWER_OFF):
232 self.cfg.Update(node, feedback_fn)
234 node_entry.append((constants.RS_NORMAL, result.payload))
# Honour the configured delay between consecutive power-ons, skipped
# after the last node.
236 if (self.op.command == constants.OOB_POWER_ON and
237 idx < len(self.nodes) - 1):
238 time.sleep(self.op.power_delay)
242 def _CheckPayload(self, result):
243 """Checks if the payload is valid.
245 @param result: RPC result
246 @raises errors.OpExecError: If payload is not valid
# Per-command payload validation: 'health' must return a list of
# (item, status) pairs with known statuses, 'power-status' must return a
# dict, and the remaining power commands must return no payload at all.
# All violations are collected in errs and raised together at the end.
250 if self.op.command == constants.OOB_HEALTH:
251 if not isinstance(result.payload, list):
252 errs.append("command 'health' is expected to return a list but got %s" %
253 type(result.payload))
255 for item, status in result.payload:
256 if status not in constants.OOB_STATUSES:
257 errs.append("health item '%s' has invalid status '%s'" %
260 if self.op.command == constants.OOB_POWER_STATUS:
261 if not isinstance(result.payload, dict):
262 errs.append("power-status is expected to return a dict but got %s" %
263 type(result.payload))
265 if self.op.command in [
266 constants.OOB_POWER_ON,
267 constants.OOB_POWER_OFF,
268 constants.OOB_POWER_CYCLE,
270 if result.payload is not None:
271 errs.append("%s is expected to not return payload but got '%s'" %
272 (self.op.command, result.payload))
275 raise errors.OpExecError("Check of out-of-band payload failed due to %s" %
276 utils.CommaJoin(errs))
279 class _OsQuery(_QueryBase):
# Query implementation for operating systems known to the cluster; field
# definitions come from query.OS_FIELDS.
280 FIELDS = query.OS_FIELDS
282 def ExpandNames(self, lu):
283 # Lock all nodes in shared mode
284 # Temporary removal of locks, should be reverted later
285 # TODO: reintroduce locks when they are lighter-weight
287 #self.share_locks[locking.LEVEL_NODE] = 1
288 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
290 # The following variables interact with _QueryBase._GetNames
# With explicit names query just those OSes, otherwise query everything.
292 self.wanted = self.names
294 self.wanted = locking.ALL_SET
296 self.do_locking = self.use_locking
298 def DeclareLocks(self, lu, level):
# Diagnoses the RPC results node-by-node.  Declared below ExpandNames;
# presumably a @staticmethod (decorator line elided in this dump).
302 def _DiagnoseByOS(rlist):
303 """Remaps a per-node return list into a per-os per-node dictionary
305 @param rlist: a map with node names as keys and OS objects as values
308 @return: a dictionary with osnames as keys and as value another
309 map, with nodes as keys and tuples of (path, status, diagnose,
310 variants, parameters, api_versions) as values, eg::
312 {"debian-etch": {"node1": [(/usr/lib/..., True, "", [], []),
313 (/srv/..., False, "invalid api")],
314 "node2": [(/srv/..., True, "", [], [])]}
319 # we build here the list of nodes that didn't fail the RPC (at RPC
320 # level), so that nodes with a non-responding node daemon don't
321 # make all OSes invalid
322 good_nodes = [node_name for node_name in rlist
323 if not rlist[node_name].fail_msg]
# Skip nodes whose RPC failed or returned nothing.
324 for node_name, nr in rlist.items():
325 if nr.fail_msg or not nr.payload:
327 for (name, path, status, diagnose, variants,
328 params, api_versions) in nr.payload:
329 if name not in all_os:
330 # build a list of nodes for this os containing empty lists
331 # for each node in node_list
333 for nname in good_nodes:
334 all_os[name][nname] = []
335 # convert params from [name, help] to (name, help)
336 params = [tuple(v) for v in params]
337 all_os[name][node_name].append((path, status, diagnose,
338 variants, params, api_versions))
341 def _GetQueryData(self, lu):
342 """Computes the list of nodes and their attributes.
345 # Locking is not used
# This query runs unlocked by design; the assertion guards against any
# lock other than the cluster-level one being held.
346 assert not (compat.any(lu.glm.is_owned(level)
347 for level in locking.LEVELS
348 if level != locking.LEVEL_CLUSTER) or
349 self.do_locking or self.use_locking)
# Only online, VM-capable nodes are asked to diagnose their OS list.
351 valid_nodes = [node.name
352 for node in lu.cfg.GetAllNodesInfo().values()
353 if not node.offline and node.vm_capable]
354 pol = self._DiagnoseByOS(lu.rpc.call_os_diagnose(valid_nodes))
355 cluster = lu.cfg.GetClusterInfo()
# Build one OsInfo per OS, flagging hidden/blacklisted status from the
# cluster configuration.
359 for (os_name, os_data) in pol.items():
360 info = query.OsInfo(name=os_name, valid=True, node_status=os_data,
361 hidden=(os_name in cluster.hidden_os),
362 blacklisted=(os_name in cluster.blacklisted_os))
# An OS is valid only if it is valid on every node that reports it.
368 for idx, osl in enumerate(os_data.values()):
369 info.valid = bool(info.valid and osl and osl[0][1])
373 (node_variants, node_params, node_api) = osl[0][3:6]
# First node seen seeds the sets (branch lines elided in this dump)...
376 variants.update(node_variants)
377 parameters.update(node_params)
378 api_versions.update(node_api)
380 # Filter out inconsistent values
# ...subsequent nodes intersect, keeping only values common to all nodes.
381 variants.intersection_update(node_variants)
382 parameters.intersection_update(node_params)
383 api_versions.intersection_update(node_api)
385 info.variants = list(variants)
386 info.parameters = list(parameters)
387 info.api_versions = list(api_versions)
391 # Prepare data in requested order
392 return [data[name] for name in self._GetNames(lu, pol.keys(), None)
396 class LUOsDiagnose(NoHooksLU):
397 """Logical unit for OS diagnose/query.
# Builds the query filter for OSes.  Presumably a @staticmethod (decorator
# line elided in this dump).
403 def _BuildFilter(fields, names):
404 """Builds a filter for querying OSes.
407 name_filter = qlang.MakeSimpleFilter("name", names)
409 # Legacy behaviour: Hide hidden, blacklisted or invalid OSes if the
410 # respective field is not requested
411 status_filter = [[qlang.OP_NOT, [qlang.OP_TRUE, fname]]
412 for fname in ["hidden", "blacklisted"]
413 if fname not in fields]
414 if "valid" not in fields:
415 status_filter.append([qlang.OP_TRUE, "valid"])
# Multiple status conditions are combined with a logical AND.
418 status_filter.insert(0, qlang.OP_AND)
# Combine name and status filters when both are present; the single-filter
# and no-filter returns are on lines elided in this dump.
422 if name_filter and status_filter:
423 return [qlang.OP_AND, name_filter, status_filter]
429 def CheckArguments(self):
# Delegate the actual query work to _OsQuery, using the legacy filter.
430 self.oq = _OsQuery(self._BuildFilter(self.op.output_fields, self.op.names),
431 self.op.output_fields, False)
433 def ExpandNames(self):
434 self.oq.ExpandNames(self)
436 def Exec(self, feedback_fn):
437 return self.oq.OldStyleQuery(self)
440 class _ExtStorageQuery(_QueryBase):
# Query implementation for ExtStorage providers; mirrors _OsQuery above.
441 FIELDS = query.EXTSTORAGE_FIELDS
443 def ExpandNames(self, lu):
444 # Lock all nodes in shared mode
445 # Temporary removal of locks, should be reverted later
446 # TODO: reintroduce locks when they are lighter-weight
448 #self.share_locks[locking.LEVEL_NODE] = 1
449 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
451 # The following variables interact with _QueryBase._GetNames
# With explicit names query just those providers, otherwise everything.
453 self.wanted = self.names
455 self.wanted = locking.ALL_SET
457 self.do_locking = self.use_locking
459 def DeclareLocks(self, lu, level):
# Diagnoses the RPC results node-by-node.  Presumably a @staticmethod
# (decorator line elided in this dump).
463 def _DiagnoseByProvider(rlist):
464 """Remaps a per-node return list into a per-provider per-node dictionary
466 @param rlist: a map with node names as keys and ExtStorage objects as values
469 @return: a dictionary with extstorage providers as keys and as
470 value another map, with nodes as keys and tuples of
471 (path, status, diagnose, parameters) as values, eg::
473 {"provider1": {"node1": [(/usr/lib/..., True, "", [])]
474 "node2": [(/srv/..., False, "missing file")]
475 "node3": [(/srv/..., True, "", [])]
480 # we build here the list of nodes that didn't fail the RPC (at RPC
481 # level), so that nodes with a non-responding node daemon don't
482 # make all OSes invalid
483 good_nodes = [node_name for node_name in rlist
484 if not rlist[node_name].fail_msg]
# Skip nodes whose RPC failed or returned nothing.
485 for node_name, nr in rlist.items():
486 if nr.fail_msg or not nr.payload:
488 for (name, path, status, diagnose, params) in nr.payload:
489 if name not in all_es:
490 # build a list of nodes for this os containing empty lists
491 # for each node in node_list
493 for nname in good_nodes:
494 all_es[name][nname] = []
495 # convert params from [name, help] to (name, help)
496 params = [tuple(v) for v in params]
497 all_es[name][node_name].append((path, status, diagnose, params))
500 def _GetQueryData(self, lu):
501 """Computes the list of nodes and their attributes.
504 # Locking is not used
# This query runs unlocked by design; the assertion guards against any
# lock other than the cluster-level one being held.
505 assert not (compat.any(lu.glm.is_owned(level)
506 for level in locking.LEVELS
507 if level != locking.LEVEL_CLUSTER) or
508 self.do_locking or self.use_locking)
# Only online, VM-capable nodes are asked to diagnose their providers.
510 valid_nodes = [node.name
511 for node in lu.cfg.GetAllNodesInfo().values()
512 if not node.offline and node.vm_capable]
513 pol = self._DiagnoseByProvider(lu.rpc.call_extstorage_diagnose(valid_nodes))
517 nodegroup_list = lu.cfg.GetNodeGroupList()
519 for (es_name, es_data) in pol.items():
520 # For every provider compute the nodegroup validity.
521 # To do this we need to check the validity of each node in es_data
522 # and then construct the corresponding nodegroup dict:
523 # { nodegroup1: status
527 for nodegroup in nodegroup_list:
528 ndgrp = lu.cfg.GetNodeGroup(nodegroup)
530 nodegroup_nodes = ndgrp.members
531 nodegroup_name = ndgrp.name
# A member node counts as False when it is not a valid node or reports
# no data for this provider; otherwise its reported status is used.
534 for node in nodegroup_nodes:
535 if node in valid_nodes:
536 if es_data[node] != []:
537 node_status = es_data[node][0][1]
538 node_statuses.append(node_status)
540 node_statuses.append(False)
# A node group is valid for the provider only if every member is valid.
542 if False in node_statuses:
543 ndgrp_data[nodegroup_name] = False
545 ndgrp_data[nodegroup_name] = True
547 # Compute the provider's parameters
549 for idx, esl in enumerate(es_data.values()):
550 valid = bool(esl and esl[0][1])
554 node_params = esl[0][3]
# First node seeds the parameter set (branch lines elided in this
# dump); subsequent nodes intersect, keeping only common values.
557 parameters.update(node_params)
559 # Filter out inconsistent values
560 parameters.intersection_update(node_params)
562 params = list(parameters)
564 # Now fill all the info for this provider
565 info = query.ExtStorageInfo(name=es_name, node_status=es_data,
566 nodegroup_status=ndgrp_data,
571 # Prepare data in requested order
572 return [data[name] for name in self._GetNames(lu, pol.keys(), None)
576 class LUExtStorageDiagnose(NoHooksLU):
577 """Logical unit for ExtStorage diagnose/query.
582 def CheckArguments(self):
# Delegate the actual query work to _ExtStorageQuery, filtering by the
# requested provider names only (no legacy status filtering here).
583 self.eq = _ExtStorageQuery(qlang.MakeSimpleFilter("name", self.op.names),
584 self.op.output_fields, False)
586 def ExpandNames(self):
587 self.eq.ExpandNames(self)
589 def Exec(self, feedback_fn):
590 return self.eq.OldStyleQuery(self)
593 class LUQuery(NoHooksLU):
594 """Query for resources/items of a certain kind.
597 # pylint: disable=W0142
600 def CheckArguments(self):
# Resolve the query implementation class for the requested resource kind
# and instantiate it with the caller-supplied filter, fields and locking.
601 qcls = _GetQueryImplementation(self.op.what)
603 self.impl = qcls(self.op.qfilter, self.op.fields, self.op.use_locking)
# Lock expansion and declaration are fully delegated to the implementation.
605 def ExpandNames(self):
606 self.impl.ExpandNames(self)
608 def DeclareLocks(self, level):
609 self.impl.DeclareLocks(self, level)
611 def Exec(self, feedback_fn):
612 return self.impl.NewStyleQuery(self)
615 class LUQueryFields(NoHooksLU):
616 """Query for resources/items of a certain kind.
619 # pylint: disable=W0142
622 def CheckArguments(self):
# Only the field definitions are needed, so just remember the class.
623 self.qcls = _GetQueryImplementation(self.op.what)
625 def ExpandNames(self):
# Field queries are static metadata lookups; no locks are required.
626 self.needed_locks = {}
628 def Exec(self, feedback_fn):
629 return query.QueryFields(self.qcls.FIELDS, self.op.fields)
632 class LURestrictedCommand(NoHooksLU):
633 """Logical unit for executing restricted commands.
638 def ExpandNames(self):
639 self.op.nodes = _GetWantedNodes(self, self.op.nodes)
642 self.needed_locks = {
643 locking.LEVEL_NODE: self.op.nodes,
# Node locks are shared unless exclusive locking was requested.
646 locking.LEVEL_NODE: not self.op.use_locking,
649 def CheckPrereq(self):
650 """Check prerequisites.
654 def Exec(self, feedback_fn):
655 """Execute restricted command and return output.
658 owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))
660 # Check if correct locks are held
661 assert set(self.op.nodes).issubset(owned_nodes)
663 rpcres = self.rpc.call_restricted_command(self.op.nodes, self.op.command)
# Collect a (success, data) pair per node, in the requested node order;
# per-node RPC failures are reported in-band rather than raised.
667 for node_name in self.op.nodes:
668 nres = rpcres[node_name]
670 msg = ("Command '%s' on node '%s' failed: %s" %
671 (self.op.command, node_name, nres.fail_msg))
672 result.append((False, msg))
674 result.append((True, nres.payload))
679 #: Query type implementations
# Maps each opcode-accessible query resource constant to its _QueryBase
# implementation class (the "_QUERY_IMPL = {" opener is elided in this dump).
681 constants.QR_CLUSTER: _ClusterQuery,
682 constants.QR_INSTANCE: _InstanceQuery,
683 constants.QR_NODE: _NodeQuery,
684 constants.QR_GROUP: _GroupQuery,
685 constants.QR_NETWORK: _NetworkQuery,
686 constants.QR_OS: _OsQuery,
687 constants.QR_EXTSTORAGE: _ExtStorageQuery,
688 constants.QR_EXPORT: _ExportQuery,
# Import-time sanity check: every query type reachable via opcodes must have
# an implementation registered above, and nothing extra.
691 assert set(_QUERY_IMPL.keys()) == constants.QR_VIA_OP
694 def _GetQueryImplementation(name):
695 """Returns the implementation for a query type.
697 @param name: Query type, must be one of L{constants.QR_VIA_OP}
# Plain dict lookup; an unknown name raises KeyError, which is presumably
# translated to the OpPrereqError below (the try/except lines are elided
# in this dump).
701 return _QUERY_IMPL[name]
703 raise errors.OpPrereqError("Unknown query resource '%s'" % name,