# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with nodes."""
import logging
import operator

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import qlang
from ganeti import query
from ganeti import rpc
from ganeti import utils
from ganeti.masterd import iallocator

from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, QueryBase, \
  ResultWithJobs
from ganeti.cmdlib.common import CheckParamsNotGlobal, \
  MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  IsExclusiveStorageEnabledNode, CheckNodePVs, \
  RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
  CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
  AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
  GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \
  FindFaultyInstanceDisks, CheckStorageTypeEnabled


def _DecideSelfPromotion(lu, exceptions=None):
  """Decide whether I should promote myself as a master candidate.

  """
  cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
  mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  # the new node will increase mc_max by one, so:
  mc_should = min(mc_should + 1, cp_size)
  return mc_now < mc_should
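
# Illustrative sketch (not part of the original module): with a candidate
# pool size of 10 and GetMasterCandidateStats() reporting 3 current and 4
# desired candidates, adding one node raises the target to min(4 + 1, 10),
# i.e. 5, and 3 < 5 means the node promotes itself. The numbers here are
# hypothetical, for illustration only:
#
#   cp_size = 10
#   mc_now, mc_should = 3, 4
#   mc_should = min(mc_should + 1, cp_size)  # == 5
#   promote = mc_now < mc_should             # True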


def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
  """Ensure that a node has the given secondary ip.

  @type lu: L{LogicalUnit}
  @param lu: the LU on behalf of which we make the check
  @type node: L{objects.Node}
  @param node: the node to check
  @type secondary_ip: string
  @param secondary_ip: the ip to check
  @type prereq: boolean
  @param prereq: whether to throw a prerequisite or an execute error
  @raise errors.OpPrereqError: if the node doesn't have the ip,
      and prereq=True
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False

  """
  # this can be called with a new node, which has no UUID yet, so perform the
  # RPC call using its name
  result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
  result.Raise("Failure checking secondary ip on node %s" % node.name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
  if not result.payload:
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
           " please fix and re-run this command" % secondary_ip)
    if prereq:
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
    else:
      raise errors.OpExecError(msg)


class LUNodeAdd(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _NFLAGS = ["master_capable", "vm_capable"]

  def CheckArguments(self):
    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
    # validate/normalize the node name
    self.hostname = netutils.GetHostname(name=self.op.node_name,
                                         family=self.primary_ip_family)
    self.op.node_name = self.hostname.name

    if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName():
      raise errors.OpPrereqError("Cannot readd the master node",
                                 errors.ECODE_STATE)

    if self.op.readd and self.op.group:
      raise errors.OpPrereqError("Cannot pass a node group when a node is"
                                 " being readded", errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    hook_nodes = self.cfg.GetNodeList()
    new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
    if new_node_info is not None:
      # Exclude added node
      hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))

    # add the new node as post hook node by name; it does not have a UUID yet
    return (hook_nodes, hook_nodes, [self.op.node_name, ])
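
  # Illustrative note (an assumption about the hooks contract, not part of
  # the original comments): most LUs return a (pre_nodes, post_nodes) pair
  # of UUID lists from BuildHooksNodes; the optional third element is a list
  # of node *names* on which to also run post-phase hooks, needed here
  # because the node being added has no UUID in the configuration yet.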

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node_name = self.hostname.name
    self.op.primary_ip = self.hostname.ip
    if self.op.secondary_ip is None:
      if self.primary_ip_family == netutils.IP6Address.family:
        raise errors.OpPrereqError("When using an IPv6 primary address, a"
                                   " valid IPv4 address must be given as"
                                   " secondary", errors.ECODE_INVAL)
      self.op.secondary_ip = self.op.primary_ip

    secondary_ip = self.op.secondary_ip
    if not netutils.IP4Address.IsValid(secondary_ip):
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                 " address" % secondary_ip, errors.ECODE_INVAL)

    existing_node_info = self.cfg.GetNodeInfoByName(node_name)
    if not self.op.readd and existing_node_info is not None:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node_name, errors.ECODE_EXISTS)
    elif self.op.readd and existing_node_info is None:
      raise errors.OpPrereqError("Node %s is not in the configuration" %
                                 node_name, errors.ECODE_NOENT)

    self.changed_primary_ip = False

    for existing_node in self.cfg.GetAllNodesInfo().values():
      if self.op.readd and node_name == existing_node.name:
        if existing_node.secondary_ip != secondary_ip:
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        if existing_node.primary_ip != self.op.primary_ip:
          self.changed_primary_ip = True

        continue

      if (existing_node.primary_ip == self.op.primary_ip or
          existing_node.secondary_ip == self.op.primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name,
                                   errors.ECODE_NOTUNIQUE)

    # After this 'if' block, None is no longer a valid value for the
    # _capable op attributes
    if self.op.readd:
      assert existing_node_info is not None, \
        "Can't retrieve locked node %s" % node_name
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, getattr(existing_node_info, attr))
    else:
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, True)

    if self.op.readd and not self.op.vm_capable:
      pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
      if pri or sec:
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
                                   " flag set to false, but it already holds"
                                   " instances" % node_name,
                                   errors.ECODE_STATE)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == self.op.primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no secondary ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a secondary ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    # checks reachability
    if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                              source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to node daemon port",
                                   errors.ECODE_ENVIRON)

    if self.op.readd:
      exceptions = [existing_node_info.uuid]
    else:
      exceptions = []

    if self.op.master_capable:
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
    else:
      self.master_candidate = False

    if self.op.readd:
      self.new_node = existing_node_info
    else:
      node_group = self.cfg.LookupNodeGroup(self.op.group)
      self.new_node = objects.Node(name=node_name,
                                   primary_ip=self.op.primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False,
                                   group=node_group, ndparams={})

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)

    if self.op.disk_state:
      self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)

    # TODO: If we need to have multiple DnsOnlyRunner we probably should make
    # it a property on the base class.
    rpcrunner = rpc.DnsOnlyRunner()
    result = rpcrunner.call_version([node_name])[node_name]
    result.Raise("Can't get version information from node %s" % node_name)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node_name, result.payload)
    else:
      raise errors.OpPrereqError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result.payload),
                                 errors.ECODE_ENVIRON)

    vg_name = self.cfg.GetVGName()
    if vg_name is not None:
      vparams = {constants.NV_PVLIST: [vg_name]}
      excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
      cname = self.cfg.GetClusterName()
      result = rpcrunner.call_node_verify_light(
          [node_name], vparams, cname,
          self.cfg.GetClusterInfo().hvparams)[node_name]
      (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
      if errmsgs:
        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # We are adding a new node, so we assume it's powered
    self.new_node.powered = True

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      self.new_node.offline = False
      self.new_node.drained = False
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      self.new_node.master_candidate = self.master_candidate
      if self.changed_primary_ip:
        self.new_node.primary_ip = self.op.primary_ip

    # copy the master/vm_capable flags
    for attr in self._NFLAGS:
      setattr(self.new_node, attr, getattr(self.op, attr))

    # notify the user about any possible mc promotion
    if self.new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    if self.op.ndparams:
      self.new_node.ndparams = self.op.ndparams
    else:
      self.new_node.ndparams = {}

    if self.op.hv_state:
      self.new_node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      self.new_node.disk_state_static = self.new_disk_state

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(
          master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
          self.hostname.ip)
      result.Raise("Can't update hosts file with new host data")

    if self.new_node.secondary_ip != self.new_node.primary_ip:
      _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
                               False)

    node_verifier_uuids = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: ([self.new_node.name], {}),
      # TODO: do a node-net-test as well?
      }

    result = self.rpc.call_node_verify(
        node_verifier_uuids, node_verify_param,
        self.cfg.GetClusterName(),
        self.cfg.GetClusterInfo().hvparams)
    for verifier in node_verifier_uuids:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed")

    if self.op.readd:
      self.context.ReaddNode(self.new_node)
      RedistributeAncillaryFiles(self)
      # make sure we redistribute the config
      self.cfg.Update(self.new_node, feedback_fn)
      # and make sure the new node will not have old files around
      if not self.new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
        result.Warn("Node failed to demote itself from master candidate"
                    " status", self.LogWarning)
    else:
      self.context.AddNode(self.new_node, self.proc.GetECId())
      RedistributeAncillaryFiles(self)


class LUNodeSetParams(LogicalUnit):
  """Modifies the parameters of a node.

  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
      to the node role (as _ROLE_*)
  @cvar _R2F: a dictionary from node role to tuples of flags
  @cvar _FLAGS: a list of attribute names corresponding to the flags

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False

  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
  _F2R = {
    (True, False, False): _ROLE_CANDIDATE,
    (False, True, False): _ROLE_DRAINED,
    (False, False, True): _ROLE_OFFLINE,
    (False, False, False): _ROLE_REGULAR,
    }
  _R2F = dict((v, k) for k, v in _F2R.items())
  _FLAGS = ["master_candidate", "drained", "offline"]
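
  # Illustrative sketch (not part of the original module): the flag tuples
  # and roles map one-to-one in both directions, e.g.:
  #
  #   _F2R[(True, False, False)] == _ROLE_CANDIDATE
  #   _R2F[_ROLE_DRAINED] == (False, True, False)
  #
  # so a node's role can be derived from its (master_candidate, drained,
  # offline) flags, and the flags recovered from the role.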

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
                self.op.master_capable, self.op.vm_capable,
                self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
                self.op.disk_state]
    if all_mods.count(None) == len(all_mods):
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)

    # Boolean value that tells us whether we might be demoting from MC
    self.might_demote = (self.op.master_candidate is False or
                         self.op.offline is True or
                         self.op.drained is True or
                         self.op.master_capable is False)

    if self.op.secondary_ip:
      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                   " address" % self.op.secondary_ip,
                                   errors.ECODE_INVAL)

    self.lock_all = self.op.auto_promote and self.might_demote
    self.lock_instances = self.op.secondary_ip is not None

  def _InstanceFilter(self, instance):
    """Filter for getting affected instances.

    """
    return (instance.disk_template in constants.DTS_INT_MIRROR and
            self.op.node_uuid in instance.all_nodes)

  def ExpandNames(self):
    if self.lock_all:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,

        # Block allocations when all nodes are locked
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: self.op.node_uuid,
        }

    # Since modifying a node can have severe effects on currently running
    # operations the resource lock is at least acquired in shared mode
    self.needed_locks[locking.LEVEL_NODE_RES] = \
      self.needed_locks[locking.LEVEL_NODE]

    # Get all locks except nodes in shared mode; they are not used for anything
    # but read-only access
    self.share_locks = ShareAll()
    self.share_locks[locking.LEVEL_NODE] = 0
    self.share_locks[locking.LEVEL_NODE_RES] = 0
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 0

    if self.lock_instances:
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetInstanceNames(
          self.cfg.GetInstancesInfoByFilter(self._InstanceFilter).keys())

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    if self.lock_instances:
      affected_instances = \
        self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)

      # Verify instance locks
      owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
      wanted_instance_names = frozenset([inst.name for inst in
                                         affected_instances.values()])
      if wanted_instance_names - owned_instance_names:
        raise errors.OpPrereqError("Instances affected by changing node %s's"
                                   " secondary IP address have changed since"
                                   " locks were acquired, wanted '%s', have"
                                   " '%s'; retry the operation" %
                                   (node.name,
                                    utils.CommaJoin(wanted_instance_names),
                                    utils.CommaJoin(owned_instance_names)),
                                   errors.ECODE_STATE)
    else:
      affected_instances = None

    if (self.op.master_candidate is not None or
        self.op.drained is not None or
        self.op.offline is not None):
      # we can't change the master's node flags
      if node.uuid == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master role can be changed"
                                   " only via master-failover",
                                   errors.ECODE_INVAL)

    if self.op.master_candidate and not node.master_capable:
      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
                                 " it a master candidate" % node.name,
                                 errors.ECODE_STATE)

    if self.op.vm_capable is False:
      (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
      if ipri or isec:
        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
                                   " the vm_capable flag" % node.name,
                                   errors.ECODE_STATE)

    if node.master_candidate and self.might_demote and not self.lock_all:
      assert not self.op.auto_promote, "auto_promote set but lock_all not"
      # check if after removing the current node, we're missing master
      # candidates
      (mc_remaining, mc_should, _) = \
        self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
      if mc_remaining < mc_should:
        raise errors.OpPrereqError("Not enough master candidates, please"
                                   " pass auto promote option to allow"
                                   " promotion (--auto-promote or RAPI"
                                   " auto_promote=True)", errors.ECODE_STATE)

    self.old_flags = old_flags = (node.master_candidate,
                                  node.drained, node.offline)
    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
    self.old_role = old_role = self._F2R[old_flags]

    # Check for ineffective changes
    for attr in self._FLAGS:
      if getattr(self.op, attr) is False and getattr(node, attr) is False:
        self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
        setattr(self.op, attr, None)

    # Past this point, any flag change to False means a transition
    # away from the respective state, as only real changes are kept

    # TODO: We might query the real power state if it supports OOB
    if SupportsOob(self.cfg, node):
      if self.op.offline is False and not (node.powered or
                                           self.op.powered is True):
        raise errors.OpPrereqError(("Node %s needs to be turned on before its"
                                    " offline status can be reset") %
                                   self.op.node_name, errors.ECODE_STATE)
    elif self.op.powered is not None:
      raise errors.OpPrereqError(("Unable to change powered state for node %s"
                                  " as it does not support out-of-band"
                                  " handling") % self.op.node_name,
                                 errors.ECODE_STATE)

    # If we're being deofflined/drained, we'll MC ourself if needed
    if (self.op.drained is False or self.op.offline is False or
        (self.op.master_capable and not node.master_capable)):
      if _DecideSelfPromotion(self):
        self.op.master_candidate = True
        self.LogInfo("Auto-promoting node to master candidate")

    # If we're no longer master capable, we'll demote ourselves from MC
    if self.op.master_capable is False and node.master_candidate:
      self.LogInfo("Demoting from master candidate")
      self.op.master_candidate = False

    # Compute new role
    assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
    if self.op.master_candidate:
      new_role = self._ROLE_CANDIDATE
    elif self.op.drained:
      new_role = self._ROLE_DRAINED
    elif self.op.offline:
      new_role = self._ROLE_OFFLINE
    elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
      # False is still in new flags, which means we're un-setting (the
      # only) True flag
      new_role = self._ROLE_REGULAR
    else: # no new flags, nothing, keep old role
      new_role = old_role

    self.new_role = new_role

    if old_role == self._ROLE_OFFLINE and new_role != old_role:
      # Trying to transition out of offline status
      result = self.rpc.call_version([node.uuid])[node.uuid]
      if result.fail_msg:
        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
                                   " to report its version: %s" %
                                   (node.name, result.fail_msg),
                                   errors.ECODE_STATE)
      else:
        self.LogWarning("Transitioning node from offline to online state"
                        " without using re-add. Please make sure the node"
                        " is healthy!")

    # When changing the secondary ip, verify if this is a single-homed to
    # multi-homed transition or vice versa, and apply the relevant
    # restrictions.
    if self.op.secondary_ip:
      # Ok even without locking, because this can't be changed by any LU
      master = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
      master_singlehomed = master.secondary_ip == master.primary_ip
      if master_singlehomed and self.op.secondary_ip != node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from single-homed to multi-homed"
                          " cluster; all nodes will require a secondary IP"
                          " address")
        else:
          raise errors.OpPrereqError("Changing the secondary ip on a"
                                     " single-homed cluster requires the"
                                     " --force option to be passed, and the"
                                     " target node to be the master",
                                     errors.ECODE_INVAL)
      elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from multi-homed to single-homed"
                          " cluster; secondary IP addresses will have to be"
                          " removed")
        else:
          raise errors.OpPrereqError("Cannot set the secondary IP to be the"
                                     " same as the primary IP on a multi-homed"
                                     " cluster, unless the --force option is"
                                     " passed, and the target node is the"
                                     " master", errors.ECODE_INVAL)

      assert not (set([inst.name for inst in affected_instances.values()]) -
                  self.owned_locks(locking.LEVEL_INSTANCE))

      if node.offline:
        if affected_instances:
          msg = ("Cannot change secondary IP address: offline node has"
                 " instances (%s) configured to use it" %
                 utils.CommaJoin(
                   [inst.name for inst in affected_instances.values()]))
          raise errors.OpPrereqError(msg, errors.ECODE_STATE)
      else:
        # On online nodes, check that no instances are running, and that
        # the node has the new ip and we can reach it.
        for instance in affected_instances.values():
          CheckInstanceState(self, instance, INSTANCE_DOWN,
                             msg="cannot change secondary ip")

        _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
        if master.uuid != node.uuid:
          # check reachability from master secondary ip to new secondary ip
          if not netutils.TcpPing(self.op.secondary_ip,
                                  constants.DEFAULT_NODED_PORT,
                                  source=master.secondary_ip):
            raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                       " based ping to node daemon port",
                                       errors.ECODE_ENVIRON)

    if self.op.ndparams:
      new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")
      self.new_ndparams = new_ndparams

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                                node.hv_state_static)

    if self.op.disk_state:
      self.new_disk_state = \
        MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    result = []

    if self.op.ndparams:
      node.ndparams = self.new_ndparams

    if self.op.powered is not None:
      node.powered = self.op.powered

    if self.op.hv_state:
      node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      node.disk_state_static = self.new_disk_state

    for attr in ["master_capable", "vm_capable"]:
      val = getattr(self.op, attr)
      if val is not None:
        setattr(node, attr, val)
        result.append((attr, str(val)))

    if self.new_role != self.old_role:
      # Tell the node to demote itself, if no longer MC and not offline
      if self.old_role == self._ROLE_CANDIDATE and \
          self.new_role != self._ROLE_OFFLINE:
        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s", msg)

      new_flags = self._R2F[self.new_role]
      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
        if of != nf:
          result.append((desc, str(nf)))
      (node.master_candidate, node.drained, node.offline) = new_flags

      # we locked all nodes, we adjust the CP before updating this node
      if self.lock_all:
        AdjustCandidatePool(self, [node.uuid])

    if self.op.secondary_ip:
      node.secondary_ip = self.op.secondary_ip
      result.append(("secondary_ip", self.op.secondary_ip))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node, feedback_fn)

    # this will trigger job queue propagation or cleanup if the mc
    # flag changed
    if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
      self.context.ReaddNode(node)

    return result


class LUNodePowercycle(NoHooksLU):
  """Powercycles a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.node_uuid == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set",
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    default_hypervisor = self.cfg.GetHypervisorType()
    hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
    result = self.rpc.call_node_powercycle(self.op.node_uuid,
                                           default_hypervisor,
                                           hvparams)
    result.Raise("Failed to schedule the reboot")
    return result.payload


def _GetNodeInstancesInner(cfg, fn):
  return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]


def _GetNodePrimaryInstances(cfg, node_uuid):
  """Returns primary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid == inst.primary_node)


def _GetNodeSecondaryInstances(cfg, node_uuid):
  """Returns secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid in inst.secondary_nodes)


def _GetNodeInstances(cfg, node_uuid):
  """Returns a list of all primary and secondary instances on a node.

  """
  return _GetNodeInstancesInner(cfg, lambda inst: node_uuid in inst.all_nodes)
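
# Illustrative usage sketch (not part of the original module): the three
# helpers differ only in the predicate applied to each instance. For a
# hypothetical node UUID:
#
#   pri = _GetNodePrimaryInstances(cfg, "node-uuid-1234")
#   sec = _GetNodeSecondaryInstances(cfg, "node-uuid-1234")
#   both = _GetNodeInstances(cfg, "node-uuid-1234")
#
# "both" is the union of "pri" and "sec", since inst.all_nodes contains the
# primary node as well as all secondaries.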


class LUNodeEvacuate(NoHooksLU):
  """Evacuates instances off a list of nodes.

  """
  REQ_BGL = False

  _MODE2IALLOCATOR = {
    constants.NODE_EVAC_PRI: constants.IALLOCATOR_NEVAC_PRI,
    constants.NODE_EVAC_SEC: constants.IALLOCATOR_NEVAC_SEC,
    constants.NODE_EVAC_ALL: constants.IALLOCATOR_NEVAC_ALL,
    }
  assert frozenset(_MODE2IALLOCATOR.keys()) == constants.NODE_EVAC_MODES
  assert (frozenset(_MODE2IALLOCATOR.values()) ==
          constants.IALLOCATOR_NEVAC_MODES)

  def CheckArguments(self):
    CheckIAllocatorOrNode(self, "iallocator", "remote_node")

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.remote_node is not None:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)
      assert self.op.remote_node

      if self.op.node_uuid == self.op.remote_node_uuid:
        raise errors.OpPrereqError("Can not use evacuated node as a new"
                                   " secondary node", errors.ECODE_INVAL)

      if self.op.mode != constants.NODE_EVAC_SEC:
        raise errors.OpPrereqError("Without the use of an iallocator only"
                                   " secondary instances can be evacuated",
                                   errors.ECODE_INVAL)

    # Declare locks
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

    # Determine nodes (via group) optimistically; needs verification once
    # locks have been acquired
    self.lock_nodes = self._DetermineNodes()

  def _DetermineNodes(self):
    """Gets the list of node UUIDs to operate on.

    """
    if self.op.remote_node is None:
      # Iallocator will choose any node(s) in the same group
      group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
    else:
      group_nodes = frozenset([self.op.remote_node_uuid])

    # Determine nodes to be locked
    return set([self.op.node_uuid]) | group_nodes

  def _DetermineInstances(self):
    """Builds list of instances to operate on.

    """
    assert self.op.mode in constants.NODE_EVAC_MODES

    if self.op.mode == constants.NODE_EVAC_PRI:
      # Primary instances only
      inst_fn = _GetNodePrimaryInstances
      assert self.op.remote_node is None, \
        "Evacuating primary instances requires iallocator"
    elif self.op.mode == constants.NODE_EVAC_SEC:
      # Secondary instances only
      inst_fn = _GetNodeSecondaryInstances
    else:
      # All instances
      assert self.op.mode == constants.NODE_EVAC_ALL
      inst_fn = _GetNodeInstances
      # TODO: In 2.6, change the iallocator interface to take an evacuation
      # mode instead of dealing with this in multiple places
      raise errors.OpPrereqError("Due to an issue with the iallocator"
                                 " interface it is not possible to evacuate"
                                 " all instances at once; specify explicitly"
                                 " whether to evacuate primary or secondary"
                                 " instances",
                                 errors.ECODE_INVAL)

    return inst_fn(self.cfg, self.op.node_uuid)

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      # Lock instances optimistically, needs verification once node and group
      # locks have been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        set(i.name for i in self._DetermineInstances())

    elif level == locking.LEVEL_NODEGROUP:
      # Lock node groups for all potential target nodes optimistically, needs
      # verification once nodes have been acquired
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)

    elif level == locking.LEVEL_NODE:
      self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes

  def CheckPrereq(self):
    # Verify locks
    owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
    owned_nodes = self.owned_locks(locking.LEVEL_NODE)
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)

    need_nodes = self._DetermineNodes()

    if not owned_nodes.issuperset(need_nodes):
      raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
                                 " locks were acquired, current nodes are"
                                 " '%s', used to be '%s'; retry the"
                                 " operation" %
                                 (self.op.node_name,
                                  utils.CommaJoin(need_nodes),
                                  utils.CommaJoin(owned_nodes)),
                                 errors.ECODE_STATE)

    wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
    if owned_groups != wanted_groups:
      raise errors.OpExecError("Node groups changed since locks were acquired,"
                               " current groups are '%s', used to be '%s';"
                               " retry the operation" %
                               (utils.CommaJoin(wanted_groups),
                                utils.CommaJoin(owned_groups)))

    # Determine affected instances
    self.instances = self._DetermineInstances()
    self.instance_names = [i.name for i in self.instances]

    if set(self.instance_names) != owned_instance_names:
      raise errors.OpExecError("Instances on node '%s' changed since locks"
                               " were acquired, current instances are '%s',"
                               " used to be '%s'; retry the operation" %
                               (self.op.node_name,
                                utils.CommaJoin(self.instance_names),
                                utils.CommaJoin(owned_instance_names)))

    if self.instance_names:
      self.LogInfo("Evacuating instances from node '%s': %s",
                   self.op.node_name,
                   utils.CommaJoin(utils.NiceSort(self.instance_names)))
    else:
      self.LogInfo("No instances to evacuate from node '%s'",
                   self.op.node_name)

    if self.op.remote_node is not None:
      for i in self.instances:
        if i.primary_node == self.op.remote_node_uuid:
          raise errors.OpPrereqError("Node %s is the primary node of"
                                     " instance %s, cannot use it as"
                                     " secondary" %
                                     (self.op.remote_node, i.name),
                                     errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)

    if not self.instance_names:
      # No instances to evacuate
      jobs = []

    elif self.op.iallocator is not None:
      # TODO: Implement relocation to other group
      evac_mode = self._MODE2IALLOCATOR[self.op.mode]
      req = iallocator.IAReqNodeEvac(evac_mode=evac_mode,
                                     instances=list(self.instance_names))
      ial = iallocator.IAllocator(self.cfg, self.rpc, req)

      ial.Run(self.op.iallocator)

      if not ial.success:
        raise errors.OpPrereqError("Can't compute node evacuation using"
                                   " iallocator '%s': %s" %
                                   (self.op.iallocator, ial.info),
                                   errors.ECODE_NORES)

      jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)

    elif self.op.remote_node is not None:
      assert self.op.mode == constants.NODE_EVAC_SEC
      jobs = [
        [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
                                        remote_node=self.op.remote_node,
                                        disks=[],
                                        mode=constants.REPLACE_DISK_CHG,
                                        early_release=self.op.early_release)]
        for instance_name in self.instance_names]

    else:
      raise errors.ProgrammerError("No iallocator or remote node")

    return ResultWithJobs(jobs)
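
  # Illustrative note (an assumption about the result format, not from the
  # original comments): "jobs" is a list of jobs, each itself a list of
  # opcodes executed sequentially within that job, so the remote-node branch
  # above submits one single-opcode job per evacuated instance, e.g.:
  #
  #   jobs = [
  #     [OpInstanceReplaceDisks(instance_name="inst1", ...)],
  #     [OpInstanceReplaceDisks(instance_name="inst2", ...)],
  #   ]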


class LUNodeMigrate(LogicalUnit):
  """Migrate all instances from a node.

  """
  HPATH = "node-migrate"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False

  def CheckArguments(self):
    pass

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    return {
      "NODE_NAME": self.op.node_name,
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()]
    return (nl, nl)

  def CheckPrereq(self):
    pass

  def Exec(self, feedback_fn):
    # Prepare migration jobs for all primary instances
    jobs = [
      [opcodes.OpInstanceMigrate(
        instance_name=inst.name,
        mode=self.op.mode,
        live=self.op.live,
        iallocator=self.op.iallocator,
        target_node=self.op.target_node,
        allow_runtime_changes=self.op.allow_runtime_changes,
        ignore_ipolicy=self.op.ignore_ipolicy)]
      for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]

    # TODO: Run iallocator in this opcode and pass correct placement options to
    # OpInstanceMigrate. Since other jobs can modify the cluster between
    # running the iallocator and the actual migration, a good consistency model
    # will have to be found.

    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
            frozenset([self.op.node_uuid]))

    return ResultWithJobs(jobs)


def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  """
  # Special case for file storage
  if storage_type == constants.ST_FILE:
    # storage.FileStorage wants a list of storage directories
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]

  return []
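
# Illustrative sketch (not part of the original module): for file storage
# the helper returns a single positional argument, namely a list of storage
# directories; the paths below are hypothetical:
#
#   _GetStorageTypeArgs(cfg, constants.ST_FILE)
#   # -> [["/srv/ganeti/file-storage", "/srv/ganeti/shared-file-storage"]]
#
# while every other storage type takes no extra arguments and yields [].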


class LUNodeModifyStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    try:
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
    except KeyError:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type,
                                 errors.ECODE_INVAL)

    diff = set(self.op.changes.keys()) - modifiable
    if diff:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (storage_type, list(diff)),
                                 errors.ECODE_INVAL)

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_uuid,
      }

  def Exec(self, feedback_fn):
    """Modifies the given storage unit.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_modify(self.op.node_uuid,
                                          self.op.storage_type, st_args,
                                          self.op.name, self.op.changes)
    result.Raise("Failed to modify storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))


class NodeQuery(QueryBase):
  FIELDS = query.NODE_FIELDS

  def ExpandNames(self, lu):
    lu.needed_locks = {}
    lu.share_locks = ShareAll()

    if self.names:
      (self.wanted, _) = GetWantedNodes(lu, self.names)
    else:
      self.wanted = locking.ALL_SET

    self.do_locking = (self.use_locking and
                       query.NQ_LIVE in self.requested_data)
    if self.do_locking:
      # If any non-static field is requested we need to lock the nodes
      lu.needed_locks[locking.LEVEL_NODE] = self.wanted
      lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    all_info = lu.cfg.GetAllNodesInfo()

    node_uuids = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)

    # Gather data as requested
    if query.NQ_LIVE in self.requested_data:
      # filter out non-vm_capable nodes
      toquery_node_uuids = [node.uuid for node in all_info.values()
                            if node.vm_capable and node.uuid in node_uuids]
      lvm_enabled = utils.storage.IsLvmEnabled(
          lu.cfg.GetClusterInfo().enabled_disk_templates)
      # FIXME: this per default asks for storage space information for all
      # enabled disk templates. Fix this by making it possible to specify
      # space report fields for specific disk templates.
      raw_storage_units = utils.storage.GetStorageUnitsOfCluster(
          lu.cfg, include_spindles=lvm_enabled)
      storage_units = rpc.PrepareStorageUnitsForNodes(
          lu.cfg, raw_storage_units, toquery_node_uuids)
      default_hypervisor = lu.cfg.GetHypervisorType()
      hvparams = lu.cfg.GetClusterInfo().hvparams[default_hypervisor]
      hvspecs = [(default_hypervisor, hvparams)]
      node_data = lu.rpc.call_node_info(toquery_node_uuids, storage_units,
                                        hvspecs)
      live_data = dict(
          (uuid, rpc.MakeLegacyNodeInfo(nresult.payload,
                                        require_spindles=lvm_enabled))
          for (uuid, nresult) in node_data.items()
          if not nresult.fail_msg and nresult.payload)
    else:
      live_data = None

    if query.NQ_INST in self.requested_data:
      node_to_primary = dict([(uuid, set()) for uuid in node_uuids])
      node_to_secondary = dict([(uuid, set()) for uuid in node_uuids])

      inst_data = lu.cfg.GetAllInstancesInfo()
      inst_uuid_to_inst_name = {}

      for inst in inst_data.values():
        inst_uuid_to_inst_name[inst.uuid] = inst.name
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.uuid)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.uuid)
    else:
      node_to_primary = None
      node_to_secondary = None
      inst_uuid_to_inst_name = None

    if query.NQ_OOB in self.requested_data:
      oob_support = dict((uuid, bool(SupportsOob(lu.cfg, node)))
                         for uuid, node in all_info.items())
    else:
      oob_support = None

    if query.NQ_GROUP in self.requested_data:
      groups = lu.cfg.GetAllNodeGroupsInfo()
    else:
      groups = {}

    return query.NodeQueryData([all_info[uuid] for uuid in node_uuids],
                               live_data, lu.cfg.GetMasterNode(),
                               node_to_primary, node_to_secondary,
                               inst_uuid_to_inst_name, groups, oob_support,
                               lu.cfg.GetClusterInfo())


class LUNodeQuery(NoHooksLU):
  """Logical unit for querying nodes.

  """
  # pylint: disable=W0142
  REQ_BGL = False

  def CheckArguments(self):
    self.nq = NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
                        self.op.output_fields, self.op.use_locking)

  def ExpandNames(self):
    self.nq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.nq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    return self.nq.OldStyleQuery(self)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set

  """
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)


class LUNodeQueryvols(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  REQ_BGL = False
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def CheckArguments(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    volumes = self.rpc.call_node_volumes(node_uuids)

    ilist = self.cfg.GetAllInstancesInfo()
    vol2inst = MapInstanceLvsToNodes(ilist.values())

    output = []
    for node_uuid in node_uuids:
      nresult = volumes[node_uuid]
      if nresult.offline:
        continue
      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't compute volume data on node %s: %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue

      node_vols = sorted(nresult.payload,
                         key=operator.itemgetter("dev"))

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = self.cfg.GetNodeName(node_uuid)
          elif field == "phys":
            val = vol["dev"]
          elif field == "vg":
            val = vol["vg"]
          elif field == "name":
            val = vol["name"]
          elif field == "size":
            val = int(float(vol["size"]))
          elif field == "instance":
            inst = vol2inst.get((node_uuid, vol["vg"] + "/" + vol["name"]),
                                None)
            if inst is not None:
              val = inst.name
            else:
              val = "-"
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUNodeQueryStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
  REQ_BGL = False

  def CheckArguments(self):
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    if self.op.nodes:
      self.needed_locks = {
        locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    self.node_uuids = self.owned_locks(locking.LEVEL_NODE)

    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
    else:
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    data = self.rpc.call_storage_list(self.node_uuids,
                                      self.op.storage_type, st_args,
                                      self.op.name, fields)

    result = []

    for node_uuid in utils.NiceSort(self.node_uuids):
      node_name = self.cfg.GetNodeName(node_uuid)
      nresult = data[node_uuid]
      if nresult.offline:
        continue

      msg = nresult.fail_msg
      if msg:
        self.LogWarning("Can't get storage data from node %s: %s",
                        node_name, msg)
        continue

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):
        row = rows[name]

        out = []

        for field in self.op.output_fields:
          if field == constants.SF_NODE:
            val = node_name
          elif field == constants.SF_TYPE:
            val = self.op.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
          else:
            raise errors.ParameterError(field)

          out.append(val)

        result.append(out)

    return result


class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    all_nodes = self.cfg.GetNodeList()
    try:
      all_nodes.remove(self.op.node_uuid)
    except ValueError:
      pass
    return (all_nodes, all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    assert node is not None

    masternode = self.cfg.GetMasterNode()
    if node.uuid == masternode:
      raise errors.OpPrereqError("Node is the master node, failover to another"
                                 " node is required", errors.ECODE_INVAL)

    for _, instance in self.cfg.GetAllInstancesInfo().items():
      if node.uuid in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first" % instance.name,
                                   errors.ECODE_INVAL)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    logging.info("Stopping the node daemon and removing configs from node %s",
                 self.node.name)

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # Promote nodes to master candidate as needed
    AdjustCandidatePool(self, exceptions=[self.node.uuid])
    self.context.RemoveNode(self.node)

    # Run post hooks on the node before it's removed
    RunPostHook(self, self.node.name)

    # we have to call this by name rather than by UUID, as the node is no
    # longer in the config
    result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup)
    msg = result.fail_msg
    if msg:
      self.LogWarning("Errors encountered on the remote node while leaving"
                      " the cluster: %s", msg)

    # Remove node from our /etc/hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node_uuid = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node_uuid,
                                              constants.ETC_HOSTS_REMOVE,
                                              self.node.name, None)
      result.Raise("Can't update hosts file with new host data")
      RedistributeAncillaryFiles(self)


class LURepairNodeStorage(NoHooksLU):
  """Repairs the volume group on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    if (constants.SO_FIX_CONSISTENCY not in
        constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " repaired" % storage_type,
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def _CheckFaultyDisks(self, instance, node_uuid):
    """Ensure faulty disks abort the opcode or at least warn."""
    try:
      if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
                                 node_uuid, True):
        raise errors.OpPrereqError("Instance '%s' has faulty disks on"
                                   " node '%s'" %
                                   (instance.name,
                                    self.cfg.GetNodeName(node_uuid)),
                                   errors.ECODE_STATE)
    except errors.OpPrereqError as err:
      if self.op.ignore_consistency:
        self.LogWarning(str(err.args[0]))
      else:
        raise

  def CheckPrereq(self):
    """Check prerequisites.

    """
    CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)

    # Check whether any instance on this node has faulty disks
    for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
      if not inst.disks_active:
        continue
      check_nodes = set(inst.all_nodes)
      check_nodes.discard(self.op.node_uuid)
      for inst_node_uuid in check_nodes:
        self._CheckFaultyDisks(inst, inst_node_uuid)

  def Exec(self, feedback_fn):
    feedback_fn("Repairing storage unit '%s' on %s ..." %
                (self.op.name, self.op.node_name))

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_execute(self.op.node_uuid,
                                           self.op.storage_type, st_args,
                                           self.op.name,
                                           constants.SO_FIX_CONSISTENCY)
    result.Raise("Failed to repair storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))