4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Logical units dealing with nodes."""
27 from ganeti import constants
28 from ganeti import errors
29 from ganeti import locking
30 from ganeti import netutils
31 from ganeti import objects
32 from ganeti import opcodes
33 from ganeti import qlang
34 from ganeti import query
35 from ganeti import rpc
36 from ganeti import utils
37 from ganeti.masterd import iallocator
39 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, QueryBase, \
41 from ganeti.cmdlib.common import CheckParamsNotGlobal, \
42 MergeAndVerifyHvState, MergeAndVerifyDiskState, \
43 IsExclusiveStorageEnabledNode, CheckNodePVs, \
44 RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
45 CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
46 AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
47 GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \
48 FindFaultyInstanceDisks, CheckStorageTypeEnabled
def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  Compares the current number of master candidates against the number the
  candidate pool should hold once one more candidate slot is available.

  @type lu: L{LogicalUnit}
  @param lu: the LU on whose behalf the decision is made; only its
    configuration (C{lu.cfg}) is consulted
  @type exceptions: list
  @param exceptions: optional list of node UUIDs to exclude from the
    master candidate statistics
  @rtype: boolean
  @return: whether self-promotion is needed to fill the candidate pool

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max with one, so:
  mc_should = min(mc_should + 1, cp_size)
  return mc_now < mc_should
def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
  """Ensure that a node has the given secondary ip.

  @type lu: L{LogicalUnit}
  @param lu: the LU on behalf of which we make the check
  @type node: L{objects.Node}
  @param node: the node to check
  @type secondary_ip: string
  @param secondary_ip: the ip to check
  @type prereq: boolean
  @param prereq: whether to throw a prerequisite or an execute error
  @raise errors.OpPrereqError: if the node doesn't have the ip,
    and prereq=True
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False

  """
  # this can be called with a new node, which has no UUID yet, so perform the
  # RPC call using its name
  result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
  result.Raise("Failure checking secondary ip on node %s" % node.name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
  if not result.payload:
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
           " please fix and re-run this command" % secondary_ip)
    # prereq selects the exception class, per the contract documented above
    if prereq:
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
    else:
      raise errors.OpExecError(msg)
# NOTE(review): this block is reproduced byte-identically from a line-numbered
# extraction.  Every code line below still carries its original file line
# number, and gaps in that numbering show that some original lines (blank
# lines, continuation lines, several else-branches and docstring terminators)
# are missing.  Do not treat this text as runnable as-is; only comments were
# added in this pass.
#
# LUNodeAdd: logical unit that adds (or re-adds, with op.readd) a node to the
# cluster: validates the name/IPs, checks for conflicts and homing, verifies
# the remote daemon version and PVs, then registers the node and
# redistributes ancillary files.
92 class LUNodeAdd(LogicalUnit):
93 """Logical unit for adding node to the cluster.
97 HTYPE = constants.HTYPE_NODE
98 _NFLAGS = ["master_capable", "vm_capable"]
# CheckArguments: normalize the node name and reject invalid readd usage
# (readding the master, or passing a group on readd).
100 def CheckArguments(self):
101 self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
102 # validate/normalize the node name
103 self.hostname = netutils.GetHostname(name=self.op.node_name,
104 family=self.primary_ip_family)
105 self.op.node_name = self.hostname.name
107 if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName():
108 raise errors.OpPrereqError("Cannot readd the master node",
111 if self.op.readd and self.op.group:
112 raise errors.OpPrereqError("Cannot pass a node group when a node is"
113 " being readded", errors.ECODE_INVAL)
# BuildHooksEnv: environment variables exported to the node-add hooks.
115 def BuildHooksEnv(self):
118 This will run on all nodes before, and on all nodes + the new node after.
122 "OP_TARGET": self.op.node_name,
123 "NODE_NAME": self.op.node_name,
124 "NODE_PIP": self.op.primary_ip,
125 "NODE_SIP": self.op.secondary_ip,
126 "MASTER_CAPABLE": str(self.op.master_capable),
127 "VM_CAPABLE": str(self.op.vm_capable),
130 def BuildHooksNodes(self):
131 """Build hooks nodes.
134 hook_nodes = self.cfg.GetNodeList()
135 new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
136 if new_node_info is not None:
# exclude the (re-added) node from the pre/post hook node lists by UUID
138 hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))
140 # add the new node as post hook node by name; it does not have an UUID yet
141 return (hook_nodes, hook_nodes, [self.op.node_name, ])
# CheckPrereq: validates primary/secondary IPs, detects address conflicts,
# checks single-/dual-homed consistency with the master, pings the node and
# verifies its daemon protocol version and PVs.
143 def CheckPrereq(self):
144 """Check prerequisites.
147 - the new node is not already in the config
149 - its parameters (single/dual homed) matches the cluster
151 Any errors are signaled by raising errors.OpPrereqError.
154 node_name = self.hostname.name
155 self.op.primary_ip = self.hostname.ip
156 if self.op.secondary_ip is None:
157 if self.primary_ip_family == netutils.IP6Address.family:
158 raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
159 " IPv4 address must be given as secondary",
# default the secondary IP to the primary (single-homed node)
161 self.op.secondary_ip = self.op.primary_ip
163 secondary_ip = self.op.secondary_ip
164 if not netutils.IP4Address.IsValid(secondary_ip):
165 raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
166 " address" % secondary_ip, errors.ECODE_INVAL)
168 existing_node_info = self.cfg.GetNodeInfoByName(node_name)
169 if not self.op.readd and existing_node_info is not None:
170 raise errors.OpPrereqError("Node %s is already in the configuration" %
171 node_name, errors.ECODE_EXISTS)
172 elif self.op.readd and existing_node_info is None:
173 raise errors.OpPrereqError("Node %s is not in the configuration" %
174 node_name, errors.ECODE_NOENT)
176 self.changed_primary_ip = False
178 for existing_node in self.cfg.GetAllNodesInfo().values():
179 if self.op.readd and node_name == existing_node.name:
180 if existing_node.secondary_ip != secondary_ip:
181 raise errors.OpPrereqError("Readded node doesn't have the same IP"
182 " address configuration as before",
184 if existing_node.primary_ip != self.op.primary_ip:
185 self.changed_primary_ip = True
# for all other nodes, the new addresses must not collide with theirs
189 if (existing_node.primary_ip == self.op.primary_ip or
190 existing_node.secondary_ip == self.op.primary_ip or
191 existing_node.primary_ip == secondary_ip or
192 existing_node.secondary_ip == secondary_ip):
193 raise errors.OpPrereqError("New node ip address(es) conflict with"
194 " existing node %s" % existing_node.name,
195 errors.ECODE_NOTUNIQUE)
197 # After this 'if' block, None is no longer a valid value for the
198 # _capable op attributes
200 assert existing_node_info is not None, \
201 "Can't retrieve locked node %s" % node_name
202 for attr in self._NFLAGS:
203 if getattr(self.op, attr) is None:
204 setattr(self.op, attr, getattr(existing_node_info, attr))
206 for attr in self._NFLAGS:
207 if getattr(self.op, attr) is None:
208 setattr(self.op, attr, True)
210 if self.op.readd and not self.op.vm_capable:
211 pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
213 raise errors.OpPrereqError("Node %s being re-added with vm_capable"
214 " flag set to false, but it already holds"
215 " instances" % node_name,
218 # check that the type of the node (single versus dual homed) is the
219 # same as for the master
220 myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
221 master_singlehomed = myself.secondary_ip == myself.primary_ip
222 newbie_singlehomed = secondary_ip == self.op.primary_ip
223 if master_singlehomed != newbie_singlehomed:
224 if master_singlehomed:
225 raise errors.OpPrereqError("The master has no secondary ip but the"
229 raise errors.OpPrereqError("The master has a secondary ip but the"
230 " new node doesn't have one",
233 # checks reachability
234 if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
235 raise errors.OpPrereqError("Node not reachable by ping",
236 errors.ECODE_ENVIRON)
238 if not newbie_singlehomed:
239 # check reachability from my secondary ip to newbie's secondary ip
240 if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
241 source=myself.secondary_ip):
242 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
243 " based ping to node daemon port",
244 errors.ECODE_ENVIRON)
247 exceptions = [existing_node_info.uuid]
# decide on self-promotion only for master-capable nodes
251 if self.op.master_capable:
252 self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
254 self.master_candidate = False
# on readd, reuse the existing node object; otherwise build a fresh one
257 self.new_node = existing_node_info
259 node_group = self.cfg.LookupNodeGroup(self.op.group)
260 self.new_node = objects.Node(name=node_name,
261 primary_ip=self.op.primary_ip,
262 secondary_ip=secondary_ip,
263 master_candidate=self.master_candidate,
264 offline=False, drained=False,
265 group=node_group, ndparams={})
268 utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
269 CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
270 "node", "cluster or group")
273 self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)
275 if self.op.disk_state:
276 self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)
278 # TODO: If we need to have multiple DnsOnlyRunner we probably should make
279 # it a property on the base class.
280 rpcrunner = rpc.DnsOnlyRunner()
281 result = rpcrunner.call_version([node_name])[node_name]
282 result.Raise("Can't get version information from node %s" % node_name)
283 if constants.PROTOCOL_VERSION == result.payload:
284 logging.info("Communication to node %s fine, sw version %s match",
285 node_name, result.payload)
287 raise errors.OpPrereqError("Version mismatch master version %s,"
289 (constants.PROTOCOL_VERSION, result.payload),
290 errors.ECODE_ENVIRON)
292 vg_name = self.cfg.GetVGName()
293 if vg_name is not None:
294 vparams = {constants.NV_PVLIST: [vg_name]}
295 excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
296 cname = self.cfg.GetClusterName()
297 result = rpcrunner.call_node_verify_light(
298 [node_name], vparams, cname,
299 self.cfg.GetClusterInfo().hvparams)[node_name]
300 (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
302 raise errors.OpPrereqError("Checks on node PVs failed: %s" %
303 "; ".join(errmsgs), errors.ECODE_ENVIRON)
# Exec: performs the actual addition/readd — sets flags and parameters on the
# node object, updates /etc/hosts, verifies ssh/hostname reachability via the
# master, then registers the node and redistributes ancillary files.
305 def Exec(self, feedback_fn):
306 """Adds the new node to the cluster.
309 assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
312 # We adding a new node so we assume it's powered
313 self.new_node.powered = True
315 # for re-adds, reset the offline/drained/master-candidate flags;
316 # we need to reset here, otherwise offline would prevent RPC calls
317 # later in the procedure; this also means that if the re-add
318 # fails, we are left with a non-offlined, broken node
320 self.new_node.drained = False
321 self.LogInfo("Readding a node, the offline/drained flags were reset")
322 # if we demote the node, we do cleanup later in the procedure
323 self.new_node.master_candidate = self.master_candidate
324 if self.changed_primary_ip:
325 self.new_node.primary_ip = self.op.primary_ip
327 # copy the master/vm_capable flags
328 for attr in self._NFLAGS:
329 setattr(self.new_node, attr, getattr(self.op, attr))
331 # notify the user about any possible mc promotion
332 if self.new_node.master_candidate:
333 self.LogInfo("Node will be a master candidate")
336 self.new_node.ndparams = self.op.ndparams
338 self.new_node.ndparams = {}
341 self.new_node.hv_state_static = self.new_hv_state
343 if self.op.disk_state:
344 self.new_node.disk_state_static = self.new_disk_state
346 # Add node to our /etc/hosts, and add key to known_hosts
347 if self.cfg.GetClusterInfo().modify_etc_hosts:
348 master_node = self.cfg.GetMasterNode()
349 result = self.rpc.call_etc_hosts_modify(
350 master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
352 result.Raise("Can't update hosts file with new host data")
354 if self.new_node.secondary_ip != self.new_node.primary_ip:
355 _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
358 node_verifier_uuids = [self.cfg.GetMasterNode()]
359 node_verify_param = {
360 constants.NV_NODELIST: ([self.new_node.name], {}),
361 # TODO: do a node-net-test as well?
364 result = self.rpc.call_node_verify(
365 node_verifier_uuids, node_verify_param,
366 self.cfg.GetClusterName(),
367 self.cfg.GetClusterInfo().hvparams)
368 for verifier in node_verifier_uuids:
369 result[verifier].Raise("Cannot communicate with node %s" % verifier)
370 nl_payload = result[verifier].payload[constants.NV_NODELIST]
372 for failed in nl_payload:
373 feedback_fn("ssh/hostname verification failed"
374 " (checking from %s): %s" %
375 (verifier, nl_payload[failed]))
376 raise errors.OpExecError("ssh/hostname verification failed")
# re-add path re-registers the existing node; fresh-add path (below) creates it
379 self.context.ReaddNode(self.new_node)
380 RedistributeAncillaryFiles(self)
381 # make sure we redistribute the config
382 self.cfg.Update(self.new_node, feedback_fn)
383 # and make sure the new node will not have old files around
384 if not self.new_node.master_candidate:
385 result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
386 result.Warn("Node failed to demote itself from master candidate status",
389 self.context.AddNode(self.new_node, self.proc.GetECId())
390 RedistributeAncillaryFiles(self)
# NOTE(review): reproduced byte-identically from a line-numbered extraction;
# gaps in the embedded numbering show that several original lines (blank
# lines, else-branches, dict delimiters, continuations) are missing.  Only
# comments were added in this pass.
#
# LUNodeSetParams: modifies a node's parameters and role flags
# (master_candidate / drained / offline), its capability flags, secondary IP
# and ndparams, with locking proportional to the requested change.
393 class LUNodeSetParams(LogicalUnit):
394 """Modifies the parameters of a node.
396 @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
397 to the node role (as _ROLE_*)
398 @cvar _R2F: a dictionary from node role to tuples of flags
399 @cvar _FLAGS: a list of attribute names corresponding to the flags
402 HPATH = "node-modify"
403 HTYPE = constants.HTYPE_NODE
405 (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
# _F2R maps the (master_candidate, drained, offline) flag tuple to a role
407 (True, False, False): _ROLE_CANDIDATE,
408 (False, True, False): _ROLE_DRAINED,
409 (False, False, True): _ROLE_OFFLINE,
410 (False, False, False): _ROLE_REGULAR,
412 _R2F = dict((v, k) for k, v in _F2R.items())
413 _FLAGS = ["master_candidate", "drained", "offline"]
# CheckArguments: require at least one modification, at most one state flag,
# and compute whether this change may demote a master candidate.
415 def CheckArguments(self):
416 (self.op.node_uuid, self.op.node_name) = \
417 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
418 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
419 self.op.master_capable, self.op.vm_capable,
420 self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
422 if all_mods.count(None) == len(all_mods):
423 raise errors.OpPrereqError("Please pass at least one modification",
425 if all_mods.count(True) > 1:
426 raise errors.OpPrereqError("Can't set the node into more than one"
427 " state at the same time",
430 # Boolean value that tells us whether we might be demoting from MC
431 self.might_demote = (self.op.master_candidate is False or
432 self.op.offline is True or
433 self.op.drained is True or
434 self.op.master_capable is False)
436 if self.op.secondary_ip:
437 if not netutils.IP4Address.IsValid(self.op.secondary_ip):
438 raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
439 " address" % self.op.secondary_ip,
442 self.lock_all = self.op.auto_promote and self.might_demote
443 self.lock_instances = self.op.secondary_ip is not None
# _InstanceFilter: selects internally-mirrored instances touching this node,
# used to determine which instances a secondary-IP change affects.
445 def _InstanceFilter(self, instance):
446 """Filter for getting affected instances.
449 return (instance.disk_template in constants.DTS_INT_MIRROR and
450 self.op.node_uuid in instance.all_nodes)
452 def ExpandNames(self):
# lock everything when auto-promotion may be needed, otherwise just the node
454 self.needed_locks = {
455 locking.LEVEL_NODE: locking.ALL_SET,
457 # Block allocations when all nodes are locked
458 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
461 self.needed_locks = {
462 locking.LEVEL_NODE: self.op.node_uuid,
465 # Since modifying a node can have severe effects on currently running
466 # operations the resource lock is at least acquired in shared mode
467 self.needed_locks[locking.LEVEL_NODE_RES] = \
468 self.needed_locks[locking.LEVEL_NODE]
470 # Get all locks except nodes in shared mode; they are not used for anything
471 # but read-only access
472 self.share_locks = ShareAll()
473 self.share_locks[locking.LEVEL_NODE] = 0
474 self.share_locks[locking.LEVEL_NODE_RES] = 0
475 self.share_locks[locking.LEVEL_NODE_ALLOC] = 0
477 if self.lock_instances:
478 self.needed_locks[locking.LEVEL_INSTANCE] = \
479 self.cfg.GetInstanceNames(
480 self.cfg.GetInstancesInfoByFilter(self._InstanceFilter).keys())
482 def BuildHooksEnv(self):
485 This runs on the master node.
489 "OP_TARGET": self.op.node_name,
490 "MASTER_CANDIDATE": str(self.op.master_candidate),
491 "OFFLINE": str(self.op.offline),
492 "DRAINED": str(self.op.drained),
493 "MASTER_CAPABLE": str(self.op.master_capable),
494 "VM_CAPABLE": str(self.op.vm_capable),
497 def BuildHooksNodes(self):
498 """Build hooks nodes.
501 nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
# CheckPrereq: validates the requested role transition, instance locks,
# power state (OOB), secondary-IP change and parameter merges.
504 def CheckPrereq(self):
505 """Check prerequisites.
507 This only checks the instance list against the existing names.
510 node = self.cfg.GetNodeInfo(self.op.node_uuid)
511 if self.lock_instances:
512 affected_instances = \
513 self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)
515 # Verify instance locks
516 owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
517 wanted_instance_names = frozenset([inst.name for inst in
518 affected_instances.values()])
519 if wanted_instance_names - owned_instance_names:
520 raise errors.OpPrereqError("Instances affected by changing node %s's"
521 " secondary IP address have changed since"
522 " locks were acquired, wanted '%s', have"
523 " '%s'; retry the operation" %
525 utils.CommaJoin(wanted_instance_names),
526 utils.CommaJoin(owned_instance_names)),
529 affected_instances = None
531 if (self.op.master_candidate is not None or
532 self.op.drained is not None or
533 self.op.offline is not None):
534 # we can't change the master's node flags
535 if node.uuid == self.cfg.GetMasterNode():
536 raise errors.OpPrereqError("The master role can be changed"
537 " only via master-failover",
540 if self.op.master_candidate and not node.master_capable:
541 raise errors.OpPrereqError("Node %s is not master capable, cannot make"
542 " it a master candidate" % node.name,
545 if self.op.vm_capable is False:
546 (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
548 raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
549 " the vm_capable flag" % node.name,
552 if node.master_candidate and self.might_demote and not self.lock_all:
553 assert not self.op.auto_promote, "auto_promote set but lock_all not"
554 # check if after removing the current node, we're missing master
556 (mc_remaining, mc_should, _) = \
557 self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
558 if mc_remaining < mc_should:
559 raise errors.OpPrereqError("Not enough master candidates, please"
560 " pass auto promote option to allow"
561 " promotion (--auto-promote or RAPI"
562 " auto_promote=True)", errors.ECODE_STATE)
564 self.old_flags = old_flags = (node.master_candidate,
565 node.drained, node.offline)
566 assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
567 self.old_role = old_role = self._F2R[old_flags]
569 # Check for ineffective changes
570 for attr in self._FLAGS:
571 if getattr(self.op, attr) is False and getattr(node, attr) is False:
572 self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
573 setattr(self.op, attr, None)
575 # Past this point, any flag change to False means a transition
576 # away from the respective state, as only real changes are kept
578 # TODO: We might query the real power state if it supports OOB
579 if SupportsOob(self.cfg, node):
580 if self.op.offline is False and not (node.powered or
581 self.op.powered is True):
582 raise errors.OpPrereqError(("Node %s needs to be turned on before its"
583 " offline status can be reset") %
584 self.op.node_name, errors.ECODE_STATE)
585 elif self.op.powered is not None:
586 raise errors.OpPrereqError(("Unable to change powered state for node %s"
587 " as it does not support out-of-band"
588 " handling") % self.op.node_name,
591 # If we're being deofflined/drained, we'll MC ourself if needed
592 if (self.op.drained is False or self.op.offline is False or
593 (self.op.master_capable and not node.master_capable)):
594 if _DecideSelfPromotion(self):
595 self.op.master_candidate = True
596 self.LogInfo("Auto-promoting node to master candidate")
598 # If we're no longer master capable, we'll demote ourselves from MC
599 if self.op.master_capable is False and node.master_candidate:
600 self.LogInfo("Demoting from master candidate")
601 self.op.master_candidate = False
# compute the new role from the (at most one) requested flag
604 assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
605 if self.op.master_candidate:
606 new_role = self._ROLE_CANDIDATE
607 elif self.op.drained:
608 new_role = self._ROLE_DRAINED
609 elif self.op.offline:
610 new_role = self._ROLE_OFFLINE
611 elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
612 # False is still in new flags, which means we're un-setting (the
614 new_role = self._ROLE_REGULAR
615 else: # no new flags, nothing, keep old role
618 self.new_role = new_role
620 if old_role == self._ROLE_OFFLINE and new_role != old_role:
621 # Trying to transition out of offline status
622 result = self.rpc.call_version([node.uuid])[node.uuid]
624 raise errors.OpPrereqError("Node %s is being de-offlined but fails"
625 " to report its version: %s" %
626 (node.name, result.fail_msg),
629 self.LogWarning("Transitioning node from offline to online state"
630 " without using re-add. Please make sure the node"
633 # When changing the secondary ip, verify if this is a single-homed to
634 # multi-homed transition or vice versa, and apply the relevant
636 if self.op.secondary_ip:
637 # Ok even without locking, because this can't be changed by any LU
638 master = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
639 master_singlehomed = master.secondary_ip == master.primary_ip
640 if master_singlehomed and self.op.secondary_ip != node.primary_ip:
641 if self.op.force and node.uuid == master.uuid:
642 self.LogWarning("Transitioning from single-homed to multi-homed"
643 " cluster; all nodes will require a secondary IP"
646 raise errors.OpPrereqError("Changing the secondary ip on a"
647 " single-homed cluster requires the"
648 " --force option to be passed, and the"
649 " target node to be the master",
651 elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
652 if self.op.force and node.uuid == master.uuid:
653 self.LogWarning("Transitioning from multi-homed to single-homed"
654 " cluster; secondary IP addresses will have to be"
657 raise errors.OpPrereqError("Cannot set the secondary IP to be the"
658 " same as the primary IP on a multi-homed"
659 " cluster, unless the --force option is"
660 " passed, and the target node is the"
661 " master", errors.ECODE_INVAL)
663 assert not (set([inst.name for inst in affected_instances.values()]) -
664 self.owned_locks(locking.LEVEL_INSTANCE))
# an offline node cannot change its secondary IP while instances use it
667 if affected_instances:
668 msg = ("Cannot change secondary IP address: offline node has"
669 " instances (%s) configured to use it" %
671 [inst.name for inst in affected_instances.values()]))
672 raise errors.OpPrereqError(msg, errors.ECODE_STATE)
674 # On online nodes, check that no instances are running, and that
675 # the node has the new ip and we can reach it.
676 for instance in affected_instances.values():
677 CheckInstanceState(self, instance, INSTANCE_DOWN,
678 msg="cannot change secondary ip")
680 _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
681 if master.uuid != node.uuid:
682 # check reachability from master secondary ip to new secondary ip
683 if not netutils.TcpPing(self.op.secondary_ip,
684 constants.DEFAULT_NODED_PORT,
685 source=master.secondary_ip):
686 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
687 " based ping to node daemon port",
688 errors.ECODE_ENVIRON)
691 new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
692 utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
693 CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
694 "node", "cluster or group")
695 self.new_ndparams = new_ndparams
698 self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
699 node.hv_state_static)
701 if self.op.disk_state:
702 self.new_disk_state = \
703 MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)
# Exec: applies the validated changes to the node object and writes the
# configuration; returns the list of (parameter, new value) pairs.
705 def Exec(self, feedback_fn):
709 node = self.cfg.GetNodeInfo(self.op.node_uuid)
713 node.ndparams = self.new_ndparams
715 if self.op.powered is not None:
716 node.powered = self.op.powered
719 node.hv_state_static = self.new_hv_state
721 if self.op.disk_state:
722 node.disk_state_static = self.new_disk_state
724 for attr in ["master_capable", "vm_capable"]:
725 val = getattr(self.op, attr)
727 setattr(node, attr, val)
728 result.append((attr, str(val)))
730 if self.new_role != self.old_role:
731 # Tell the node to demote itself, if no longer MC and not offline
732 if self.old_role == self._ROLE_CANDIDATE and \
733 self.new_role != self._ROLE_OFFLINE:
734 msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
736 self.LogWarning("Node failed to demote itself: %s", msg)
738 new_flags = self._R2F[self.new_role]
739 for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
741 result.append((desc, str(nf)))
742 (node.master_candidate, node.drained, node.offline) = new_flags
744 # we locked all nodes, we adjust the CP before updating this node
746 AdjustCandidatePool(self, [node.uuid])
748 if self.op.secondary_ip:
749 node.secondary_ip = self.op.secondary_ip
750 result.append(("secondary_ip", self.op.secondary_ip))
752 # this will trigger configuration file update, if needed
753 self.cfg.Update(node, feedback_fn)
755 # this will trigger job queue propagation or cleanup if the mc
757 if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
758 self.context.ReaddNode(node)
# NOTE(review): reproduced byte-identically from a line-numbered extraction;
# the numbering gaps show missing original lines (likely including class
# attributes around original lines 765-768 — TODO confirm against upstream).
#
# LUNodePowercycle: schedules a hard reboot of a node via the node daemon;
# refuses to powercycle the master unless op.force is set.
763 class LUNodePowercycle(NoHooksLU):
764 """Powercycles a node.
769 def CheckArguments(self):
770 (self.op.node_uuid, self.op.node_name) = \
771 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
773 if self.op.node_uuid == self.cfg.GetMasterNode() and not self.op.force:
774 raise errors.OpPrereqError("The node is the master and the force"
775 " parameter was not set",
778 def ExpandNames(self):
779 """Locking for PowercycleNode.
781 This is a last-resort option and shouldn't block on other
782 jobs. Therefore, we grab no locks.
785 self.needed_locks = {}
# Exec: issue the powercycle RPC using the cluster's default hypervisor
# parameters and return the RPC payload.
787 def Exec(self, feedback_fn):
791 default_hypervisor = self.cfg.GetHypervisorType()
792 hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
793 result = self.rpc.call_node_powercycle(self.op.node_uuid,
796 result.Raise("Failed to schedule the reboot")
797 return result.payload
def _GetNodeInstancesInner(cfg, fn):
  """Returns the instances from the configuration matching a predicate.

  @param cfg: cluster configuration object; only C{GetAllInstancesInfo}
    is used
  @param fn: unary predicate applied to each instance object
  @rtype: list
  @return: the instance objects for which C{fn} is true, in the iteration
    order of the configuration's instance mapping

  """
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
def _GetNodePrimaryInstances(cfg, node_uuid):
  """Returns primary instances on a node.

  Equivalent to filtering all configured instances on
  C{inst.primary_node == node_uuid}.

  @param cfg: cluster configuration object
  @param node_uuid: UUID of the node to look up
  @rtype: list
  @return: instance objects whose primary node is C{node_uuid}

  """
  return [inst for inst in cfg.GetAllInstancesInfo().values()
          if node_uuid == inst.primary_node]
def _GetNodeSecondaryInstances(cfg, node_uuid):
  """Returns secondary instances on a node.

  Equivalent to filtering all configured instances on
  C{node_uuid in inst.secondary_nodes}.

  @param cfg: cluster configuration object
  @param node_uuid: UUID of the node to look up
  @rtype: list
  @return: instance objects that list C{node_uuid} among their
    secondary nodes

  """
  return [inst for inst in cfg.GetAllInstancesInfo().values()
          if node_uuid in inst.secondary_nodes]
def _GetNodeInstances(cfg, node_uuid):
  """Returns a list of all primary and secondary instances on a node.

  Equivalent to filtering all configured instances on
  C{node_uuid in inst.all_nodes}.

  @param cfg: cluster configuration object
  @param node_uuid: UUID of the node to look up
  @rtype: list
  @return: instance objects that have C{node_uuid} among their nodes

  """
  return [inst for inst in cfg.GetAllInstancesInfo().values()
          if node_uuid in inst.all_nodes]
# NOTE(review): reproduced byte-identically from a line-numbered extraction;
# gaps in the embedded numbering show missing original lines (e.g. the
# opening of the _MODE2IALLOCATOR dict before original line 835, several
# else-branches and return statements).  Only comments were added.
#
# LUNodeEvacuate: evacuates primary and/or secondary instances off a node,
# either via an iallocator (any mode) or to an explicit remote node
# (secondary-only mode); returns the jobs performing the moves.
828 class LUNodeEvacuate(NoHooksLU):
829 """Evacuates instances off a list of nodes.
# mapping from opcode evacuation mode to the iallocator's evacuation mode
835 constants.NODE_EVAC_PRI: constants.IALLOCATOR_NEVAC_PRI,
836 constants.NODE_EVAC_SEC: constants.IALLOCATOR_NEVAC_SEC,
837 constants.NODE_EVAC_ALL: constants.IALLOCATOR_NEVAC_ALL,
839 assert frozenset(_MODE2IALLOCATOR.keys()) == constants.NODE_EVAC_MODES
840 assert (frozenset(_MODE2IALLOCATOR.values()) ==
841 constants.IALLOCATOR_NEVAC_MODES)
843 def CheckArguments(self):
844 CheckIAllocatorOrNode(self, "iallocator", "remote_node")
# ExpandNames: resolve node names, validate the remote-node combination and
# set up optimistic (shared) locking; actual lock sets are filled per level
# in DeclareLocks below.
846 def ExpandNames(self):
847 (self.op.node_uuid, self.op.node_name) = \
848 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
850 if self.op.remote_node is not None:
851 (self.op.remote_node_uuid, self.op.remote_node) = \
852 ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
854 assert self.op.remote_node
856 if self.op.node_uuid == self.op.remote_node_uuid:
857 raise errors.OpPrereqError("Can not use evacuated node as a new"
858 " secondary node", errors.ECODE_INVAL)
860 if self.op.mode != constants.NODE_EVAC_SEC:
861 raise errors.OpPrereqError("Without the use of an iallocator only"
862 " secondary instances can be evacuated",
866 self.share_locks = ShareAll()
867 self.needed_locks = {
868 locking.LEVEL_INSTANCE: [],
869 locking.LEVEL_NODEGROUP: [],
870 locking.LEVEL_NODE: [],
873 # Determine nodes (via group) optimistically, needs verification once locks
875 self.lock_nodes = self._DetermineNodes()
877 def _DetermineNodes(self):
878 """Gets the list of node UUIDs to operate on.
881 if self.op.remote_node is None:
882 # Iallocator will choose any node(s) in the same group
883 group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
885 group_nodes = frozenset([self.op.remote_node_uuid])
887 # Determine nodes to be locked
888 return set([self.op.node_uuid]) | group_nodes
890 def _DetermineInstances(self):
891 """Builds list of instances to operate on.
894 assert self.op.mode in constants.NODE_EVAC_MODES
896 if self.op.mode == constants.NODE_EVAC_PRI:
897 # Primary instances only
898 inst_fn = _GetNodePrimaryInstances
899 assert self.op.remote_node is None, \
900 "Evacuating primary instances requires iallocator"
901 elif self.op.mode == constants.NODE_EVAC_SEC:
902 # Secondary instances only
903 inst_fn = _GetNodeSecondaryInstances
906 assert self.op.mode == constants.NODE_EVAC_ALL
907 inst_fn = _GetNodeInstances
908 # TODO: In 2.6, change the iallocator interface to take an evacuation mode
910 raise errors.OpPrereqError("Due to an issue with the iallocator"
911 " interface it is not possible to evacuate"
912 " all instances at once; specify explicitly"
913 " whether to evacuate primary or secondary"
917 return inst_fn(self.cfg, self.op.node_uuid)
919 def DeclareLocks(self, level):
920 if level == locking.LEVEL_INSTANCE:
921 # Lock instances optimistically, needs verification once node and group
922 # locks have been acquired
923 self.needed_locks[locking.LEVEL_INSTANCE] = \
924 set(i.name for i in self._DetermineInstances())
926 elif level == locking.LEVEL_NODEGROUP:
927 # Lock node groups for all potential target nodes optimistically, needs
928 # verification once nodes have been acquired
929 self.needed_locks[locking.LEVEL_NODEGROUP] = \
930 self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)
932 elif level == locking.LEVEL_NODE:
933 self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes
# CheckPrereq: re-verify the optimistically-locked nodes, groups and
# instances against the current configuration and abort on any drift.
935 def CheckPrereq(self):
937 owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
938 owned_nodes = self.owned_locks(locking.LEVEL_NODE)
939 owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
941 need_nodes = self._DetermineNodes()
943 if not owned_nodes.issuperset(need_nodes):
944 raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
945 " locks were acquired, current nodes are"
946 " are '%s', used to be '%s'; retry the"
949 utils.CommaJoin(need_nodes),
950 utils.CommaJoin(owned_nodes)),
953 wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
954 if owned_groups != wanted_groups:
955 raise errors.OpExecError("Node groups changed since locks were acquired,"
956 " current groups are '%s', used to be '%s';"
957 " retry the operation" %
958 (utils.CommaJoin(wanted_groups),
959 utils.CommaJoin(owned_groups)))
961 # Determine affected instances
962 self.instances = self._DetermineInstances()
963 self.instance_names = [i.name for i in self.instances]
965 if set(self.instance_names) != owned_instance_names:
966 raise errors.OpExecError("Instances on node '%s' changed since locks"
967 " were acquired, current instances are '%s',"
968 " used to be '%s'; retry the operation" %
970 utils.CommaJoin(self.instance_names),
971 utils.CommaJoin(owned_instance_names)))
973 if self.instance_names:
974 self.LogInfo("Evacuating instances from node '%s': %s",
976 utils.CommaJoin(utils.NiceSort(self.instance_names)))
978 self.LogInfo("No instances to evacuate from node '%s'",
981 if self.op.remote_node is not None:
982 for i in self.instances:
983 if i.primary_node == self.op.remote_node_uuid:
984 raise errors.OpPrereqError("Node %s is the primary node of"
985 " instance %s, cannot use it as"
987 (self.op.remote_node, i.name),
# Exec: compute the evacuation either via the iallocator or by building
# replace-disks opcodes targeting the remote node, then return the jobs.
990 def Exec(self, feedback_fn):
991 assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)
993 if not self.instance_names:
994 # No instances to evacuate
997 elif self.op.iallocator is not None:
998 # TODO: Implement relocation to other group
999 evac_mode = self._MODE2IALLOCATOR[self.op.mode]
1000 req = iallocator.IAReqNodeEvac(evac_mode=evac_mode,
1001 instances=list(self.instance_names))
1002 ial = iallocator.IAllocator(self.cfg, self.rpc, req)
1004 ial.Run(self.op.iallocator)
1007 raise errors.OpPrereqError("Can't compute node evacuation using"
1008 " iallocator '%s': %s" %
1009 (self.op.iallocator, ial.info),
1012 jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)
1014 elif self.op.remote_node is not None:
1015 assert self.op.mode == constants.NODE_EVAC_SEC
1017 [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
1018 remote_node=self.op.remote_node,
1020 mode=constants.REPLACE_DISK_CHG,
1021 early_release=self.op.early_release)]
1022 for instance_name in self.instance_names]
1025 raise errors.ProgrammerError("No iallocator or remote node")
1027 return ResultWithJobs(jobs)
class LUNodeMigrate(LogicalUnit):
  """Migrate all instances from a node.

  """
  HPATH = "node-migrate"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False

  def CheckArguments(self):
    pass

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    return {
      "NODE_NAME": self.op.node_name,
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()]
    return (nl, nl)

  def CheckPrereq(self):
    pass

  def Exec(self, feedback_fn):
    # One migration job per primary instance on the node
    jobs = [
      [opcodes.OpInstanceMigrate(
        instance_name=inst.name,
        mode=self.op.mode,
        live=self.op.live,
        iallocator=self.op.iallocator,
        target_node=self.op.target_node,
        allow_runtime_changes=self.op.allow_runtime_changes,
        ignore_ipolicy=self.op.ignore_ipolicy)]
      for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]

    # TODO: Run iallocator in this opcode and pass correct placement options to
    # OpInstanceMigrate. Since other jobs can modify the cluster between
    # running the iallocator and the actual migration, a good consistency model
    # will have to be found.

    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
            frozenset([self.op.node_uuid]))

    return ResultWithJobs(jobs)
def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # File storage is special: the backend wants the list of storage
  # directories (regular and shared)
  if storage_type == constants.ST_FILE:
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]

  # All other storage types take no extra arguments
  return []
class LUNodeModifyStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    try:
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
    except KeyError:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type,
                                 errors.ECODE_INVAL)

    # Reject any requested change outside the modifiable field set
    unsupported = set(self.op.changes.keys()) - modifiable
    if unsupported:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (storage_type, list(unsupported)),
                                 errors.ECODE_INVAL)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_uuid,
      }

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_modify(self.op.node_uuid,
                                          self.op.storage_type, st_args,
                                          self.op.name, self.op.changes)
    result.Raise("Failed to modify storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))
class NodeQuery(QueryBase):
  FIELDS = query.NODE_FIELDS

  def ExpandNames(self, lu):
    lu.needed_locks = {}
    lu.share_locks = ShareAll()

    if self.names:
      (self.wanted, _) = GetWantedNodes(lu, self.names)
    else:
      self.wanted = locking.ALL_SET

    # Live data requires locking the queried nodes
    self.do_locking = (self.use_locking and
                       query.NQ_LIVE in self.requested_data)

    if self.do_locking:
      # If any non-static field is requested we need to lock the nodes
      lu.needed_locks[locking.LEVEL_NODE] = self.wanted
      lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    all_info = lu.cfg.GetAllNodesInfo()

    node_uuids = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)

    # Gather data as requested
    if query.NQ_LIVE in self.requested_data:
      # filter out non-vm_capable nodes
      toquery_node_uuids = [node.uuid for node in all_info.values()
                            if node.vm_capable and node.uuid in node_uuids]
      lvm_enabled = utils.storage.IsLvmEnabled(
          lu.cfg.GetClusterInfo().enabled_disk_templates)
      # FIXME: this per default asks for storage space information for all
      # enabled disk templates. Fix this by making it possible to specify
      # space report fields for specific disk templates.
      raw_storage_units = utils.storage.GetStorageUnitsOfCluster(
          lu.cfg, include_spindles=lvm_enabled)
      storage_units = rpc.PrepareStorageUnitsForNodes(
          lu.cfg, raw_storage_units, toquery_node_uuids)
      default_hypervisor = lu.cfg.GetHypervisorType()
      hvparams = lu.cfg.GetClusterInfo().hvparams[default_hypervisor]
      hvspecs = [(default_hypervisor, hvparams)]
      node_data = lu.rpc.call_node_info(toquery_node_uuids, storage_units,
                                        hvspecs)
      # Keep only nodes that answered with a usable payload
      live_data = dict(
          (uuid, rpc.MakeLegacyNodeInfo(nresult.payload,
                                        require_spindles=lvm_enabled))
          for (uuid, nresult) in node_data.items()
          if not nresult.fail_msg and nresult.payload)
    else:
      live_data = None

    if query.NQ_INST in self.requested_data:
      # Map every queried node to the instances it hosts
      node_to_primary = dict([(uuid, set()) for uuid in node_uuids])
      node_to_secondary = dict([(uuid, set()) for uuid in node_uuids])

      inst_data = lu.cfg.GetAllInstancesInfo()
      inst_uuid_to_inst_name = {}

      for inst in inst_data.values():
        inst_uuid_to_inst_name[inst.uuid] = inst.name
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.uuid)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.uuid)
    else:
      node_to_primary = None
      node_to_secondary = None
      inst_uuid_to_inst_name = None

    if query.NQ_OOB in self.requested_data:
      oob_support = dict((uuid, bool(SupportsOob(lu.cfg, node)))
                         for uuid, node in all_info.iteritems())
    else:
      oob_support = None

    if query.NQ_GROUP in self.requested_data:
      groups = lu.cfg.GetAllNodeGroupsInfo()
    else:
      groups = None

    return query.NodeQueryData([all_info[uuid] for uuid in node_uuids],
                               live_data, lu.cfg.GetMasterNode(),
                               node_to_primary, node_to_secondary,
                               inst_uuid_to_inst_name, groups, oob_support,
                               lu.cfg.GetClusterInfo())
