4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Logical units dealing with nodes."""
27 from ganeti import constants
28 from ganeti import errors
29 from ganeti import locking
30 from ganeti import netutils
31 from ganeti import objects
32 from ganeti import opcodes
33 from ganeti import qlang
34 from ganeti import query
35 from ganeti import rpc
36 from ganeti import utils
37 from ganeti.masterd import iallocator
39 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, QueryBase, \
41 from ganeti.cmdlib.common import CheckParamsNotGlobal, \
42 MergeAndVerifyHvState, MergeAndVerifyDiskState, \
43 IsExclusiveStorageEnabledNode, CheckNodePVs, \
44 RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
45 CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
46 AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
47 GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \
48 FindFaultyInstanceDisks, CheckStorageTypeEnabled
51 def _DecideSelfPromotion(lu, exceptions=None):
52 """Decide whether I should promote myself as a master candidate.
55 cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
56 mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
57 # the new node will increase mc_max by one, so:
58 mc_should = min(mc_should + 1, cp_size)
59 return mc_now < mc_should
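# Illustrative arithmetic for the decision above (hypothetical numbers): with
# candidate_pool_size = 10, mc_now = 3 and mc_should = 4, adding the new node
# lifts the target to min(4 + 1, 10) = 5, so 3 < 5 and the node promotes
# itself; with mc_now already at 5 or more it would not.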
62 def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
63 """Ensure that a node has the given secondary ip.
65 @type lu: L{LogicalUnit}
66 @param lu: the LU on behalf of which we make the check
67 @type node: L{objects.Node}
68 @param node: the node to check
69 @type secondary_ip: string
70 @param secondary_ip: the ip to check
72 @param prereq: whether to throw a prerequisite or an execute error
73 @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
75 @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
78 # this can be called with a new node, which has no UUID yet, so perform the
79 # RPC call using its name
80 result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
81 result.Raise("Failure checking secondary ip on node %s" % node.name,
82 prereq=prereq, ecode=errors.ECODE_ENVIRON)
83 if not result.payload:
84 msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
85 " please fix and re-run this command" % secondary_ip)
87 raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
89 raise errors.OpExecError(msg)
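# A minimal usage sketch (the node object is assumed to come from the
# configuration, as in the call sites below):
#   _CheckNodeHasSecondaryIP(lu, node, node.secondary_ip, True)
# raises OpPrereqError if the node daemon reports that it does not own the
# given IP; with prereq=False an OpExecError is raised instead.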
92 class LUNodeAdd(LogicalUnit):
93 """Logical unit for adding node to the cluster.
97 HTYPE = constants.HTYPE_NODE
98 _NFLAGS = ["master_capable", "vm_capable"]
100 def CheckArguments(self):
101 self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
102 # validate/normalize the node name
103 self.hostname = netutils.GetHostname(name=self.op.node_name,
104 family=self.primary_ip_family)
105 self.op.node_name = self.hostname.name
107 if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName():
108 raise errors.OpPrereqError("Cannot readd the master node",
111 if self.op.readd and self.op.group:
112 raise errors.OpPrereqError("Cannot pass a node group when a node is"
113 " being readded", errors.ECODE_INVAL)
115 # OpenvSwitch: Warn user if link is missing
116 if (self.op.ndparams[constants.ND_OVS] and not
117 self.op.ndparams[constants.ND_OVS_LINK]):
118 self.LogInfo("No physical interface for OpenvSwitch was given."
119 " OpenvSwitch will not have an outside connection. This"
120 " might not be what you want.")
121 # OpenvSwitch: Fail if parameters are given, but OVS is not enabled.
122 if (not self.op.ndparams[constants.ND_OVS] and
123 (self.op.ndparams[constants.ND_OVS_NAME] or
124 self.op.ndparams[constants.ND_OVS_LINK])):
125 raise errors.OpPrereqError("OpenvSwitch name or link were given, but"
126 " OpenvSwitch is not enabled. Please enable"
127 " OpenvSwitch with --ovs", errors.ECODE_INVAL)
129 def BuildHooksEnv(self):
132 This will run on all nodes before, and on all nodes + the new node after.
136 "OP_TARGET": self.op.node_name,
137 "NODE_NAME": self.op.node_name,
138 "NODE_PIP": self.op.primary_ip,
139 "NODE_SIP": self.op.secondary_ip,
140 "MASTER_CAPABLE": str(self.op.master_capable),
141 "VM_CAPABLE": str(self.op.vm_capable),
144 def BuildHooksNodes(self):
145 """Build hooks nodes.
148 hook_nodes = self.cfg.GetNodeList()
149 new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
150 if new_node_info is not None:
152 hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))
154 # add the new node as a post hook node by name; it does not have a UUID yet
155 return (hook_nodes, hook_nodes, [self.op.node_name, ])
157 def CheckPrereq(self):
158 """Check prerequisites.
161 - the new node is not already in the config
163 - its parameters (single/dual homed) match the cluster
165 Any errors are signaled by raising errors.OpPrereqError.
168 node_name = self.hostname.name
169 self.op.primary_ip = self.hostname.ip
170 if self.op.secondary_ip is None:
171 if self.primary_ip_family == netutils.IP6Address.family:
172 raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
173 " IPv4 address must be given as secondary",
175 self.op.secondary_ip = self.op.primary_ip
177 secondary_ip = self.op.secondary_ip
178 if not netutils.IP4Address.IsValid(secondary_ip):
179 raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
180 " address" % secondary_ip, errors.ECODE_INVAL)
182 existing_node_info = self.cfg.GetNodeInfoByName(node_name)
183 if not self.op.readd and existing_node_info is not None:
184 raise errors.OpPrereqError("Node %s is already in the configuration" %
185 node_name, errors.ECODE_EXISTS)
186 elif self.op.readd and existing_node_info is None:
187 raise errors.OpPrereqError("Node %s is not in the configuration" %
188 node_name, errors.ECODE_NOENT)
190 self.changed_primary_ip = False
192 for existing_node in self.cfg.GetAllNodesInfo().values():
193 if self.op.readd and node_name == existing_node.name:
194 if existing_node.secondary_ip != secondary_ip:
195 raise errors.OpPrereqError("Readded node doesn't have the same IP"
196 " address configuration as before",
198 if existing_node.primary_ip != self.op.primary_ip:
199 self.changed_primary_ip = True
203 if (existing_node.primary_ip == self.op.primary_ip or
204 existing_node.secondary_ip == self.op.primary_ip or
205 existing_node.primary_ip == secondary_ip or
206 existing_node.secondary_ip == secondary_ip):
207 raise errors.OpPrereqError("New node ip address(es) conflict with"
208 " existing node %s" % existing_node.name,
209 errors.ECODE_NOTUNIQUE)
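# In other words (illustrative addresses): if an existing node already uses
# 192.0.2.10 as either its primary or secondary IP, a new node specifying
# 192.0.2.10 for either of its own addresses is rejected here.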
211 # After this 'if' block, None is no longer a valid value for the
212 # _capable op attributes
214 assert existing_node_info is not None, \
215 "Can't retrieve locked node %s" % node_name
216 for attr in self._NFLAGS:
217 if getattr(self.op, attr) is None:
218 setattr(self.op, attr, getattr(existing_node_info, attr))
220 for attr in self._NFLAGS:
221 if getattr(self.op, attr) is None:
222 setattr(self.op, attr, True)
224 if self.op.readd and not self.op.vm_capable:
225 pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
227 raise errors.OpPrereqError("Node %s being re-added with vm_capable"
228 " flag set to false, but it already holds"
229 " instances" % node_name,
232 # check that the type of the node (single versus dual homed) is the
233 # same as for the master
234 myself = self.cfg.GetMasterNodeInfo()
235 master_singlehomed = myself.secondary_ip == myself.primary_ip
236 newbie_singlehomed = secondary_ip == self.op.primary_ip
237 if master_singlehomed != newbie_singlehomed:
238 if master_singlehomed:
239 raise errors.OpPrereqError("The master has no secondary ip but the"
243 raise errors.OpPrereqError("The master has a secondary ip but the"
244 " new node doesn't have one",
247 # checks reachability
248 if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
249 raise errors.OpPrereqError("Node not reachable by ping",
250 errors.ECODE_ENVIRON)
252 if not newbie_singlehomed:
253 # check reachability from my secondary ip to newbie's secondary ip
254 if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
255 source=myself.secondary_ip):
256 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
257 " based ping to node daemon port",
258 errors.ECODE_ENVIRON)
261 exceptions = [existing_node_info.uuid]
265 if self.op.master_capable:
266 self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
268 self.master_candidate = False
271 self.new_node = existing_node_info
273 node_group = self.cfg.LookupNodeGroup(self.op.group)
274 self.new_node = objects.Node(name=node_name,
275 primary_ip=self.op.primary_ip,
276 secondary_ip=secondary_ip,
277 master_candidate=self.master_candidate,
278 offline=False, drained=False,
279 group=node_group, ndparams={})
282 utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
283 CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
284 "node", "cluster or group")
287 self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)
289 if self.op.disk_state:
290 self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)
292 # TODO: If we need to have multiple DnsOnlyRunner we probably should make
293 # it a property on the base class.
294 rpcrunner = rpc.DnsOnlyRunner()
295 result = rpcrunner.call_version([node_name])[node_name]
296 result.Raise("Can't get version information from node %s" % node_name)
297 if constants.PROTOCOL_VERSION == result.payload:
298 logging.info("Communication to node %s fine, sw version %s match",
299 node_name, result.payload)
301 raise errors.OpPrereqError("Version mismatch master version %s,"
303 (constants.PROTOCOL_VERSION, result.payload),
304 errors.ECODE_ENVIRON)
306 vg_name = self.cfg.GetVGName()
307 if vg_name is not None:
308 vparams = {constants.NV_PVLIST: [vg_name]}
309 excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
310 cname = self.cfg.GetClusterName()
311 result = rpcrunner.call_node_verify_light(
312 [node_name], vparams, cname,
313 self.cfg.GetClusterInfo().hvparams)[node_name]
314 (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
316 raise errors.OpPrereqError("Checks on node PVs failed: %s" %
317 "; ".join(errmsgs), errors.ECODE_ENVIRON)
319 def Exec(self, feedback_fn):
320 """Adds the new node to the cluster.
323 assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
326 # We are adding a new node, so we assume it's powered
327 self.new_node.powered = True
329 # for re-adds, reset the offline/drained/master-candidate flags;
330 # we need to reset here, otherwise offline would prevent RPC calls
331 # later in the procedure; this also means that if the re-add
332 # fails, we are left with a non-offlined, broken node
334 self.new_node.drained = False
335 self.LogInfo("Readding a node, the offline/drained flags were reset")
336 # if we demote the node, we do cleanup later in the procedure
337 self.new_node.master_candidate = self.master_candidate
338 if self.changed_primary_ip:
339 self.new_node.primary_ip = self.op.primary_ip
341 # copy the master/vm_capable flags
342 for attr in self._NFLAGS:
343 setattr(self.new_node, attr, getattr(self.op, attr))
345 # notify the user about any possible mc promotion
346 if self.new_node.master_candidate:
347 self.LogInfo("Node will be a master candidate")
350 self.new_node.ndparams = self.op.ndparams
352 self.new_node.ndparams = {}
355 self.new_node.hv_state_static = self.new_hv_state
357 if self.op.disk_state:
358 self.new_node.disk_state_static = self.new_disk_state
360 # Add node to our /etc/hosts, and add key to known_hosts
361 if self.cfg.GetClusterInfo().modify_etc_hosts:
362 master_node = self.cfg.GetMasterNode()
363 result = self.rpc.call_etc_hosts_modify(
364 master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
366 result.Raise("Can't update hosts file with new host data")
368 if self.new_node.secondary_ip != self.new_node.primary_ip:
369 _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
372 node_verifier_uuids = [self.cfg.GetMasterNode()]
373 node_verify_param = {
374 constants.NV_NODELIST: ([self.new_node.name], {}),
375 # TODO: do a node-net-test as well?
378 result = self.rpc.call_node_verify(
379 node_verifier_uuids, node_verify_param,
380 self.cfg.GetClusterName(),
381 self.cfg.GetClusterInfo().hvparams)
382 for verifier in node_verifier_uuids:
383 result[verifier].Raise("Cannot communicate with node %s" % verifier)
384 nl_payload = result[verifier].payload[constants.NV_NODELIST]
386 for failed in nl_payload:
387 feedback_fn("ssh/hostname verification failed"
388 " (checking from %s): %s" %
389 (verifier, nl_payload[failed]))
390 raise errors.OpExecError("ssh/hostname verification failed")
392 # OpenvSwitch initialization on the node
393 if self.new_node.ndparams[constants.ND_OVS]:
394 result = self.rpc.call_node_configure_ovs(
396 self.new_node.ndparams[constants.ND_OVS_NAME],
397 self.new_node.ndparams[constants.ND_OVS_LINK])
400 self.context.ReaddNode(self.new_node)
401 RedistributeAncillaryFiles(self)
402 # make sure we redistribute the config
403 self.cfg.Update(self.new_node, feedback_fn)
404 # and make sure the new node will not have old files around
405 if not self.new_node.master_candidate:
406 result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
407 result.Warn("Node failed to demote itself from master candidate status",
410 self.context.AddNode(self.new_node, self.proc.GetECId())
411 RedistributeAncillaryFiles(self)
414 class LUNodeSetParams(LogicalUnit):
415 """Modifies the parameters of a node.
417 @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
418 to the node role (as _ROLE_*)
419 @cvar _R2F: a dictionary from node role to tuples of flags
420 @cvar _FLAGS: a list of attribute names corresponding to the flags
423 HPATH = "node-modify"
424 HTYPE = constants.HTYPE_NODE
426 (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
428 (True, False, False): _ROLE_CANDIDATE,
429 (False, True, False): _ROLE_DRAINED,
430 (False, False, True): _ROLE_OFFLINE,
431 (False, False, False): _ROLE_REGULAR,
433 _R2F = dict((v, k) for k, v in _F2R.items())
434 _FLAGS = ["master_candidate", "drained", "offline"]
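# Illustrative round trip through the two maps: a node with
# (master_candidate=True, drained=False, offline=False) maps via _F2R to
# _ROLE_CANDIDATE, and _R2F[_ROLE_CANDIDATE] yields (True, False, False)
# again; the all-False tuple corresponds to _ROLE_REGULAR.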
436 def CheckArguments(self):
437 (self.op.node_uuid, self.op.node_name) = \
438 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
439 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
440 self.op.master_capable, self.op.vm_capable,
441 self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
443 if all_mods.count(None) == len(all_mods):
444 raise errors.OpPrereqError("Please pass at least one modification",
446 if all_mods.count(True) > 1:
447 raise errors.OpPrereqError("Can't set the node into more than one"
448 " state at the same time",
451 # Boolean value that tells us whether we might be demoting from MC
452 self.might_demote = (self.op.master_candidate is False or
453 self.op.offline is True or
454 self.op.drained is True or
455 self.op.master_capable is False)
457 if self.op.secondary_ip:
458 if not netutils.IP4Address.IsValid(self.op.secondary_ip):
459 raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
460 " address" % self.op.secondary_ip,
463 self.lock_all = self.op.auto_promote and self.might_demote
464 self.lock_instances = self.op.secondary_ip is not None
466 def _InstanceFilter(self, instance):
467 """Filter for getting affected instances.
470 return (instance.disk_template in constants.DTS_INT_MIRROR and
471 self.op.node_uuid in instance.all_nodes)
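# For example (illustrative): a DRBD-based instance that has the node being
# modified as its primary or secondary node matches this filter, while an
# instance on a non-mirrored template (e.g. plain or file) never does.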
473 def ExpandNames(self):
475 self.needed_locks = {
476 locking.LEVEL_NODE: locking.ALL_SET,
478 # Block allocations when all nodes are locked
479 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
482 self.needed_locks = {
483 locking.LEVEL_NODE: self.op.node_uuid,
486 # Since modifying a node can have severe effects on currently running
487 # operations, the resource lock is at least acquired in shared mode
488 self.needed_locks[locking.LEVEL_NODE_RES] = \
489 self.needed_locks[locking.LEVEL_NODE]
491 # Get all locks except nodes in shared mode; they are not used for anything
492 # but read-only access
493 self.share_locks = ShareAll()
494 self.share_locks[locking.LEVEL_NODE] = 0
495 self.share_locks[locking.LEVEL_NODE_RES] = 0
496 self.share_locks[locking.LEVEL_NODE_ALLOC] = 0
498 if self.lock_instances:
499 self.needed_locks[locking.LEVEL_INSTANCE] = \
500 self.cfg.GetInstanceNames(
501 self.cfg.GetInstancesInfoByFilter(self._InstanceFilter).keys())
503 def BuildHooksEnv(self):
506 This runs on the master node.
510 "OP_TARGET": self.op.node_name,
511 "MASTER_CANDIDATE": str(self.op.master_candidate),
512 "OFFLINE": str(self.op.offline),
513 "DRAINED": str(self.op.drained),
514 "MASTER_CAPABLE": str(self.op.master_capable),
515 "VM_CAPABLE": str(self.op.vm_capable),
518 def BuildHooksNodes(self):
519 """Build hooks nodes.
522 nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
525 def CheckPrereq(self):
526 """Check prerequisites.
528 This only checks the instance list against the existing names.
531 node = self.cfg.GetNodeInfo(self.op.node_uuid)
532 if self.lock_instances:
533 affected_instances = \
534 self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)
536 # Verify instance locks
537 owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
538 wanted_instance_names = frozenset([inst.name for inst in
539 affected_instances.values()])
540 if wanted_instance_names - owned_instance_names:
541 raise errors.OpPrereqError("Instances affected by changing node %s's"
542 " secondary IP address have changed since"
543 " locks were acquired, wanted '%s', have"
544 " '%s'; retry the operation" %
546 utils.CommaJoin(wanted_instance_names),
547 utils.CommaJoin(owned_instance_names)),
550 affected_instances = None
552 if (self.op.master_candidate is not None or
553 self.op.drained is not None or
554 self.op.offline is not None):
555 # we can't change the master's node flags
556 if node.uuid == self.cfg.GetMasterNode():
557 raise errors.OpPrereqError("The master role can be changed"
558 " only via master-failover",
561 if self.op.master_candidate and not node.master_capable:
562 raise errors.OpPrereqError("Node %s is not master capable, cannot make"
563 " it a master candidate" % node.name,
566 if self.op.vm_capable is False:
567 (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
569 raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
570 " the vm_capable flag" % node.name,
573 if node.master_candidate and self.might_demote and not self.lock_all:
574 assert not self.op.auto_promote, "auto_promote set but lock_all not"
575 # check if after removing the current node, we're missing master candidates
577 (mc_remaining, mc_should, _) = \
578 self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
579 if mc_remaining < mc_should:
580 raise errors.OpPrereqError("Not enough master candidates, please"
581 " pass auto promote option to allow"
582 " promotion (--auto-promote or RAPI"
583 " auto_promote=True)", errors.ECODE_STATE)
585 self.old_flags = old_flags = (node.master_candidate,
586 node.drained, node.offline)
587 assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
588 self.old_role = old_role = self._F2R[old_flags]
590 # Check for ineffective changes
591 for attr in self._FLAGS:
592 if getattr(self.op, attr) is False and getattr(node, attr) is False:
593 self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
594 setattr(self.op, attr, None)
596 # Past this point, any flag change to False means a transition
597 # away from the respective state, as only real changes are kept
599 # TODO: We might query the real power state if it supports OOB
600 if SupportsOob(self.cfg, node):
601 if self.op.offline is False and not (node.powered or
602 self.op.powered is True):
603 raise errors.OpPrereqError(("Node %s needs to be turned on before its"
604 " offline status can be reset") %
605 self.op.node_name, errors.ECODE_STATE)
606 elif self.op.powered is not None:
607 raise errors.OpPrereqError(("Unable to change powered state for node %s"
608 " as it does not support out-of-band"
609 " handling") % self.op.node_name,
612 # If we're being de-offlined/un-drained (or becoming master capable), promote ourselves to master candidate if needed
613 if (self.op.drained is False or self.op.offline is False or
614 (self.op.master_capable and not node.master_capable)):
615 if _DecideSelfPromotion(self):
616 self.op.master_candidate = True
617 self.LogInfo("Auto-promoting node to master candidate")
619 # If we're no longer master capable, we'll demote ourselves from MC
620 if self.op.master_capable is False and node.master_candidate:
621 self.LogInfo("Demoting from master candidate")
622 self.op.master_candidate = False
625 assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
626 if self.op.master_candidate:
627 new_role = self._ROLE_CANDIDATE
628 elif self.op.drained:
629 new_role = self._ROLE_DRAINED
630 elif self.op.offline:
631 new_role = self._ROLE_OFFLINE
632 elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
633 # False is still in new flags, which means we're un-setting (the current) flags
635 new_role = self._ROLE_REGULAR
636 else: # no new flags, nothing, keep old role
639 self.new_role = new_role
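# Worked examples of the decision above (assuming the corresponding old flag
# differs): offline=True yields _ROLE_OFFLINE, drained=True yields
# _ROLE_DRAINED, master_candidate=False on a current candidate yields
# _ROLE_REGULAR, and with no flags passed the old role is kept.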
641 if old_role == self._ROLE_OFFLINE and new_role != old_role:
642 # Trying to transition out of offline status
643 result = self.rpc.call_version([node.uuid])[node.uuid]
645 raise errors.OpPrereqError("Node %s is being de-offlined but fails"
646 " to report its version: %s" %
647 (node.name, result.fail_msg),
650 self.LogWarning("Transitioning node from offline to online state"
651 " without using re-add. Please make sure the node"
654 # When changing the secondary ip, verify if this is a single-homed to
655 # multi-homed transition or vice versa, and apply the relevant restrictions
657 if self.op.secondary_ip:
658 # Ok even without locking, because this can't be changed by any LU
659 master = self.cfg.GetMasterNodeInfo()
660 master_singlehomed = master.secondary_ip == master.primary_ip
661 if master_singlehomed and self.op.secondary_ip != node.primary_ip:
662 if self.op.force and node.uuid == master.uuid:
663 self.LogWarning("Transitioning from single-homed to multi-homed"
664 " cluster; all nodes will require a secondary IP"
667 raise errors.OpPrereqError("Changing the secondary ip on a"
668 " single-homed cluster requires the"
669 " --force option to be passed, and the"
670 " target node to be the master",
672 elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
673 if self.op.force and node.uuid == master.uuid:
674 self.LogWarning("Transitioning from multi-homed to single-homed"
675 " cluster; secondary IP addresses will have to be"
678 raise errors.OpPrereqError("Cannot set the secondary IP to be the"
679 " same as the primary IP on a multi-homed"
680 " cluster, unless the --force option is"
681 " passed, and the target node is the"
682 " master", errors.ECODE_INVAL)
684 assert not (set([inst.name for inst in affected_instances.values()]) -
685 self.owned_locks(locking.LEVEL_INSTANCE))
688 if affected_instances:
689 msg = ("Cannot change secondary IP address: offline node has"
690 " instances (%s) configured to use it" %
692 [inst.name for inst in affected_instances.values()]))
693 raise errors.OpPrereqError(msg, errors.ECODE_STATE)
695 # On online nodes, check that no instances are running, and that
696 # the node has the new ip and we can reach it.
697 for instance in affected_instances.values():
698 CheckInstanceState(self, instance, INSTANCE_DOWN,
699 msg="cannot change secondary ip")
701 _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
702 if master.uuid != node.uuid:
703 # check reachability from master secondary ip to new secondary ip
704 if not netutils.TcpPing(self.op.secondary_ip,
705 constants.DEFAULT_NODED_PORT,
706 source=master.secondary_ip):
707 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
708 " based ping to node daemon port",
709 errors.ECODE_ENVIRON)
712 new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
713 utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
714 CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
715 "node", "cluster or group")
716 self.new_ndparams = new_ndparams
719 self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
720 node.hv_state_static)
722 if self.op.disk_state:
723 self.new_disk_state = \
724 MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)
726 def Exec(self, feedback_fn):
730 node = self.cfg.GetNodeInfo(self.op.node_uuid)
734 node.ndparams = self.new_ndparams
736 if self.op.powered is not None:
737 node.powered = self.op.powered
740 node.hv_state_static = self.new_hv_state
742 if self.op.disk_state:
743 node.disk_state_static = self.new_disk_state
745 for attr in ["master_capable", "vm_capable"]:
746 val = getattr(self.op, attr)
748 setattr(node, attr, val)
749 result.append((attr, str(val)))
751 if self.new_role != self.old_role:
752 # Tell the node to demote itself, if no longer MC and not offline
753 if self.old_role == self._ROLE_CANDIDATE and \
754 self.new_role != self._ROLE_OFFLINE:
755 msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
757 self.LogWarning("Node failed to demote itself: %s", msg)
759 new_flags = self._R2F[self.new_role]
760 for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
762 result.append((desc, str(nf)))
763 (node.master_candidate, node.drained, node.offline) = new_flags
765 # we locked all nodes, so adjust the candidate pool before updating this node
767 AdjustCandidatePool(self, [node.uuid])
769 if self.op.secondary_ip:
770 node.secondary_ip = self.op.secondary_ip
771 result.append(("secondary_ip", self.op.secondary_ip))
773 # this will trigger configuration file update, if needed
774 self.cfg.Update(node, feedback_fn)
776 # this will trigger job queue propagation or cleanup if the mc flag changed
778 if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
779 self.context.ReaddNode(node)
784 class LUNodePowercycle(NoHooksLU):
785 """Powercycles a node.
790 def CheckArguments(self):
791 (self.op.node_uuid, self.op.node_name) = \
792 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
794 if self.op.node_uuid == self.cfg.GetMasterNode() and not self.op.force:
795 raise errors.OpPrereqError("The node is the master and the force"
796 " parameter was not set",
799 def ExpandNames(self):
800 """Locking for PowercycleNode.
802 This is a last-resort option and shouldn't block on other
803 jobs. Therefore, we grab no locks.
806 self.needed_locks = {}
808 def Exec(self, feedback_fn):
812 default_hypervisor = self.cfg.GetHypervisorType()
813 hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
814 result = self.rpc.call_node_powercycle(self.op.node_uuid,
817 result.Raise("Failed to schedule the reboot")
818 return result.payload
821 def _GetNodeInstancesInner(cfg, fn):
822 return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
825 def _GetNodePrimaryInstances(cfg, node_uuid):
826 """Returns primary instances on a node.
829 return _GetNodeInstancesInner(cfg,
830 lambda inst: node_uuid == inst.primary_node)
833 def _GetNodeSecondaryInstances(cfg, node_uuid):
834 """Returns secondary instances on a node.
837 return _GetNodeInstancesInner(cfg,
838 lambda inst: node_uuid in inst.secondary_nodes)
841 def _GetNodeInstances(cfg, node_uuid):
842 """Returns a list of all primary and secondary instances on a node.
846 return _GetNodeInstancesInner(cfg, lambda inst: node_uuid in inst.all_nodes)
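# A small composition sketch (assuming a populated config object and a node
# UUID): _GetNodeInstances(cfg, node_uuid) is effectively the union of
# _GetNodePrimaryInstances(cfg, node_uuid) and
# _GetNodeSecondaryInstances(cfg, node_uuid), since inst.all_nodes covers the
# primary node as well as all secondary nodes.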
849 class LUNodeEvacuate(NoHooksLU):
850 """Evacuates instances off a list of nodes.
856 constants.NODE_EVAC_PRI: constants.IALLOCATOR_NEVAC_PRI,
857 constants.NODE_EVAC_SEC: constants.IALLOCATOR_NEVAC_SEC,
858 constants.NODE_EVAC_ALL: constants.IALLOCATOR_NEVAC_ALL,
860 assert frozenset(_MODE2IALLOCATOR.keys()) == constants.NODE_EVAC_MODES
861 assert (frozenset(_MODE2IALLOCATOR.values()) ==
862 constants.IALLOCATOR_NEVAC_MODES)
864 def CheckArguments(self):
865 CheckIAllocatorOrNode(self, "iallocator", "remote_node")
867 def ExpandNames(self):
868 (self.op.node_uuid, self.op.node_name) = \
869 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
871 if self.op.remote_node is not None:
872 (self.op.remote_node_uuid, self.op.remote_node) = \
873 ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
875 assert self.op.remote_node
877 if self.op.node_uuid == self.op.remote_node_uuid:
878 raise errors.OpPrereqError("Can not use evacuated node as a new"
879 " secondary node", errors.ECODE_INVAL)
881 if self.op.mode != constants.NODE_EVAC_SEC:
882 raise errors.OpPrereqError("Without the use of an iallocator only"
883 " secondary instances can be evacuated",
887 self.share_locks = ShareAll()
888 self.needed_locks = {
889 locking.LEVEL_INSTANCE: [],
890 locking.LEVEL_NODEGROUP: [],
891 locking.LEVEL_NODE: [],
894 # Determine nodes (via group) optimistically; needs verification once locks have been acquired
896 self.lock_nodes = self._DetermineNodes()
898 def _DetermineNodes(self):
899 """Gets the list of node UUIDs to operate on.
902 if self.op.remote_node is None:
903 # Iallocator will choose any node(s) in the same group
904 group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
906 group_nodes = frozenset([self.op.remote_node_uuid])
908 # Determine nodes to be locked
909 return set([self.op.node_uuid]) | group_nodes
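# Illustrative outcomes: evacuating node N through an iallocator locks N plus
# every member of N's node group(s), while evacuating to an explicit remote
# node R locks only N and R.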
911 def _DetermineInstances(self):
912 """Builds list of instances to operate on.
915 assert self.op.mode in constants.NODE_EVAC_MODES
917 if self.op.mode == constants.NODE_EVAC_PRI:
918 # Primary instances only
919 inst_fn = _GetNodePrimaryInstances
920 assert self.op.remote_node is None, \
921 "Evacuating primary instances requires iallocator"
922 elif self.op.mode == constants.NODE_EVAC_SEC:
923 # Secondary instances only
924 inst_fn = _GetNodeSecondaryInstances
927 assert self.op.mode == constants.NODE_EVAC_ALL
928 inst_fn = _GetNodeInstances
929 # TODO: In 2.6, change the iallocator interface to take an evacuation mode
931 raise errors.OpPrereqError("Due to an issue with the iallocator"
932 " interface it is not possible to evacuate"
933 " all instances at once; specify explicitly"
934 " whether to evacuate primary or secondary"
938 return inst_fn(self.cfg, self.op.node_uuid)
940 def DeclareLocks(self, level):
941 if level == locking.LEVEL_INSTANCE:
942 # Lock instances optimistically, needs verification once node and group
943 # locks have been acquired
944 self.needed_locks[locking.LEVEL_INSTANCE] = \
945 set(i.name for i in self._DetermineInstances())
947 elif level == locking.LEVEL_NODEGROUP:
948 # Lock node groups for all potential target nodes optimistically, needs
949 # verification once nodes have been acquired
950 self.needed_locks[locking.LEVEL_NODEGROUP] = \
951 self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)
953 elif level == locking.LEVEL_NODE:
954 self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes
956 def CheckPrereq(self):
958 owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
959 owned_nodes = self.owned_locks(locking.LEVEL_NODE)
960 owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
962 need_nodes = self._DetermineNodes()
964 if not owned_nodes.issuperset(need_nodes):
965 raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
966 " locks were acquired, current nodes are"
967 " are '%s', used to be '%s'; retry the"
970 utils.CommaJoin(need_nodes),
971 utils.CommaJoin(owned_nodes)),
974 wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
975 if owned_groups != wanted_groups:
976 raise errors.OpExecError("Node groups changed since locks were acquired,"
977 " current groups are '%s', used to be '%s';"
978 " retry the operation" %
979 (utils.CommaJoin(wanted_groups),
980 utils.CommaJoin(owned_groups)))
982 # Determine affected instances
983 self.instances = self._DetermineInstances()
984 self.instance_names = [i.name for i in self.instances]
986 if set(self.instance_names) != owned_instance_names:
987 raise errors.OpExecError("Instances on node '%s' changed since locks"
988 " were acquired, current instances are '%s',"
989 " used to be '%s'; retry the operation" %
991 utils.CommaJoin(self.instance_names),
992 utils.CommaJoin(owned_instance_names)))
994 if self.instance_names:
995 self.LogInfo("Evacuating instances from node '%s': %s",
997 utils.CommaJoin(utils.NiceSort(self.instance_names)))
999 self.LogInfo("No instances to evacuate from node '%s'",
1002 if self.op.remote_node is not None:
1003 for i in self.instances:
1004 if i.primary_node == self.op.remote_node_uuid:
1005 raise errors.OpPrereqError("Node %s is the primary node of"
1006 " instance %s, cannot use it as"
1008 (self.op.remote_node, i.name),
1011 def Exec(self, feedback_fn):
1012 assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)
1014 if not self.instance_names:
1015 # No instances to evacuate
1018 elif self.op.iallocator is not None:
1019 # TODO: Implement relocation to other group
1020 evac_mode = self._MODE2IALLOCATOR[self.op.mode]
1021 req = iallocator.IAReqNodeEvac(evac_mode=evac_mode,
1022 instances=list(self.instance_names))
1023 ial = iallocator.IAllocator(self.cfg, self.rpc, req)
1025 ial.Run(self.op.iallocator)
1028 raise errors.OpPrereqError("Can't compute node evacuation using"
1029 " iallocator '%s': %s" %
1030 (self.op.iallocator, ial.info),
1033 jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)
1035 elif self.op.remote_node is not None:
1036 assert self.op.mode == constants.NODE_EVAC_SEC
1038 [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
1039 remote_node=self.op.remote_node,
1041 mode=constants.REPLACE_DISK_CHG,
1042 early_release=self.op.early_release)]
1043 for instance_name in self.instance_names]
1046 raise errors.ProgrammerError("No iallocator or remote node")
1048 return ResultWithJobs(jobs)
1051 class LUNodeMigrate(LogicalUnit):
1052 """Migrate all instances from a node.
1055 HPATH = "node-migrate"
1056 HTYPE = constants.HTYPE_NODE
1059 def CheckArguments(self):
1062 def ExpandNames(self):
1063 (self.op.node_uuid, self.op.node_name) = \
1064 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1066 self.share_locks = ShareAll()
1067 self.needed_locks = {
1068 locking.LEVEL_NODE: [self.op.node_uuid],
1071 def BuildHooksEnv(self):
1074 This runs on the master, the primary and all the secondaries.
1078 "NODE_NAME": self.op.node_name,
1079 "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
1082 def BuildHooksNodes(self):
1083 """Build hooks nodes.
1086 nl = [self.cfg.GetMasterNode()]
1089 def CheckPrereq(self):
1092 def Exec(self, feedback_fn):
1093 # Prepare jobs for migrating the instances
1095 [opcodes.OpInstanceMigrate(
1096 instance_name=inst.name,
1099 iallocator=self.op.iallocator,
1100 target_node=self.op.target_node,
1101 allow_runtime_changes=self.op.allow_runtime_changes,
1102 ignore_ipolicy=self.op.ignore_ipolicy)]
1103 for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]
1105 # TODO: Run iallocator in this opcode and pass correct placement options to
1106 # OpInstanceMigrate. Since other jobs can modify the cluster between
1107 # running the iallocator and the actual migration, a good consistency model
1108 # will have to be found.
1110 assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
1111 frozenset([self.op.node_uuid]))
1113 return ResultWithJobs(jobs)
1116 def _GetStorageTypeArgs(cfg, storage_type):
1117 """Returns the arguments for a storage type.
1120 # Special case for file storage
1121 if storage_type == constants.ST_FILE:
1122 # storage.FileStorage wants a list of storage directories
1123 return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
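# For example (hypothetical directories): with /srv/ganeti/file and
# /srv/ganeti/shared-file configured, file storage yields
# [["/srv/ganeti/file", "/srv/ganeti/shared-file"]]; other storage types do
# not take this branch.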
1128 class LUNodeModifyStorage(NoHooksLU):
1129 """Logical unit for modifying a storage volume on a node.
1134 def CheckArguments(self):
1135 (self.op.node_uuid, self.op.node_name) = \
1136 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1138 storage_type = self.op.storage_type
1141 modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
1143 raise errors.OpPrereqError("Storage units of type '%s' can not be"
1144 " modified" % storage_type,
1147 diff = set(self.op.changes.keys()) - modifiable
1149 raise errors.OpPrereqError("The following fields can not be modified for"
1150 " storage units of type '%s': %r" %
1151 (storage_type, list(diff)),
1154 def CheckPrereq(self):
1155 """Check prerequisites.
1158 CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1160 def ExpandNames(self):
1161 self.needed_locks = {
1162 locking.LEVEL_NODE: self.op.node_uuid,
1165 def Exec(self, feedback_fn):
1166 """Computes the list of nodes and their attributes.
1169 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1170 result = self.rpc.call_storage_modify(self.op.node_uuid,
1171 self.op.storage_type, st_args,
1172 self.op.name, self.op.changes)
1173 result.Raise("Failed to modify storage unit '%s' on %s" %
1174 (self.op.name, self.op.node_name))
1177 class NodeQuery(QueryBase):
1178 FIELDS = query.NODE_FIELDS
1180 def ExpandNames(self, lu):
1181 lu.needed_locks = {}
1182 lu.share_locks = ShareAll()
1185 (self.wanted, _) = GetWantedNodes(lu, self.names)
1187 self.wanted = locking.ALL_SET
1189 self.do_locking = (self.use_locking and
1190 query.NQ_LIVE in self.requested_data)
1193 # If any non-static field is requested we need to lock the nodes
1194 lu.needed_locks[locking.LEVEL_NODE] = self.wanted
1195 lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1197 def DeclareLocks(self, lu, level):
1200 def _GetQueryData(self, lu):
1201 """Computes the list of nodes and their attributes.
1204 all_info = lu.cfg.GetAllNodesInfo()
1206 node_uuids = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)
1208 # Gather data as requested
1209 if query.NQ_LIVE in self.requested_data:
1210 # filter out non-vm_capable nodes
1211 toquery_node_uuids = [node.uuid for node in all_info.values()
1212 if node.vm_capable and node.uuid in node_uuids]
1213 lvm_enabled = utils.storage.IsLvmEnabled(
1214 lu.cfg.GetClusterInfo().enabled_disk_templates)
1215 # FIXME: by default this asks for storage space information for all
1216 # enabled disk templates. Fix this by making it possible to specify
1217 # space report fields for specific disk templates.
1218 raw_storage_units = utils.storage.GetStorageUnitsOfCluster(
1219 lu.cfg, include_spindles=lvm_enabled)
1220 storage_units = rpc.PrepareStorageUnitsForNodes(
1221 lu.cfg, raw_storage_units, toquery_node_uuids)
1222 default_hypervisor = lu.cfg.GetHypervisorType()
1223 hvparams = lu.cfg.GetClusterInfo().hvparams[default_hypervisor]
1224 hvspecs = [(default_hypervisor, hvparams)]
1225 node_data = lu.rpc.call_node_info(toquery_node_uuids, storage_units,
1228 (uuid, rpc.MakeLegacyNodeInfo(nresult.payload,
1229 require_spindles=lvm_enabled))
1230 for (uuid, nresult) in node_data.items()
1231 if not nresult.fail_msg and nresult.payload)
1235 if query.NQ_INST in self.requested_data:
1236 node_to_primary = dict([(uuid, set()) for uuid in node_uuids])
1237 node_to_secondary = dict([(uuid, set()) for uuid in node_uuids])
1239 inst_data = lu.cfg.GetAllInstancesInfo()
1240 inst_uuid_to_inst_name = {}
1242 for inst in inst_data.values():
1243 inst_uuid_to_inst_name[inst.uuid] = inst.name
1244 if inst.primary_node in node_to_primary:
1245 node_to_primary[inst.primary_node].add(inst.uuid)
1246 for secnode in inst.secondary_nodes:
1247 if secnode in node_to_secondary:
1248 node_to_secondary[secnode].add(inst.uuid)
1250 node_to_primary = None
1251 node_to_secondary = None
1252 inst_uuid_to_inst_name = None
1254 if query.NQ_OOB in self.requested_data:
1255 oob_support = dict((uuid, bool(SupportsOob(lu.cfg, node)))
1256 for uuid, node in all_info.iteritems())
1260 if query.NQ_GROUP in self.requested_data:
1261 groups = lu.cfg.GetAllNodeGroupsInfo()
1265 return query.NodeQueryData([all_info[uuid] for uuid in node_uuids],
1266 live_data, lu.cfg.GetMasterNode(),
1267 node_to_primary, node_to_secondary,
1268 inst_uuid_to_inst_name, groups, oob_support,
1269 lu.cfg.GetClusterInfo())
1272 class LUNodeQuery(NoHooksLU):
1273 """Logical unit for querying nodes.
1276 # pylint: disable=W0142
1279 def CheckArguments(self):
1280 self.nq = NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
1281 self.op.output_fields, self.op.use_locking)
1283 def ExpandNames(self):
1284 self.nq.ExpandNames(self)
1286 def DeclareLocks(self, level):
1287 self.nq.DeclareLocks(self, level)
1289 def Exec(self, feedback_fn):
1290 return self.nq.OldStyleQuery(self)
1293 def _CheckOutputFields(fields, selected):
1294 """Checks whether all selected fields are valid according to fields.
1296 @type fields: L{utils.FieldSet}
1297 @param fields: the set of valid fields
1298 @type selected: L{utils.FieldSet}
1299 @param selected: the fields requested for output
1302 delta = fields.NonMatching(selected)
1304 raise errors.OpPrereqError("Unknown output fields selected: %s"
1305 % ",".join(delta), errors.ECODE_INVAL)
1308 class LUNodeQueryvols(NoHooksLU):
1309 """Logical unit for getting volumes on node(s).
1314 def CheckArguments(self):
1315 _CheckOutputFields(utils.FieldSet(constants.VF_NODE, constants.VF_PHYS,
1316 constants.VF_VG, constants.VF_NAME,
1317 constants.VF_SIZE, constants.VF_INSTANCE),
1318 self.op.output_fields)
1320 def ExpandNames(self):
1321 self.share_locks = ShareAll()
1324 self.needed_locks = {
1325 locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
1328 self.needed_locks = {
1329 locking.LEVEL_NODE: locking.ALL_SET,
1330 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1333 def Exec(self, feedback_fn):
1334 """Computes the list of nodes and their attributes.
1337 node_uuids = self.owned_locks(locking.LEVEL_NODE)
1338 volumes = self.rpc.call_node_volumes(node_uuids)
1340 ilist = self.cfg.GetAllInstancesInfo()
1341 vol2inst = MapInstanceLvsToNodes(ilist.values())
1344 for node_uuid in node_uuids:
1345 nresult = volumes[node_uuid]
1348 msg = nresult.fail_msg
1350 self.LogWarning("Can't compute volume data on node %s: %s",
1351 self.cfg.GetNodeName(node_uuid), msg)
1354 node_vols = sorted(nresult.payload,
1355 key=operator.itemgetter(constants.VF_DEV))
1357 for vol in node_vols:
1359 for field in self.op.output_fields:
1360 if field == constants.VF_NODE:
1361 val = self.cfg.GetNodeName(node_uuid)
1362 elif field == constants.VF_PHYS:
1363 val = vol[constants.VF_DEV]
1364 elif field == constants.VF_VG:
1365 val = vol[constants.VF_VG]
1366 elif field == constants.VF_NAME:
1367 val = vol[constants.VF_NAME]
1368 elif field == constants.VF_SIZE:
1369 val = int(float(vol[constants.VF_SIZE]))
1370 elif field == constants.VF_INSTANCE:
1371 inst = vol2inst.get((node_uuid, vol[constants.VF_VG] + "/" +
1372 vol[constants.VF_NAME]), None)
1373 if inst is not None:
1378 raise errors.ParameterError(field)
1379 node_output.append(str(val))
1381 output.append(node_output)
1386 class LUNodeQueryStorage(NoHooksLU):
1387 """Logical unit for getting information on storage units on node(s).
1392 def CheckArguments(self):
1393 _CheckOutputFields(utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
1394 self.op.output_fields)
1396 def ExpandNames(self):
1397 self.share_locks = ShareAll()
1400 self.needed_locks = {
1401 locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
1404 self.needed_locks = {
1405 locking.LEVEL_NODE: locking.ALL_SET,
1406 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1409 def CheckPrereq(self):
1410 """Check prerequisites.
1413 CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1415 def Exec(self, feedback_fn):
1416 """Computes the list of nodes and their attributes.
1419 self.node_uuids = self.owned_locks(locking.LEVEL_NODE)
1421 # Always get name to sort by
1422 if constants.SF_NAME in self.op.output_fields:
1423 fields = self.op.output_fields[:]
1425 fields = [constants.SF_NAME] + self.op.output_fields
1427 # Never ask for node or type as it's only known to the LU
1428 for extra in [constants.SF_NODE, constants.SF_TYPE]:
1429 while extra in fields:
1430 fields.remove(extra)
1432 field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
1433 name_idx = field_idx[constants.SF_NAME]
1435 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1436 data = self.rpc.call_storage_list(self.node_uuids,
1437 self.op.storage_type, st_args,
1438 self.op.name, fields)
1442 for node_uuid in utils.NiceSort(self.node_uuids):
1443 node_name = self.cfg.GetNodeName(node_uuid)
1444 nresult = data[node_uuid]
1448 msg = nresult.fail_msg
1450 self.LogWarning("Can't get storage data from node %s: %s",
1454 rows = dict([(row[name_idx], row) for row in nresult.payload])
1456 for name in utils.NiceSort(rows.keys()):
1461 for field in self.op.output_fields:
1462 if field == constants.SF_NODE:
1464 elif field == constants.SF_TYPE:
1465 val = self.op.storage_type
1466 elif field in field_idx:
1467 val = row[field_idx[field]]
1469 raise errors.ParameterError(field)
1478 class LUNodeRemove(LogicalUnit):
1479 """Logical unit for removing a node.
1482 HPATH = "node-remove"
1483 HTYPE = constants.HTYPE_NODE
1485 def BuildHooksEnv(self):
1490 "OP_TARGET": self.op.node_name,
1491 "NODE_NAME": self.op.node_name,
1494 def BuildHooksNodes(self):
1495 """Build hooks nodes.
1497 This doesn't run on the target node in the pre phase as a failed
1498 node would then be impossible to remove.
1501 all_nodes = self.cfg.GetNodeList()
1503 all_nodes.remove(self.op.node_uuid)
1506 return (all_nodes, all_nodes)
1508 def CheckPrereq(self):
1509 """Check prerequisites.
1512 - the node exists in the configuration
1513 - it does not have primary or secondary instances
1514 - it's not the master
1516 Any errors are signaled by raising errors.OpPrereqError.
1519 (self.op.node_uuid, self.op.node_name) = \
1520 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1521 node = self.cfg.GetNodeInfo(self.op.node_uuid)
1522 assert node is not None
1524 masternode = self.cfg.GetMasterNode()
1525 if node.uuid == masternode:
1526 raise errors.OpPrereqError("Node is the master node, failover to another"
1527 " node is required", errors.ECODE_INVAL)
1529 for _, instance in self.cfg.GetAllInstancesInfo().items():
1530 if node.uuid in instance.all_nodes:
1531 raise errors.OpPrereqError("Instance %s is still running on the node,"
1532 " please remove first" % instance.name,
1534 self.op.node_name = node.name
1537 def Exec(self, feedback_fn):
1538 """Removes the node from the cluster.
1541 logging.info("Stopping the node daemon and removing configs from node %s",
1544 modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
1546 assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
1549 # Promote nodes to master candidate as needed
1550 AdjustCandidatePool(self, exceptions=[self.node.uuid])
1551 self.context.RemoveNode(self.node)
1553 # Run post hooks on the node before it's removed
1554 RunPostHook(self, self.node.name)
1556 # we have to call this by name rather than by UUID, as the node is no longer in the configuration
1558 result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup)
1559 msg = result.fail_msg
1561 self.LogWarning("Errors encountered on the remote node while leaving"
1562 " the cluster: %s", msg)
1564 # Remove node from our /etc/hosts
1565 if self.cfg.GetClusterInfo().modify_etc_hosts:
1566 master_node_uuid = self.cfg.GetMasterNode()
1567 result = self.rpc.call_etc_hosts_modify(master_node_uuid,
1568 constants.ETC_HOSTS_REMOVE,
1569 self.node.name, None)
1570 result.Raise("Can't update hosts file with new host data")
1571 RedistributeAncillaryFiles(self)
1574 class LURepairNodeStorage(NoHooksLU):
1575 """Repairs the volume group on a node.
1580 def CheckArguments(self):
1581 (self.op.node_uuid, self.op.node_name) = \
1582 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1584 storage_type = self.op.storage_type
1586 if (constants.SO_FIX_CONSISTENCY not in
1587 constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
1588 raise errors.OpPrereqError("Storage units of type '%s' can not be"
1589 " repaired" % storage_type,
1592 def ExpandNames(self):
1593 self.needed_locks = {
1594 locking.LEVEL_NODE: [self.op.node_uuid],
1597 def _CheckFaultyDisks(self, instance, node_uuid):
1598 """Ensure faulty disks abort the opcode or at least warn."""
1600 if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
1602 raise errors.OpPrereqError("Instance '%s' has faulty disks on"
1605 self.cfg.GetNodeName(node_uuid)),
1607 except errors.OpPrereqError, err:
1608 if self.op.ignore_consistency:
1609 self.LogWarning(str(err.args[0]))
1613 def CheckPrereq(self):
1614 """Check prerequisites.
1617 CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1619 # Check whether any instance on this node has faulty disks
1620 for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
1621 if not inst.disks_active:
1623 check_nodes = set(inst.all_nodes)
1624 check_nodes.discard(self.op.node_uuid)
1625 for inst_node_uuid in check_nodes:
1626 self._CheckFaultyDisks(inst, inst_node_uuid)
1628 def Exec(self, feedback_fn):
1629 feedback_fn("Repairing storage unit '%s' on %s ..." %
1630 (self.op.name, self.op.node_name))
1632 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1633 result = self.rpc.call_storage_execute(self.op.node_uuid,
1634 self.op.storage_type, st_args,
1636 constants.SO_FIX_CONSISTENCY)
1637 result.Raise("Failed to repair storage unit '%s' on %s" %
1638 (self.op.name, self.op.node_name))