4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Logical units dealing with nodes."""
import logging
import operator

from ganeti import constants
28 from ganeti import errors
29 from ganeti import locking
30 from ganeti import netutils
31 from ganeti import objects
32 from ganeti import opcodes
33 from ganeti import qlang
34 from ganeti import query
35 from ganeti import rpc
36 from ganeti import utils
37 from ganeti.masterd import iallocator
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, QueryBase, \
  ResultWithJobs
41 from ganeti.cmdlib.common import CheckParamsNotGlobal, \
42 MergeAndVerifyHvState, MergeAndVerifyDiskState, \
43 IsExclusiveStorageEnabledNode, CheckNodePVs, \
44 RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
45 CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
46 AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
47 GetWantedNodes, MapInstanceDisksToNodes, RunPostHook, \
48 FindFaultyInstanceDisks
51 def _DecideSelfPromotion(lu, exceptions=None):
52 """Decide whether I should promote myself as a master candidate.
55 cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
56 mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max by one, so:
58 mc_should = min(mc_should + 1, cp_size)
59 return mc_now < mc_should
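# Worked example for the helper above (numbers invented for illustration): with
# candidate_pool_size = 10, mc_now = 3 and mc_should = 4, the adjusted
# mc_should becomes min(4 + 1, 10) = 5, and since 3 < 5 the node decides to
# promote itself to master candidate.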
62 def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
63 """Ensure that a node has the given secondary ip.
65 @type lu: L{LogicalUnit}
66 @param lu: the LU on behalf of which we make the check
67 @type node: L{objects.Node}
68 @param node: the node to check
69 @type secondary_ip: string
70 @param secondary_ip: the ip to check
72 @param prereq: whether to throw a prerequisite or an execute error
  @raise errors.OpPrereqError: if the node doesn't have the ip,
      and prereq=True
75 @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
78 # this can be called with a new node, which has no UUID yet, so perform the
79 # RPC call using its name
80 result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
81 result.Raise("Failure checking secondary ip on node %s" % node.name,
82 prereq=prereq, ecode=errors.ECODE_ENVIRON)
83 if not result.payload:
84 msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
85 " please fix and re-run this command" % secondary_ip)
    if prereq:
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
    else:
      raise errors.OpExecError(msg)
92 class LUNodeAdd(LogicalUnit):
93 """Logical unit for adding node to the cluster.
97 HTYPE = constants.HTYPE_NODE
98 _NFLAGS = ["master_capable", "vm_capable"]
100 def CheckArguments(self):
101 self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
102 # validate/normalize the node name
103 self.hostname = netutils.GetHostname(name=self.op.node_name,
104 family=self.primary_ip_family)
105 self.op.node_name = self.hostname.name
107 if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName():
108 raise errors.OpPrereqError("Cannot readd the master node",
111 if self.op.readd and self.op.group:
112 raise errors.OpPrereqError("Cannot pass a node group when a node is"
113 " being readded", errors.ECODE_INVAL)
115 def BuildHooksEnv(self):
118 This will run on all nodes before, and on all nodes + the new node after.
122 "OP_TARGET": self.op.node_name,
123 "NODE_NAME": self.op.node_name,
124 "NODE_PIP": self.op.primary_ip,
125 "NODE_SIP": self.op.secondary_ip,
126 "MASTER_CAPABLE": str(self.op.master_capable),
127 "VM_CAPABLE": str(self.op.vm_capable),
130 def BuildHooksNodes(self):
131 """Build hooks nodes.
134 hook_nodes = self.cfg.GetNodeList()
135 new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
136 if new_node_info is not None:
138 hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))
    # add the new node as post hook node by name; it does not have a UUID yet
141 return (hook_nodes, hook_nodes, [self.op.node_name, ])
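    # The first two elements are the pre- and post-hook nodes by UUID; the
    # extra third element lists additional post-hook nodes by name, which is
    # how the not-yet-registered node gets its post hook run.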
143 def CheckPrereq(self):
144 """Check prerequisites.
147 - the new node is not already in the config
   - its parameters (single/dual homed) match the cluster
151 Any errors are signaled by raising errors.OpPrereqError.
155 hostname = self.hostname
156 node_name = hostname.name
157 primary_ip = self.op.primary_ip = hostname.ip
158 if self.op.secondary_ip is None:
159 if self.primary_ip_family == netutils.IP6Address.family:
        raise errors.OpPrereqError("When using an IPv6 primary address, a valid"
161 " IPv4 address must be given as secondary",
163 self.op.secondary_ip = primary_ip
165 secondary_ip = self.op.secondary_ip
166 if not netutils.IP4Address.IsValid(secondary_ip):
167 raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
168 " address" % secondary_ip, errors.ECODE_INVAL)
170 existing_node_info = cfg.GetNodeInfoByName(node_name)
171 if not self.op.readd and existing_node_info is not None:
172 raise errors.OpPrereqError("Node %s is already in the configuration" %
173 node_name, errors.ECODE_EXISTS)
174 elif self.op.readd and existing_node_info is None:
175 raise errors.OpPrereqError("Node %s is not in the configuration" %
176 node_name, errors.ECODE_NOENT)
178 self.changed_primary_ip = False
180 for existing_node in cfg.GetAllNodesInfo().values():
181 if self.op.readd and node_name == existing_node.name:
182 if existing_node.secondary_ip != secondary_ip:
183 raise errors.OpPrereqError("Readded node doesn't have the same IP"
184 " address configuration as before",
186 if existing_node.primary_ip != primary_ip:
187 self.changed_primary_ip = True
191 if (existing_node.primary_ip == primary_ip or
192 existing_node.secondary_ip == primary_ip or
193 existing_node.primary_ip == secondary_ip or
194 existing_node.secondary_ip == secondary_ip):
195 raise errors.OpPrereqError("New node ip address(es) conflict with"
196 " existing node %s" % existing_node.name,
197 errors.ECODE_NOTUNIQUE)
199 # After this 'if' block, None is no longer a valid value for the
200 # _capable op attributes
202 assert existing_node_info is not None, \
203 "Can't retrieve locked node %s" % node_name
204 for attr in self._NFLAGS:
205 if getattr(self.op, attr) is None:
206 setattr(self.op, attr, getattr(existing_node_info, attr))
208 for attr in self._NFLAGS:
209 if getattr(self.op, attr) is None:
210 setattr(self.op, attr, True)
212 if self.op.readd and not self.op.vm_capable:
213 pri, sec = cfg.GetNodeInstances(existing_node_info.uuid)
      if pri or sec:
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
216 " flag set to false, but it already holds"
217 " instances" % node_name,
220 # check that the type of the node (single versus dual homed) is the
221 # same as for the master
222 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
223 master_singlehomed = myself.secondary_ip == myself.primary_ip
224 newbie_singlehomed = secondary_ip == primary_ip
225 if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no secondary ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a secondary ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)
235 # checks reachability
236 if not netutils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
237 raise errors.OpPrereqError("Node not reachable by ping",
238 errors.ECODE_ENVIRON)
240 if not newbie_singlehomed:
241 # check reachability from my secondary ip to newbie's secondary ip
242 if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
243 source=myself.secondary_ip):
244 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
245 " based ping to node daemon port",
246 errors.ECODE_ENVIRON)
249 exceptions = [existing_node_info.uuid]
253 if self.op.master_capable:
254 self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
    else:
      self.master_candidate = False
    if self.op.readd:
      self.new_node = existing_node_info
    else:
      node_group = cfg.LookupNodeGroup(self.op.group)
262 self.new_node = objects.Node(name=node_name,
263 primary_ip=primary_ip,
264 secondary_ip=secondary_ip,
265 master_candidate=self.master_candidate,
266 offline=False, drained=False,
267 group=node_group, ndparams={})
    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")
    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)
277 if self.op.disk_state:
278 self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)
280 # TODO: If we need to have multiple DnsOnlyRunner we probably should make
281 # it a property on the base class.
282 rpcrunner = rpc.DnsOnlyRunner()
283 result = rpcrunner.call_version([node_name])[node_name]
284 result.Raise("Can't get version information from node %s" % node_name)
285 if constants.PROTOCOL_VERSION == result.payload:
286 logging.info("Communication to node %s fine, sw version %s match",
287 node_name, result.payload)
    else:
      raise errors.OpPrereqError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result.payload),
                                 errors.ECODE_ENVIRON)
294 vg_name = cfg.GetVGName()
295 if vg_name is not None:
296 vparams = {constants.NV_PVLIST: [vg_name]}
297 excl_stor = IsExclusiveStorageEnabledNode(cfg, self.new_node)
298 cname = self.cfg.GetClusterName()
299 result = rpcrunner.call_node_verify_light(
300 [node_name], vparams, cname, cfg.GetClusterInfo().hvparams)[node_name]
301 (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
      if errmsgs:
        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)
306 def Exec(self, feedback_fn):
307 """Adds the new node to the cluster.
310 new_node = self.new_node
311 node_name = new_node.name
313 assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
    # We are adding a new node, so we assume it's powered
317 new_node.powered = True
319 # for re-adds, reset the offline/drained/master-candidate flags;
320 # we need to reset here, otherwise offline would prevent RPC calls
321 # later in the procedure; this also means that if the re-add
322 # fails, we are left with a non-offlined, broken node
324 new_node.drained = new_node.offline = False # pylint: disable=W0201
325 self.LogInfo("Readding a node, the offline/drained flags were reset")
326 # if we demote the node, we do cleanup later in the procedure
327 new_node.master_candidate = self.master_candidate
328 if self.changed_primary_ip:
329 new_node.primary_ip = self.op.primary_ip
331 # copy the master/vm_capable flags
332 for attr in self._NFLAGS:
333 setattr(new_node, attr, getattr(self.op, attr))
335 # notify the user about any possible mc promotion
336 if new_node.master_candidate:
337 self.LogInfo("Node will be a master candidate")
    if self.op.ndparams:
      new_node.ndparams = self.op.ndparams
    else:
      new_node.ndparams = {}
    if self.op.hv_state:
      new_node.hv_state_static = self.new_hv_state
347 if self.op.disk_state:
348 new_node.disk_state_static = self.new_disk_state
350 # Add node to our /etc/hosts, and add key to known_hosts
351 if self.cfg.GetClusterInfo().modify_etc_hosts:
352 master_node = self.cfg.GetMasterNode()
353 result = self.rpc.call_etc_hosts_modify(
        master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
        self.hostname.ip)
356 result.Raise("Can't update hosts file with new host data")
358 if new_node.secondary_ip != new_node.primary_ip:
359 _CheckNodeHasSecondaryIP(self, new_node, new_node.secondary_ip, False)
361 node_verifier_uuids = [self.cfg.GetMasterNode()]
362 node_verify_param = {
363 constants.NV_NODELIST: ([node_name], {}),
364 # TODO: do a node-net-test as well?
367 result = self.rpc.call_node_verify(
368 node_verifier_uuids, node_verify_param,
369 self.cfg.GetClusterName(),
370 self.cfg.GetClusterInfo().hvparams)
371 for verifier in node_verifier_uuids:
372 result[verifier].Raise("Cannot communicate with node %s" % verifier)
373 nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
376 feedback_fn("ssh/hostname verification failed"
377 " (checking from %s): %s" %
378 (verifier, nl_payload[failed]))
379 raise errors.OpExecError("ssh/hostname verification failed")
    if self.op.readd:
      self.context.ReaddNode(new_node)
      RedistributeAncillaryFiles(self)
      # make sure we redistribute the config
      self.cfg.Update(new_node, feedback_fn)
      # and make sure the new node will not have old files around
      if not new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(new_node.uuid)
        result.Warn("Node failed to demote itself from master candidate status",
                    self.LogWarning)
    else:
      self.context.AddNode(new_node, self.proc.GetECId())
      RedistributeAncillaryFiles(self)
396 class LUNodeSetParams(LogicalUnit):
397 """Modifies the parameters of a node.
399 @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
400 to the node role (as _ROLE_*)
401 @cvar _R2F: a dictionary from node role to tuples of flags
402 @cvar _FLAGS: a list of attribute names corresponding to the flags
405 HPATH = "node-modify"
406 HTYPE = constants.HTYPE_NODE
408 (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
410 (True, False, False): _ROLE_CANDIDATE,
411 (False, True, False): _ROLE_DRAINED,
412 (False, False, True): _ROLE_OFFLINE,
413 (False, False, False): _ROLE_REGULAR,
415 _R2F = dict((v, k) for k, v in _F2R.items())
416 _FLAGS = ["master_candidate", "drained", "offline"]
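  # Illustration of the mappings above (example values only): a node with
  # master_candidate=True, drained=False, offline=False maps through _F2R to
  # _ROLE_CANDIDATE, and _R2F[_ROLE_CANDIDATE] yields (True, False, False)
  # again, which is how Exec() turns the chosen role back into the individual
  # node flags.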
418 def CheckArguments(self):
419 (self.op.node_uuid, self.op.node_name) = \
420 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
421 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
422 self.op.master_capable, self.op.vm_capable,
423 self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
425 if all_mods.count(None) == len(all_mods):
426 raise errors.OpPrereqError("Please pass at least one modification",
428 if all_mods.count(True) > 1:
429 raise errors.OpPrereqError("Can't set the node into more than one"
430 " state at the same time",
433 # Boolean value that tells us whether we might be demoting from MC
434 self.might_demote = (self.op.master_candidate is False or
435 self.op.offline is True or
436 self.op.drained is True or
437 self.op.master_capable is False)
439 if self.op.secondary_ip:
440 if not netutils.IP4Address.IsValid(self.op.secondary_ip):
441 raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
442 " address" % self.op.secondary_ip,
445 self.lock_all = self.op.auto_promote and self.might_demote
446 self.lock_instances = self.op.secondary_ip is not None
448 def _InstanceFilter(self, instance):
449 """Filter for getting affected instances.
452 return (instance.disk_template in constants.DTS_INT_MIRROR and
453 self.op.node_uuid in instance.all_nodes)
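  # For example (illustrative): an internally mirrored (DRBD) instance that has
  # the node being modified as its primary or secondary passes the filter
  # above, while plain or file-based instances never do, as only mirrored disk
  # templates depend on a node's secondary IP address.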
455 def ExpandNames(self):
457 self.needed_locks = {
458 locking.LEVEL_NODE: locking.ALL_SET,
460 # Block allocations when all nodes are locked
461 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
464 self.needed_locks = {
465 locking.LEVEL_NODE: self.op.node_uuid,
468 # Since modifying a node can have severe effects on currently running
469 # operations the resource lock is at least acquired in shared mode
470 self.needed_locks[locking.LEVEL_NODE_RES] = \
471 self.needed_locks[locking.LEVEL_NODE]
473 # Get all locks except nodes in shared mode; they are not used for anything
474 # but read-only access
475 self.share_locks = ShareAll()
476 self.share_locks[locking.LEVEL_NODE] = 0
477 self.share_locks[locking.LEVEL_NODE_RES] = 0
478 self.share_locks[locking.LEVEL_NODE_ALLOC] = 0
480 if self.lock_instances:
481 self.needed_locks[locking.LEVEL_INSTANCE] = \
482 frozenset(self.cfg.GetInstancesInfoByFilter(self._InstanceFilter))
484 def BuildHooksEnv(self):
487 This runs on the master node.
491 "OP_TARGET": self.op.node_name,
492 "MASTER_CANDIDATE": str(self.op.master_candidate),
493 "OFFLINE": str(self.op.offline),
494 "DRAINED": str(self.op.drained),
495 "MASTER_CAPABLE": str(self.op.master_capable),
496 "VM_CAPABLE": str(self.op.vm_capable),
499 def BuildHooksNodes(self):
500 """Build hooks nodes.
503 nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
506 def CheckPrereq(self):
507 """Check prerequisites.
509 This only checks the instance list against the existing names.
512 node = self.cfg.GetNodeInfo(self.op.node_uuid)
513 if self.lock_instances:
514 affected_instances = \
515 self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)
517 # Verify instance locks
518 owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
519 wanted_instances = frozenset(affected_instances.keys())
520 if wanted_instances - owned_instances:
521 raise errors.OpPrereqError("Instances affected by changing node %s's"
522 " secondary IP address have changed since"
523 " locks were acquired, wanted '%s', have"
524 " '%s'; retry the operation" %
526 utils.CommaJoin(wanted_instances),
527 utils.CommaJoin(owned_instances)),
530 affected_instances = None
532 if (self.op.master_candidate is not None or
533 self.op.drained is not None or
534 self.op.offline is not None):
535 # we can't change the master's node flags
536 if node.uuid == self.cfg.GetMasterNode():
537 raise errors.OpPrereqError("The master role can be changed"
538 " only via master-failover",
541 if self.op.master_candidate and not node.master_capable:
542 raise errors.OpPrereqError("Node %s is not master capable, cannot make"
543 " it a master candidate" % node.name,
546 if self.op.vm_capable is False:
547 (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
      if ipri or isec:
        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
550 " the vm_capable flag" % node.name,
553 if node.master_candidate and self.might_demote and not self.lock_all:
554 assert not self.op.auto_promote, "auto_promote set but lock_all not"
      # check if, after removing the current node, we're missing master
      # candidates
557 (mc_remaining, mc_should, _) = \
558 self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
559 if mc_remaining < mc_should:
560 raise errors.OpPrereqError("Not enough master candidates, please"
561 " pass auto promote option to allow"
562 " promotion (--auto-promote or RAPI"
563 " auto_promote=True)", errors.ECODE_STATE)
565 self.old_flags = old_flags = (node.master_candidate,
566 node.drained, node.offline)
567 assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
568 self.old_role = old_role = self._F2R[old_flags]
570 # Check for ineffective changes
571 for attr in self._FLAGS:
572 if getattr(self.op, attr) is False and getattr(node, attr) is False:
573 self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
574 setattr(self.op, attr, None)
576 # Past this point, any flag change to False means a transition
577 # away from the respective state, as only real changes are kept
579 # TODO: We might query the real power state if it supports OOB
580 if SupportsOob(self.cfg, node):
581 if self.op.offline is False and not (node.powered or
582 self.op.powered is True):
583 raise errors.OpPrereqError(("Node %s needs to be turned on before its"
584 " offline status can be reset") %
585 self.op.node_name, errors.ECODE_STATE)
586 elif self.op.powered is not None:
587 raise errors.OpPrereqError(("Unable to change powered state for node %s"
588 " as it does not support out-of-band"
589 " handling") % self.op.node_name,
592 # If we're being deofflined/drained, we'll MC ourself if needed
593 if (self.op.drained is False or self.op.offline is False or
594 (self.op.master_capable and not node.master_capable)):
595 if _DecideSelfPromotion(self):
596 self.op.master_candidate = True
597 self.LogInfo("Auto-promoting node to master candidate")
599 # If we're no longer master capable, we'll demote ourselves from MC
600 if self.op.master_capable is False and node.master_candidate:
601 self.LogInfo("Demoting from master candidate")
602 self.op.master_candidate = False
605 assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
606 if self.op.master_candidate:
607 new_role = self._ROLE_CANDIDATE
608 elif self.op.drained:
609 new_role = self._ROLE_DRAINED
610 elif self.op.offline:
611 new_role = self._ROLE_OFFLINE
612 elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
      # False is still in new flags, which means we're un-setting (the
      # offline state for example)
615 new_role = self._ROLE_REGULAR
616 else: # no new flags, nothing, keep old role
619 self.new_role = new_role
621 if old_role == self._ROLE_OFFLINE and new_role != old_role:
622 # Trying to transition out of offline status
623 result = self.rpc.call_version([node.uuid])[node.uuid]
625 raise errors.OpPrereqError("Node %s is being de-offlined but fails"
626 " to report its version: %s" %
627 (node.name, result.fail_msg),
630 self.LogWarning("Transitioning node from offline to online state"
631 " without using re-add. Please make sure the node"
634 # When changing the secondary ip, verify if this is a single-homed to
    # multi-homed transition or vice versa, and apply the relevant checks
637 if self.op.secondary_ip:
638 # Ok even without locking, because this can't be changed by any LU
639 master = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
640 master_singlehomed = master.secondary_ip == master.primary_ip
641 if master_singlehomed and self.op.secondary_ip != node.primary_ip:
642 if self.op.force and node.uuid == master.uuid:
643 self.LogWarning("Transitioning from single-homed to multi-homed"
                          " cluster; all nodes will require a secondary IP"
                          " address")
        else:
          raise errors.OpPrereqError("Changing the secondary ip on a"
648 " single-homed cluster requires the"
649 " --force option to be passed, and the"
650 " target node to be the master",
652 elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
653 if self.op.force and node.uuid == master.uuid:
654 self.LogWarning("Transitioning from multi-homed to single-homed"
                          " cluster; secondary IP addresses will have to be"
                          " removed")
        else:
          raise errors.OpPrereqError("Cannot set the secondary IP to be the"
659 " same as the primary IP on a multi-homed"
660 " cluster, unless the --force option is"
661 " passed, and the target node is the"
662 " master", errors.ECODE_INVAL)
664 assert not (frozenset(affected_instances) -
665 self.owned_locks(locking.LEVEL_INSTANCE))
668 if affected_instances:
669 msg = ("Cannot change secondary IP address: offline node has"
670 " instances (%s) configured to use it" %
671 utils.CommaJoin(affected_instances.keys()))
672 raise errors.OpPrereqError(msg, errors.ECODE_STATE)
674 # On online nodes, check that no instances are running, and that
675 # the node has the new ip and we can reach it.
676 for instance in affected_instances.values():
677 CheckInstanceState(self, instance, INSTANCE_DOWN,
678 msg="cannot change secondary ip")
680 _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
681 if master.uuid != node.uuid:
682 # check reachability from master secondary ip to new secondary ip
683 if not netutils.TcpPing(self.op.secondary_ip,
684 constants.DEFAULT_NODED_PORT,
685 source=master.secondary_ip):
686 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
687 " based ping to node daemon port",
688 errors.ECODE_ENVIRON)
691 new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
692 utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
693 CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
694 "node", "cluster or group")
695 self.new_ndparams = new_ndparams
698 self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
699 node.hv_state_static)
701 if self.op.disk_state:
702 self.new_disk_state = \
703 MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)
705 def Exec(self, feedback_fn):
709 node = self.cfg.GetNodeInfo(self.op.node_uuid)
710 old_role = self.old_role
711 new_role = self.new_role
716 node.ndparams = self.new_ndparams
718 if self.op.powered is not None:
719 node.powered = self.op.powered
    if self.op.hv_state:
      node.hv_state_static = self.new_hv_state
724 if self.op.disk_state:
725 node.disk_state_static = self.new_disk_state
727 for attr in ["master_capable", "vm_capable"]:
728 val = getattr(self.op, attr)
      if val is not None:
        setattr(node, attr, val)
        result.append((attr, str(val)))
733 if new_role != old_role:
734 # Tell the node to demote itself, if no longer MC and not offline
735 if old_role == self._ROLE_CANDIDATE and new_role != self._ROLE_OFFLINE:
736 msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
738 self.LogWarning("Node failed to demote itself: %s", msg)
740 new_flags = self._R2F[new_role]
741 for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
        if of != nf:
          result.append((desc, str(nf)))
744 (node.master_candidate, node.drained, node.offline) = new_flags
746 # we locked all nodes, we adjust the CP before updating this node
748 AdjustCandidatePool(self, [node.uuid])
750 if self.op.secondary_ip:
751 node.secondary_ip = self.op.secondary_ip
752 result.append(("secondary_ip", self.op.secondary_ip))
754 # this will trigger configuration file update, if needed
755 self.cfg.Update(node, feedback_fn)
    # this will trigger job queue propagation or cleanup if the mc
    # flag changed
759 if [old_role, new_role].count(self._ROLE_CANDIDATE) == 1:
760 self.context.ReaddNode(node)
765 class LUNodePowercycle(NoHooksLU):
766 """Powercycles a node.
771 def CheckArguments(self):
772 (self.op.node_uuid, self.op.node_name) = \
773 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
775 if self.op.node_uuid == self.cfg.GetMasterNode() and not self.op.force:
776 raise errors.OpPrereqError("The node is the master and the force"
777 " parameter was not set",
780 def ExpandNames(self):
781 """Locking for PowercycleNode.
783 This is a last-resort option and shouldn't block on other
784 jobs. Therefore, we grab no locks.
787 self.needed_locks = {}
789 def Exec(self, feedback_fn):
793 default_hypervisor = self.cfg.GetHypervisorType()
794 hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
795 result = self.rpc.call_node_powercycle(self.op.node_uuid,
798 result.Raise("Failed to schedule the reboot")
799 return result.payload
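    # Illustrative note: this LU backs the "gnt-node powercycle" client
    # command; the payload returned here is assumed to be the status message
    # reported back by the node daemon for the scheduled reboot.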
802 def _GetNodeInstancesInner(cfg, fn):
803 return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
806 def _GetNodePrimaryInstances(cfg, node_uuid):
807 """Returns primary instances on a node.
810 return _GetNodeInstancesInner(cfg,
811 lambda inst: node_uuid == inst.primary_node)
814 def _GetNodeSecondaryInstances(cfg, node_uuid):
815 """Returns secondary instances on a node.
818 return _GetNodeInstancesInner(cfg,
819 lambda inst: node_uuid in inst.secondary_nodes)
822 def _GetNodeInstances(cfg, node_uuid):
823 """Returns a list of all primary and secondary instances on a node.
827 return _GetNodeInstancesInner(cfg, lambda inst: node_uuid in inst.all_nodes)
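# Rough usage sketch for the three helpers above (node_uuid being any node UUID
# from the configuration):
#   _GetNodePrimaryInstances(cfg, node_uuid)   -> instances with that primary
#   _GetNodeSecondaryInstances(cfg, node_uuid) -> instances using it as a
#                                                 (DRBD) secondary
#   _GetNodeInstances(cfg, node_uuid)          -> union of the two
# Each call walks the full instance table, so the cost grows with the number of
# instances in the cluster.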
830 class LUNodeEvacuate(NoHooksLU):
831 """Evacuates instances off a list of nodes.
837 constants.NODE_EVAC_PRI: constants.IALLOCATOR_NEVAC_PRI,
838 constants.NODE_EVAC_SEC: constants.IALLOCATOR_NEVAC_SEC,
839 constants.NODE_EVAC_ALL: constants.IALLOCATOR_NEVAC_ALL,
841 assert frozenset(_MODE2IALLOCATOR.keys()) == constants.NODE_EVAC_MODES
842 assert (frozenset(_MODE2IALLOCATOR.values()) ==
843 constants.IALLOCATOR_NEVAC_MODES)
845 def CheckArguments(self):
846 CheckIAllocatorOrNode(self, "iallocator", "remote_node")
848 def ExpandNames(self):
849 (self.op.node_uuid, self.op.node_name) = \
850 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
852 if self.op.remote_node is not None:
853 (self.op.remote_node_uuid, self.op.remote_node) = \
854 ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
856 assert self.op.remote_node
858 if self.op.node_uuid == self.op.remote_node_uuid:
859 raise errors.OpPrereqError("Can not use evacuated node as a new"
860 " secondary node", errors.ECODE_INVAL)
862 if self.op.mode != constants.NODE_EVAC_SEC:
863 raise errors.OpPrereqError("Without the use of an iallocator only"
864 " secondary instances can be evacuated",
868 self.share_locks = ShareAll()
869 self.needed_locks = {
870 locking.LEVEL_INSTANCE: [],
871 locking.LEVEL_NODEGROUP: [],
872 locking.LEVEL_NODE: [],
    # Determine nodes (via group) optimistically, needs verification once locks
    # have been acquired
877 self.lock_nodes = self._DetermineNodes()
879 def _DetermineNodes(self):
880 """Gets the list of node UUIDs to operate on.
883 if self.op.remote_node is None:
884 # Iallocator will choose any node(s) in the same group
885 group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
887 group_nodes = frozenset([self.op.remote_node_uuid])
889 # Determine nodes to be locked
890 return set([self.op.node_uuid]) | group_nodes
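  # Example of the locking set computed above (node names are illustrative):
  # evacuating node1 with an iallocator locks node1 plus every node in its
  # group, since any of them may become a target; evacuating onto an explicit
  # remote node locks only node1 and that remote node.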
892 def _DetermineInstances(self):
893 """Builds list of instances to operate on.
896 assert self.op.mode in constants.NODE_EVAC_MODES
898 if self.op.mode == constants.NODE_EVAC_PRI:
899 # Primary instances only
900 inst_fn = _GetNodePrimaryInstances
901 assert self.op.remote_node is None, \
902 "Evacuating primary instances requires iallocator"
903 elif self.op.mode == constants.NODE_EVAC_SEC:
904 # Secondary instances only
905 inst_fn = _GetNodeSecondaryInstances
908 assert self.op.mode == constants.NODE_EVAC_ALL
909 inst_fn = _GetNodeInstances
910 # TODO: In 2.6, change the iallocator interface to take an evacuation mode
912 raise errors.OpPrereqError("Due to an issue with the iallocator"
913 " interface it is not possible to evacuate"
914 " all instances at once; specify explicitly"
                                 " whether to evacuate primary or secondary"
                                 " instances", errors.ECODE_INVAL)
919 return inst_fn(self.cfg, self.op.node_uuid)
921 def DeclareLocks(self, level):
922 if level == locking.LEVEL_INSTANCE:
923 # Lock instances optimistically, needs verification once node and group
924 # locks have been acquired
925 self.needed_locks[locking.LEVEL_INSTANCE] = \
926 set(i.name for i in self._DetermineInstances())
928 elif level == locking.LEVEL_NODEGROUP:
929 # Lock node groups for all potential target nodes optimistically, needs
930 # verification once nodes have been acquired
931 self.needed_locks[locking.LEVEL_NODEGROUP] = \
932 self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)
934 elif level == locking.LEVEL_NODE:
935 self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes
937 def CheckPrereq(self):
939 owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
940 owned_nodes = self.owned_locks(locking.LEVEL_NODE)
941 owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
943 need_nodes = self._DetermineNodes()
945 if not owned_nodes.issuperset(need_nodes):
946 raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
947 " locks were acquired, current nodes are"
                                 " '%s', used to be '%s'; retry the"
951 utils.CommaJoin(need_nodes),
952 utils.CommaJoin(owned_nodes)),
955 wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
956 if owned_groups != wanted_groups:
957 raise errors.OpExecError("Node groups changed since locks were acquired,"
958 " current groups are '%s', used to be '%s';"
959 " retry the operation" %
960 (utils.CommaJoin(wanted_groups),
961 utils.CommaJoin(owned_groups)))
963 # Determine affected instances
964 self.instances = self._DetermineInstances()
965 self.instance_names = [i.name for i in self.instances]
967 if set(self.instance_names) != owned_instances:
968 raise errors.OpExecError("Instances on node '%s' changed since locks"
969 " were acquired, current instances are '%s',"
970 " used to be '%s'; retry the operation" %
972 utils.CommaJoin(self.instance_names),
973 utils.CommaJoin(owned_instances)))
975 if self.instance_names:
976 self.LogInfo("Evacuating instances from node '%s': %s",
978 utils.CommaJoin(utils.NiceSort(self.instance_names)))
980 self.LogInfo("No instances to evacuate from node '%s'",
983 if self.op.remote_node is not None:
984 for i in self.instances:
985 if i.primary_node == self.op.remote_node_uuid:
986 raise errors.OpPrereqError("Node %s is the primary node of"
987 " instance %s, cannot use it as"
989 (self.op.remote_node, i.name),
992 def Exec(self, feedback_fn):
993 assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)
995 if not self.instance_names:
996 # No instances to evacuate
999 elif self.op.iallocator is not None:
1000 # TODO: Implement relocation to other group
1001 evac_mode = self._MODE2IALLOCATOR[self.op.mode]
1002 req = iallocator.IAReqNodeEvac(evac_mode=evac_mode,
1003 instances=list(self.instance_names))
1004 ial = iallocator.IAllocator(self.cfg, self.rpc, req)
1006 ial.Run(self.op.iallocator)
1009 raise errors.OpPrereqError("Can't compute node evacuation using"
1010 " iallocator '%s': %s" %
1011 (self.op.iallocator, ial.info),
1014 jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)
1016 elif self.op.remote_node is not None:
1017 assert self.op.mode == constants.NODE_EVAC_SEC
1019 [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
1020 remote_node=self.op.remote_node,
1022 mode=constants.REPLACE_DISK_CHG,
1023 early_release=self.op.early_release)]
1024 for instance_name in self.instance_names]
1027 raise errors.ProgrammerError("No iallocator or remote node")
1029 return ResultWithJobs(jobs)
1032 class LUNodeMigrate(LogicalUnit):
1033 """Migrate all instances from a node.
1036 HPATH = "node-migrate"
1037 HTYPE = constants.HTYPE_NODE
1040 def CheckArguments(self):
1043 def ExpandNames(self):
1044 (self.op.node_uuid, self.op.node_name) = \
1045 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1047 self.share_locks = ShareAll()
1048 self.needed_locks = {
1049 locking.LEVEL_NODE: [self.op.node_uuid],
1052 def BuildHooksEnv(self):
1055 This runs on the master, the primary and all the secondaries.
1059 "NODE_NAME": self.op.node_name,
1060 "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
1063 def BuildHooksNodes(self):
1064 """Build hooks nodes.
1067 nl = [self.cfg.GetMasterNode()]
1070 def CheckPrereq(self):
1073 def Exec(self, feedback_fn):
1074 # Prepare jobs for migration instances
1075 allow_runtime_changes = self.op.allow_runtime_changes
1077 [opcodes.OpInstanceMigrate(instance_name=inst.name,
1080 iallocator=self.op.iallocator,
1081 target_node=self.op.target_node,
1082 allow_runtime_changes=allow_runtime_changes,
1083 ignore_ipolicy=self.op.ignore_ipolicy)]
1084 for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]
1086 # TODO: Run iallocator in this opcode and pass correct placement options to
1087 # OpInstanceMigrate. Since other jobs can modify the cluster between
1088 # running the iallocator and the actual migration, a good consistency model
1089 # will have to be found.
1091 assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
1092 frozenset([self.op.node_name]))
1094 return ResultWithJobs(jobs)
1097 def _GetStorageTypeArgs(cfg, storage_type):
1098 """Returns the arguments for a storage type.
1101 # Special case for file storage
1102 if storage_type == constants.ST_FILE:
1103 # storage.FileStorage wants a list of storage directories
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]

  return []
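# Illustrative return values for the helper above (paths are made-up examples):
# for constants.ST_FILE it returns a single-element argument list such as
# [["/srv/ganeti/file-storage", "/srv/ganeti/shared-file-storage"]]; any other
# storage type needs no extra RPC arguments.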
1109 class LUNodeModifyStorage(NoHooksLU):
1110 """Logical unit for modifying a storage volume on a node.
1115 def CheckArguments(self):
1116 (self.op.node_uuid, self.op.node_name) = \
1117 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1119 storage_type = self.op.storage_type
1122 modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
1124 raise errors.OpPrereqError("Storage units of type '%s' can not be"
1125 " modified" % storage_type,
1128 diff = set(self.op.changes.keys()) - modifiable
1130 raise errors.OpPrereqError("The following fields can not be modified for"
1131 " storage units of type '%s': %r" %
1132 (storage_type, list(diff)),
1135 def ExpandNames(self):
1136 self.needed_locks = {
1137 locking.LEVEL_NODE: self.op.node_uuid,
1140 def Exec(self, feedback_fn):
1141 """Computes the list of nodes and their attributes.
1144 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1145 result = self.rpc.call_storage_modify(self.op.node_uuid,
1146 self.op.storage_type, st_args,
1147 self.op.name, self.op.changes)
1148 result.Raise("Failed to modify storage unit '%s' on %s" %
1149 (self.op.name, self.op.node_name))
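    # Hedged example of a modification request (field name assumed, not
    # verified against constants): for an LVM physical volume the opcode's
    # "changes" dict could look like {constants.SF_ALLOCATABLE: False}, i.e.
    # marking the PV as no longer allocatable.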
1152 class NodeQuery(QueryBase):
1153 FIELDS = query.NODE_FIELDS
1155 def ExpandNames(self, lu):
1156 lu.needed_locks = {}
1157 lu.share_locks = ShareAll()
1160 (self.wanted, _) = GetWantedNodes(lu, self.names)
1162 self.wanted = locking.ALL_SET
1164 self.do_locking = (self.use_locking and
1165 query.NQ_LIVE in self.requested_data)
1168 # If any non-static field is requested we need to lock the nodes
1169 lu.needed_locks[locking.LEVEL_NODE] = self.wanted
1170 lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1172 def DeclareLocks(self, lu, level):
1175 def _GetQueryData(self, lu):
1176 """Computes the list of nodes and their attributes.
1179 all_info = lu.cfg.GetAllNodesInfo()
1181 node_uuids = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)
1183 # Gather data as requested
1184 if query.NQ_LIVE in self.requested_data:
1185 # filter out non-vm_capable nodes
1186 toquery_node_uuids = [node.uuid for node in all_info.values()
1187 if node.vm_capable and node.uuid in node_uuids]
1189 es_flags = rpc.GetExclusiveStorageForNodes(lu.cfg, toquery_node_uuids)
      # FIXME: This currently maps everything to lvm, this should be more
      # flexible
1192 vg_req = rpc.BuildVgInfoQuery(lu.cfg)
1193 default_hypervisor = lu.cfg.GetHypervisorType()
1194 hvparams = lu.cfg.GetClusterInfo().hvparams[default_hypervisor]
1195 hvspecs = [(default_hypervisor, hvparams)]
1196 node_data = lu.rpc.call_node_info(toquery_node_uuids, vg_req,
1198 live_data = dict((uuid, rpc.MakeLegacyNodeInfo(nresult.payload))
1199 for (uuid, nresult) in node_data.items()
1200 if not nresult.fail_msg and nresult.payload)
1204 if query.NQ_INST in self.requested_data:
1205 node_to_primary = dict([(uuid, set()) for uuid in node_uuids])
1206 node_to_secondary = dict([(uuid, set()) for uuid in node_uuids])
1208 inst_data = lu.cfg.GetAllInstancesInfo()
1210 for inst in inst_data.values():
1211 if inst.primary_node in node_to_primary:
1212 node_to_primary[inst.primary_node].add(inst.name)
1213 for secnode in inst.secondary_nodes:
1214 if secnode in node_to_secondary:
1215 node_to_secondary[secnode].add(inst.name)
1217 node_to_primary = None
1218 node_to_secondary = None
1220 if query.NQ_OOB in self.requested_data:
1221 oob_support = dict((uuid, bool(SupportsOob(lu.cfg, node)))
1222 for uuid, node in all_info.iteritems())
1226 if query.NQ_GROUP in self.requested_data:
1227 groups = lu.cfg.GetAllNodeGroupsInfo()
1231 return query.NodeQueryData([all_info[uuid] for uuid in node_uuids],
1232 live_data, lu.cfg.GetMasterNode(),
1233 node_to_primary, node_to_secondary, groups,
1234 oob_support, lu.cfg.GetClusterInfo())
1237 class LUNodeQuery(NoHooksLU):
1238 """Logical unit for querying nodes.
1241 # pylint: disable=W0142
1244 def CheckArguments(self):
1245 self.nq = NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
1246 self.op.output_fields, self.op.use_locking)
1248 def ExpandNames(self):
1249 self.nq.ExpandNames(self)
1251 def DeclareLocks(self, level):
1252 self.nq.DeclareLocks(self, level)
1254 def Exec(self, feedback_fn):
1255 return self.nq.OldStyleQuery(self)
1258 def _CheckOutputFields(static, dynamic, selected):
1259 """Checks whether all selected fields are valid.
1261 @type static: L{utils.FieldSet}
1262 @param static: static fields set
1263 @type dynamic: L{utils.FieldSet}
1264 @param dynamic: dynamic fields set
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)
1277 class LUNodeQueryvols(NoHooksLU):
1278 """Logical unit for getting volumes on node(s).
1282 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1283 _FIELDS_STATIC = utils.FieldSet("node")
1285 def CheckArguments(self):
1286 _CheckOutputFields(static=self._FIELDS_STATIC,
1287 dynamic=self._FIELDS_DYNAMIC,
1288 selected=self.op.output_fields)
1290 def ExpandNames(self):
1291 self.share_locks = ShareAll()
1294 self.needed_locks = {
1295 locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
1298 self.needed_locks = {
1299 locking.LEVEL_NODE: locking.ALL_SET,
1300 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1303 def Exec(self, feedback_fn):
1304 """Computes the list of nodes and their attributes.
1307 node_uuids = self.owned_locks(locking.LEVEL_NODE)
1308 volumes = self.rpc.call_node_volumes(node_uuids)
1310 ilist = self.cfg.GetAllInstancesInfo()
1311 vol2inst = MapInstanceDisksToNodes(ilist.values())
1314 for node_uuid in node_uuids:
1315 nresult = volumes[node_uuid]
1318 msg = nresult.fail_msg
1320 self.LogWarning("Can't compute volume data on node %s: %s",
1321 self.cfg.GetNodeName(node_uuid), msg)
1324 node_vols = sorted(nresult.payload,
1325 key=operator.itemgetter("dev"))
1327 for vol in node_vols:
1329 for field in self.op.output_fields:
1331 val = self.cfg.GetNodeName(node_uuid)
1332 elif field == "phys":
1336 elif field == "name":
1338 elif field == "size":
1339 val = int(float(vol["size"]))
1340 elif field == "instance":
1341 val = vol2inst.get((node_uuid, vol["vg"] + "/" + vol["name"]), "-")
1343 raise errors.ParameterError(field)
1344 node_output.append(str(val))
1346 output.append(node_output)
1351 class LUNodeQueryStorage(NoHooksLU):
1352 """Logical unit for getting information on storage units on node(s).
1355 _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
1358 def CheckArguments(self):
1359 _CheckOutputFields(static=self._FIELDS_STATIC,
1360 dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
1361 selected=self.op.output_fields)
1363 def ExpandNames(self):
1364 self.share_locks = ShareAll()
1367 self.needed_locks = {
1368 locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
1371 self.needed_locks = {
1372 locking.LEVEL_NODE: locking.ALL_SET,
1373 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1376 def Exec(self, feedback_fn):
1377 """Computes the list of nodes and their attributes.
1380 self.node_uuids = self.owned_locks(locking.LEVEL_NODE)
1382 # Always get name to sort by
1383 if constants.SF_NAME in self.op.output_fields:
1384 fields = self.op.output_fields[:]
1386 fields = [constants.SF_NAME] + self.op.output_fields
1388 # Never ask for node or type as it's only known to the LU
1389 for extra in [constants.SF_NODE, constants.SF_TYPE]:
1390 while extra in fields:
1391 fields.remove(extra)
1393 field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
1394 name_idx = field_idx[constants.SF_NAME]
1396 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1397 data = self.rpc.call_storage_list(self.node_uuids,
1398 self.op.storage_type, st_args,
1399 self.op.name, fields)
1403 for node_uuid in utils.NiceSort(self.node_uuids):
1404 node_name = self.cfg.GetNodeName(node_uuid)
1405 nresult = data[node_uuid]
1409 msg = nresult.fail_msg
1411 self.LogWarning("Can't get storage data from node %s: %s",
1415 rows = dict([(row[name_idx], row) for row in nresult.payload])
1417 for name in utils.NiceSort(rows.keys()):
1422 for field in self.op.output_fields:
1423 if field == constants.SF_NODE:
1425 elif field == constants.SF_TYPE:
1426 val = self.op.storage_type
1427 elif field in field_idx:
1428 val = row[field_idx[field]]
1430 raise errors.ParameterError(field)
1439 class LUNodeRemove(LogicalUnit):
1440 """Logical unit for removing a node.
1443 HPATH = "node-remove"
1444 HTYPE = constants.HTYPE_NODE
1446 def BuildHooksEnv(self):
1451 "OP_TARGET": self.op.node_name,
1452 "NODE_NAME": self.op.node_name,
1455 def BuildHooksNodes(self):
1456 """Build hooks nodes.
1458 This doesn't run on the target node in the pre phase as a failed
1459 node would then be impossible to remove.
1462 all_nodes = self.cfg.GetNodeList()
1464 all_nodes.remove(self.op.node_uuid)
1467 return (all_nodes, all_nodes)
1469 def CheckPrereq(self):
1470 """Check prerequisites.
1473 - the node exists in the configuration
1474 - it does not have primary or secondary instances
1475 - it's not the master
1477 Any errors are signaled by raising errors.OpPrereqError.
1480 (self.op.node_uuid, self.op.node_name) = \
1481 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1482 node = self.cfg.GetNodeInfo(self.op.node_uuid)
1483 assert node is not None
1485 masternode = self.cfg.GetMasterNode()
1486 if node.uuid == masternode:
1487 raise errors.OpPrereqError("Node is the master node, failover to another"
1488 " node is required", errors.ECODE_INVAL)
1490 for instance_name, instance in self.cfg.GetAllInstancesInfo().items():
1491 if node.uuid in instance.all_nodes:
1492 raise errors.OpPrereqError("Instance %s is still running on the node,"
1493 " please remove first" % instance_name,
1495 self.op.node_name = node.name
1498 def Exec(self, feedback_fn):
1499 """Removes the node from the cluster.
1503 logging.info("Stopping the node daemon and removing configs from node %s",
1506 modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
1508 assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
1511 # Promote nodes to master candidate as needed
1512 AdjustCandidatePool(self, exceptions=[node.uuid])
1513 self.context.RemoveNode(node)
1515 # Run post hooks on the node before it's removed
1516 RunPostHook(self, node.name)
1518 # we have to call this by name rather than by UUID, as the node is no longer
1520 result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
1521 msg = result.fail_msg
1523 self.LogWarning("Errors encountered on the remote node while leaving"
1524 " the cluster: %s", msg)
1526 # Remove node from our /etc/hosts
1527 if self.cfg.GetClusterInfo().modify_etc_hosts:
1528 master_node_uuid = self.cfg.GetMasterNode()
1529 result = self.rpc.call_etc_hosts_modify(master_node_uuid,
1530 constants.ETC_HOSTS_REMOVE,
1532 result.Raise("Can't update hosts file with new host data")
1533 RedistributeAncillaryFiles(self)
1536 class LURepairNodeStorage(NoHooksLU):
1537 """Repairs the volume group on a node.
1542 def CheckArguments(self):
1543 (self.op.node_uuid, self.op.node_name) = \
1544 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1546 storage_type = self.op.storage_type
1548 if (constants.SO_FIX_CONSISTENCY not in
1549 constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
1550 raise errors.OpPrereqError("Storage units of type '%s' can not be"
1551 " repaired" % storage_type,
1554 def ExpandNames(self):
1555 self.needed_locks = {
1556 locking.LEVEL_NODE: [self.op.node_uuid],
1559 def _CheckFaultyDisks(self, instance, node_uuid):
1560 """Ensure faulty disks abort the opcode or at least warn."""
1562 if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
1564 raise errors.OpPrereqError("Instance '%s' has faulty disks on"
1567 self.cfg.GetNodeName(node_uuid)),
1569 except errors.OpPrereqError, err:
1570 if self.op.ignore_consistency:
1571 self.LogWarning(str(err.args[0]))
1575 def CheckPrereq(self):
1576 """Check prerequisites.
1579 # Check whether any instance on this node has faulty disks
1580 for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
1581 if not inst.disks_active:
1583 check_nodes = set(inst.all_nodes)
1584 check_nodes.discard(self.op.node_uuid)
1585 for inst_node_uuid in check_nodes:
1586 self._CheckFaultyDisks(inst, inst_node_uuid)
1588 def Exec(self, feedback_fn):
1589 feedback_fn("Repairing storage unit '%s' on %s ..." %
1590 (self.op.name, self.op.node_name))
1592 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1593 result = self.rpc.call_storage_execute(self.op.node_uuid,
1594 self.op.storage_type, st_args,
1596 constants.SO_FIX_CONSISTENCY)
1597 result.Raise("Failed to repair storage unit '%s' on %s" %
1598 (self.op.name, self.op.node_name))