class LUNodeQuery(NoHooksLU):
  """Logical unit for querying nodes.

  """
  # pylint: disable=W0142
  REQ_BGL = False

  def CheckArguments(self):
    # Delegate all the work to a NodeQuery helper instance
    self.nq = NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
                        self.op.output_fields, self.op.use_locking)

  def ExpandNames(self):
    self.nq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.nq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    return self.nq.OldStyleQuery(self)
def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  # Merge both field sets and check the selection against the union
  all_fields = utils.FieldSet()
  all_fields.Extend(static)
  all_fields.Extend(dynamic)

  unknown = all_fields.NonMatching(selected)
  if unknown:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(unknown), errors.ECODE_INVAL)
class LUNodeQueryvols(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def CheckArguments(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    volumes = self.rpc.call_node_volumes(node_uuids)

    ilist = self.cfg.GetAllInstancesInfo()
    vol2inst = MapInstanceLvsToNodes(ilist.values())

    output = []
    for node_uuid in node_uuids:
      nresult = volumes[node_uuid]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue

      node_vols = sorted(nresult.payload,
                         key=operator.itemgetter("dev"))

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = self.cfg.GetNodeName(node_uuid)
          elif field == "phys":
            val = vol["dev"]
          elif field == "vg":
            val = vol["vg"]
          elif field == "name":
            val = vol["name"]
          elif field == "size":
            val = int(float(vol["size"]))
          elif field == "instance":
            # Resolve the LV back to the instance owning it, if any
            inst = vol2inst.get((node_uuid, vol["vg"] + "/" + vol["name"]),
                                None)
            if inst is not None:
              val = inst.name
            else:
              val = "-"
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output
class LUNodeQueryStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    self.node_uuids = self.owned_locks(locking.LEVEL_NODE)

    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    data = self.rpc.call_storage_list(self.node_uuids,
                                      self.op.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node_uuid in utils.NiceSort(self.node_uuids):
      node_name = self.cfg.GetNodeName(node_uuid)
      nresult = data[node_uuid]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s",
                        node_name, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node_name
          elif field == constants.SF_TYPE:
            val = self.op.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result
class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_uuid)
    except ValueError:
      pass
    return (all_nodes, all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    assert node is not None

    masternode = self.cfg.GetMasterNode()
    if node.uuid == masternode:
      raise errors.OpPrereqError("Node is the master node, failover to another"
                                 " node is required", errors.ECODE_INVAL)

    for _, instance in self.cfg.GetAllInstancesInfo().items():
      if node.uuid in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first" % instance.name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    logging.info("Stopping the node daemon and removing configs from node %s",
                 self.node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # Promote nodes to master candidate as needed
    AdjustCandidatePool(self, exceptions=[self.node.uuid])
    self.context.RemoveNode(self.node)

    # Run post hooks on the node before it's removed
    RunPostHook(self, self.node.name)

    # we have to call this by name rather than by UUID, as the node is no longer
    # in the config
    result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      # Best-effort: the node is gone anyway, so only warn
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    # Remove node from our /etc/hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node_uuid = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node_uuid,
                                              constants.ETC_HOSTS_REMOVE,
                                              self.node.name, None)
      result.Raise("Can't update hosts file with new host data")
      RedistributeAncillaryFiles(self)
1560 class LURepairNodeStorage(NoHooksLU):
1561 """Repairs the volume group on a node.
1566 def CheckArguments(self):
1567 (self.op.node_uuid, self.op.node_name) = \
1568 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1570 storage_type = self.op.storage_type
1572 if (constants.SO_FIX_CONSISTENCY not in
1573 constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
1574 raise errors.OpPrereqError("Storage units of type '%s' can not be"
1575 " repaired" % storage_type,
1578 def ExpandNames(self):
1579 self.needed_locks = {
1580 locking.LEVEL_NODE: [self.op.node_uuid],
1583 def _CheckFaultyDisks(self, instance, node_uuid):
1584 """Ensure faulty disks abort the opcode or at least warn."""
1586 if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
1588 raise errors.OpPrereqError("Instance '%s' has faulty disks on"
1591 self.cfg.GetNodeName(node_uuid)),
1593 except errors.OpPrereqError, err:
1594 if self.op.ignore_consistency:
1595 self.LogWarning(str(err.args[0]))
1599 def CheckPrereq(self):
1600 """Check prerequisites.
1603 CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1605 # Check whether any instance on this node has faulty disks
1606 for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
1607 if not inst.disks_active:
1609 check_nodes = set(inst.all_nodes)
1610 check_nodes.discard(self.op.node_uuid)
1611 for inst_node_uuid in check_nodes:
1612 self._CheckFaultyDisks(inst, inst_node_uuid)
1614 def Exec(self, feedback_fn):
1615 feedback_fn("Repairing storage unit '%s' on %s ..." %
1616 (self.op.name, self.op.node_name))
1618 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1619 result = self.rpc.call_storage_execute(self.op.node_uuid,
1620 self.op.storage_type, st_args,
1622 constants.SO_FIX_CONSISTENCY)
1623 result.Raise("Failed to repair storage unit '%s' on %s" %
1624 (self.op.name, self.op.node_name))