#
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
# All rights reserved.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with nodes."""
import logging

from ganeti import constants
from ganeti import errors
from ganeti import locking
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import qlang
from ganeti import query
from ganeti import rpc
from ganeti import utils
from ganeti.masterd import iallocator

from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, QueryBase, \
  ResultWithJobs
from ganeti.cmdlib.common import CheckParamsNotGlobal, \
  MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  IsExclusiveStorageEnabledNode, CheckNodePVs, \
  RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
  CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
  AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
  GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \
  FindFaultyInstanceDisks
51 def _DecideSelfPromotion(lu, exceptions=None):
52 """Decide whether I should promote myself as a master candidate.
55 cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
56 mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
57 # the new node will increase mc_max with one, so:
58 mc_should = min(mc_should + 1, cp_size)
59 return mc_now < mc_should
def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
  """Ensure that a node has the given secondary ip.

  @type lu: L{LogicalUnit}
  @param lu: the LU on behalf of which we make the check
  @type node: L{objects.Node}
  @param node: the node to check
  @type secondary_ip: string
  @param secondary_ip: the ip to check
  @type prereq: boolean
  @param prereq: whether to throw a prerequisite or an execute error
  @raise errors.OpPrereqError: if the node doesn't have the ip,
      and prereq=True
  @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False

  """
  # this can be called with a new node, which has no UUID yet, so perform the
  # RPC call using its name
  result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
  result.Raise("Failure checking secondary ip on node %s" % node.name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
  if not result.payload:
    msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
           " please fix and re-run this command" % secondary_ip)
    # the caller chooses which error class to surface via 'prereq'
    if prereq:
      raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
    else:
      raise errors.OpExecError(msg)
class LUNodeAdd(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  # flags copied between the opcode and the node object
  _NFLAGS = ["master_capable", "vm_capable"]

  def CheckArguments(self):
    self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
    # validate/normalize the node name
    self.hostname = netutils.GetHostname(name=self.op.node_name,
                                         family=self.primary_ip_family)
    self.op.node_name = self.hostname.name

    if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName():
      raise errors.OpPrereqError("Cannot readd the master node",
                                 errors.ECODE_STATE)

    if self.op.readd and self.op.group:
      raise errors.OpPrereqError("Cannot pass a node group when a node is"
                                 " being readded", errors.ECODE_INVAL)

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    hook_nodes = self.cfg.GetNodeList()
    new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
    if new_node_info is not None:
      # Exclude added node
      hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))

    # add the new node as post hook node by name; it does not have an UUID yet
    return (hook_nodes, hook_nodes, [self.op.node_name, ])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) matches the cluster

    Any errors are signaled by raising errors.OpPrereqError.

    """
    node_name = self.hostname.name
    self.op.primary_ip = self.hostname.ip
    if self.op.secondary_ip is None:
      if self.primary_ip_family == netutils.IP6Address.family:
        raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
                                   " IPv4 address must be given as secondary",
                                   errors.ECODE_INVAL)
      self.op.secondary_ip = self.op.primary_ip

    secondary_ip = self.op.secondary_ip
    if not netutils.IP4Address.IsValid(secondary_ip):
      raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                 " address" % secondary_ip, errors.ECODE_INVAL)

    existing_node_info = self.cfg.GetNodeInfoByName(node_name)
    if not self.op.readd and existing_node_info is not None:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node_name, errors.ECODE_EXISTS)
    elif self.op.readd and existing_node_info is None:
      raise errors.OpPrereqError("Node %s is not in the configuration" %
                                 node_name, errors.ECODE_NOENT)

    self.changed_primary_ip = False

    for existing_node in self.cfg.GetAllNodesInfo().values():
      if self.op.readd and node_name == existing_node.name:
        if existing_node.secondary_ip != secondary_ip:
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
        if existing_node.primary_ip != self.op.primary_ip:
          self.changed_primary_ip = True

        continue

      if (existing_node.primary_ip == self.op.primary_ip or
          existing_node.secondary_ip == self.op.primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name,
                                   errors.ECODE_NOTUNIQUE)

    # After this 'if' block, None is no longer a valid value for the
    # _capable op attributes
    if self.op.readd:
      assert existing_node_info is not None, \
        "Can't retrieve locked node %s" % node_name
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, getattr(existing_node_info, attr))
    else:
      for attr in self._NFLAGS:
        if getattr(self.op, attr) is None:
          setattr(self.op, attr, True)

    if self.op.readd and not self.op.vm_capable:
      pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
      if pri or sec:
        raise errors.OpPrereqError("Node %s being re-added with vm_capable"
                                   " flag set to false, but it already holds"
                                   " instances" % node_name,
                                   errors.ECODE_STATE)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == self.op.primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no secondary ip but the"
                                   " new node has one",
                                   errors.ECODE_INVAL)
      else:
        raise errors.OpPrereqError("The master has a secondary ip but the"
                                   " new node doesn't have one",
                                   errors.ECODE_INVAL)

    # checks reachability
    if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping",
                                 errors.ECODE_ENVIRON)

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                              source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to node daemon port",
                                   errors.ECODE_ENVIRON)

    if self.op.readd:
      exceptions = [existing_node_info.uuid]
    else:
      exceptions = []

    if self.op.master_capable:
      self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
    else:
      self.master_candidate = False

    if self.op.readd:
      self.new_node = existing_node_info
    else:
      node_group = self.cfg.LookupNodeGroup(self.op.group)
      self.new_node = objects.Node(name=node_name,
                                   primary_ip=self.op.primary_ip,
                                   secondary_ip=secondary_ip,
                                   master_candidate=self.master_candidate,
                                   offline=False, drained=False,
                                   group=node_group, ndparams={})

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)

    if self.op.disk_state:
      self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)

    # TODO: If we need to have multiple DnsOnlyRunner we probably should make
    # it a property on the base class.
    rpcrunner = rpc.DnsOnlyRunner()
    result = rpcrunner.call_version([node_name])[node_name]
    result.Raise("Can't get version information from node %s" % node_name)
    if constants.PROTOCOL_VERSION == result.payload:
      logging.info("Communication to node %s fine, sw version %s match",
                   node_name, result.payload)
    else:
      raise errors.OpPrereqError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result.payload),
                                 errors.ECODE_ENVIRON)

    vg_name = self.cfg.GetVGName()
    if vg_name is not None:
      vparams = {constants.NV_PVLIST: [vg_name]}
      excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
      cname = self.cfg.GetClusterName()
      result = rpcrunner.call_node_verify_light(
          [node_name], vparams, cname,
          self.cfg.GetClusterInfo().hvparams)[node_name]
      (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
      if errmsgs:
        raise errors.OpPrereqError("Checks on node PVs failed: %s" %
                                   "; ".join(errmsgs), errors.ECODE_ENVIRON)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
      "Not owning BGL"

    # We adding a new node so we assume it's powered
    self.new_node.powered = True

    # for re-adds, reset the offline/drained/master-candidate flags;
    # we need to reset here, otherwise offline would prevent RPC calls
    # later in the procedure; this also means that if the re-add
    # fails, we are left with a non-offlined, broken node
    if self.op.readd:
      self.new_node.offline = False
      self.new_node.drained = False
      self.LogInfo("Readding a node, the offline/drained flags were reset")
      # if we demote the node, we do cleanup later in the procedure
      self.new_node.master_candidate = self.master_candidate
      if self.changed_primary_ip:
        self.new_node.primary_ip = self.op.primary_ip

    # copy the master/vm_capable flags
    for attr in self._NFLAGS:
      setattr(self.new_node, attr, getattr(self.op, attr))

    # notify the user about any possible mc promotion
    if self.new_node.master_candidate:
      self.LogInfo("Node will be a master candidate")

    if self.op.ndparams:
      self.new_node.ndparams = self.op.ndparams
    else:
      self.new_node.ndparams = {}

    if self.op.hv_state:
      self.new_node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      self.new_node.disk_state_static = self.new_disk_state

    # Add node to our /etc/hosts, and add key to known_hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(
          master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
          self.hostname.ip)
      result.Raise("Can't update hosts file with new host data")

    if self.new_node.secondary_ip != self.new_node.primary_ip:
      _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
                               False)

    node_verifier_uuids = [self.cfg.GetMasterNode()]
    node_verify_param = {
      constants.NV_NODELIST: ([self.new_node.name], {}),
      # TODO: do a node-net-test as well?
      }

    result = self.rpc.call_node_verify(
        node_verifier_uuids, node_verify_param,
        self.cfg.GetClusterName(),
        self.cfg.GetClusterInfo().hvparams)
    for verifier in node_verifier_uuids:
      result[verifier].Raise("Cannot communicate with node %s" % verifier)
      nl_payload = result[verifier].payload[constants.NV_NODELIST]
      if nl_payload:
        for failed in nl_payload:
          feedback_fn("ssh/hostname verification failed"
                      " (checking from %s): %s" %
                      (verifier, nl_payload[failed]))
        raise errors.OpExecError("ssh/hostname verification failed")

    if self.op.readd:
      self.context.ReaddNode(self.new_node)
      RedistributeAncillaryFiles(self)
      # make sure we redistribute the config
      self.cfg.Update(self.new_node, feedback_fn)
      # and make sure the new node will not have old files around
      if not self.new_node.master_candidate:
        result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
        result.Warn("Node failed to demote itself from master candidate"
                    " status", self.LogWarning)
    else:
      self.context.AddNode(self.new_node, self.proc.GetECId())
      RedistributeAncillaryFiles(self)
class LUNodeSetParams(LogicalUnit):
  """Modifies the parameters of a node.

  @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
      to the node role (as _ROLE_*)
  @cvar _R2F: a dictionary from node role to tuples of flags
  @cvar _FLAGS: a list of attribute names corresponding to the flags

  """
  HPATH = "node-modify"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False
  (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
  _F2R = {
    (True, False, False): _ROLE_CANDIDATE,
    (False, True, False): _ROLE_DRAINED,
    (False, False, True): _ROLE_OFFLINE,
    (False, False, False): _ROLE_REGULAR,
    }
  _R2F = dict((v, k) for k, v in _F2R.items())
  _FLAGS = ["master_candidate", "drained", "offline"]

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
                self.op.master_capable, self.op.vm_capable,
                self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
                self.op.disk_state]
    if all_mods.count(None) == len(all_mods):
      raise errors.OpPrereqError("Please pass at least one modification",
                                 errors.ECODE_INVAL)
    if all_mods.count(True) > 1:
      raise errors.OpPrereqError("Can't set the node into more than one"
                                 " state at the same time",
                                 errors.ECODE_INVAL)

    # Boolean value that tells us whether we might be demoting from MC
    self.might_demote = (self.op.master_candidate is False or
                         self.op.offline is True or
                         self.op.drained is True or
                         self.op.master_capable is False)

    if self.op.secondary_ip:
      if not netutils.IP4Address.IsValid(self.op.secondary_ip):
        raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
                                   " address" % self.op.secondary_ip,
                                   errors.ECODE_INVAL)

    self.lock_all = self.op.auto_promote and self.might_demote
    self.lock_instances = self.op.secondary_ip is not None

  def _InstanceFilter(self, instance):
    """Filter for getting affected instances.

    """
    return (instance.disk_template in constants.DTS_INT_MIRROR and
            self.op.node_uuid in instance.all_nodes)

  def ExpandNames(self):
    if self.lock_all:
      self.needed_locks = {
        locking.LEVEL_NODE: locking.ALL_SET,

        # Block allocations when all nodes are locked
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }
    else:
      self.needed_locks = {
        locking.LEVEL_NODE: self.op.node_uuid,
        }

    # Since modifying a node can have severe effects on currently running
    # operations the resource lock is at least acquired in shared mode
    self.needed_locks[locking.LEVEL_NODE_RES] = \
      self.needed_locks[locking.LEVEL_NODE]

    # Get all locks except nodes in shared mode; they are not used for anything
    # but read-only access
    self.share_locks = ShareAll()
    self.share_locks[locking.LEVEL_NODE] = 0
    self.share_locks[locking.LEVEL_NODE_RES] = 0
    self.share_locks[locking.LEVEL_NODE_ALLOC] = 0

    if self.lock_instances:
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        self.cfg.GetInstanceNames(
          self.cfg.GetInstancesInfoByFilter(self._InstanceFilter).keys())

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node.

    """
    return {
      "OP_TARGET": self.op.node_name,
      "MASTER_CANDIDATE": str(self.op.master_candidate),
      "OFFLINE": str(self.op.offline),
      "DRAINED": str(self.op.drained),
      "MASTER_CAPABLE": str(self.op.master_capable),
      "VM_CAPABLE": str(self.op.vm_capable),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    if self.lock_instances:
      affected_instances = \
        self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)

      # Verify instance locks
      owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
      wanted_instance_names = frozenset([inst.name for inst in
                                         affected_instances.values()])
      if wanted_instance_names - owned_instance_names:
        raise errors.OpPrereqError("Instances affected by changing node %s's"
                                   " secondary IP address have changed since"
                                   " locks were acquired, wanted '%s', have"
                                   " '%s'; retry the operation" %
                                   (node.name,
                                    utils.CommaJoin(wanted_instance_names),
                                    utils.CommaJoin(owned_instance_names)),
                                   errors.ECODE_STATE)
    else:
      affected_instances = None

    if (self.op.master_candidate is not None or
        self.op.drained is not None or
        self.op.offline is not None):
      # we can't change the master's node flags
      if node.uuid == self.cfg.GetMasterNode():
        raise errors.OpPrereqError("The master role can be changed"
                                   " only via master-failover",
                                   errors.ECODE_INVAL)

    if self.op.master_candidate and not node.master_capable:
      raise errors.OpPrereqError("Node %s is not master capable, cannot make"
                                 " it a master candidate" % node.name,
                                 errors.ECODE_STATE)

    if self.op.vm_capable is False:
      (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
      if ipri or isec:
        raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
                                   " the vm_capable flag" % node.name,
                                   errors.ECODE_STATE)

    if node.master_candidate and self.might_demote and not self.lock_all:
      assert not self.op.auto_promote, "auto_promote set but lock_all not"
      # check if after removing the current node, we're missing master
      # candidates
      (mc_remaining, mc_should, _) = \
        self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
      if mc_remaining < mc_should:
        raise errors.OpPrereqError("Not enough master candidates, please"
                                   " pass auto promote option to allow"
                                   " promotion (--auto-promote or RAPI"
                                   " auto_promote=True)", errors.ECODE_STATE)

    self.old_flags = old_flags = (node.master_candidate,
                                  node.drained, node.offline)
    assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
    self.old_role = old_role = self._F2R[old_flags]

    # Check for ineffective changes
    for attr in self._FLAGS:
      if getattr(self.op, attr) is False and getattr(node, attr) is False:
        self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
        setattr(self.op, attr, None)

    # Past this point, any flag change to False means a transition
    # away from the respective state, as only real changes are kept

    # TODO: We might query the real power state if it supports OOB
    if SupportsOob(self.cfg, node):
      if self.op.offline is False and not (node.powered or
                                           self.op.powered is True):
        raise errors.OpPrereqError(("Node %s needs to be turned on before its"
                                    " offline status can be reset") %
                                   self.op.node_name, errors.ECODE_STATE)
    elif self.op.powered is not None:
      raise errors.OpPrereqError(("Unable to change powered state for node %s"
                                  " as it does not support out-of-band"
                                  " handling") % self.op.node_name,
                                 errors.ECODE_STATE)

    # If we're being deofflined/drained, we'll MC ourself if needed
    if (self.op.drained is False or self.op.offline is False or
        (self.op.master_capable and not node.master_capable)):
      if _DecideSelfPromotion(self):
        self.op.master_candidate = True
        self.LogInfo("Auto-promoting node to master candidate")

    # If we're no longer master capable, we'll demote ourselves from MC
    if self.op.master_capable is False and node.master_candidate:
      self.LogInfo("Demoting from master candidate")
      self.op.master_candidate = False

    # Compute new role
    assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
    if self.op.master_candidate:
      new_role = self._ROLE_CANDIDATE
    elif self.op.drained:
      new_role = self._ROLE_DRAINED
    elif self.op.offline:
      new_role = self._ROLE_OFFLINE
    elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
      # False is still in new flags, which means we're un-setting (the
      # only) True flag
      new_role = self._ROLE_REGULAR
    else: # no new flags, nothing, keep old role
      new_role = old_role

    self.new_role = new_role

    if old_role == self._ROLE_OFFLINE and new_role != old_role:
      # Trying to transition out of offline status
      result = self.rpc.call_version([node.uuid])[node.uuid]
      if result.fail_msg:
        raise errors.OpPrereqError("Node %s is being de-offlined but fails"
                                   " to report its version: %s" %
                                   (node.name, result.fail_msg),
                                   errors.ECODE_STATE)
      else:
        self.LogWarning("Transitioning node from offline to online state"
                        " without using re-add. Please make sure the node"
                        " is healthy!")

    # When changing the secondary ip, verify if this is a single-homed to
    # multi-homed transition or vice versa, and apply the relevant
    # restrictions.
    if self.op.secondary_ip:
      # Ok even without locking, because this can't be changed by any LU
      master = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
      master_singlehomed = master.secondary_ip == master.primary_ip
      if master_singlehomed and self.op.secondary_ip != node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from single-homed to multi-homed"
                          " cluster; all nodes will require a secondary IP"
                          " address")
        else:
          raise errors.OpPrereqError("Changing the secondary ip on a"
                                     " single-homed cluster requires the"
                                     " --force option to be passed, and the"
                                     " target node to be the master",
                                     errors.ECODE_INVAL)
      elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
        if self.op.force and node.uuid == master.uuid:
          self.LogWarning("Transitioning from multi-homed to single-homed"
                          " cluster; secondary IP addresses will have to be"
                          " removed")
        else:
          raise errors.OpPrereqError("Cannot set the secondary IP to be the"
                                     " same as the primary IP on a multi-homed"
                                     " cluster, unless the --force option is"
                                     " passed, and the target node is the"
                                     " master", errors.ECODE_INVAL)

      assert not (set([inst.name for inst in affected_instances.values()]) -
                  self.owned_locks(locking.LEVEL_INSTANCE))

      if node.offline:
        if affected_instances:
          msg = ("Cannot change secondary IP address: offline node has"
                 " instances (%s) configured to use it" %
                 utils.CommaJoin(
                   [inst.name for inst in affected_instances.values()]))
          raise errors.OpPrereqError(msg, errors.ECODE_STATE)
      else:
        # On online nodes, check that no instances are running, and that
        # the node has the new ip and we can reach it.
        for instance in affected_instances.values():
          CheckInstanceState(self, instance, INSTANCE_DOWN,
                             msg="cannot change secondary ip")

        _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
        if master.uuid != node.uuid:
          # check reachability from master secondary ip to new secondary ip
          if not netutils.TcpPing(self.op.secondary_ip,
                                  constants.DEFAULT_NODED_PORT,
                                  source=master.secondary_ip):
            raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                       " based ping to node daemon port",
                                       errors.ECODE_ENVIRON)

    if self.op.ndparams:
      new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
      utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
      CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
                           "node", "cluster or group")
      self.new_ndparams = new_ndparams

    if self.op.hv_state:
      self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                                node.hv_state_static)

    if self.op.disk_state:
      self.new_disk_state = \
        MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)

  def Exec(self, feedback_fn):
    """Modifies a node.

    """
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    result = []

    if self.op.ndparams:
      node.ndparams = self.new_ndparams

    if self.op.powered is not None:
      node.powered = self.op.powered

    if self.op.hv_state:
      node.hv_state_static = self.new_hv_state

    if self.op.disk_state:
      node.disk_state_static = self.new_disk_state

    for attr in ["master_capable", "vm_capable"]:
      val = getattr(self.op, attr)
      if val is not None:
        setattr(node, attr, val)
        result.append((attr, str(val)))

    if self.new_role != self.old_role:
      # Tell the node to demote itself, if no longer MC and not offline
      if self.old_role == self._ROLE_CANDIDATE and \
          self.new_role != self._ROLE_OFFLINE:
        msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
        if msg:
          self.LogWarning("Node failed to demote itself: %s", msg)

      new_flags = self._R2F[self.new_role]
      for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
        if of != nf:
          result.append((desc, str(nf)))
      (node.master_candidate, node.drained, node.offline) = new_flags

      # we locked all nodes, we adjust the CP before updating this node
      if self.lock_all:
        AdjustCandidatePool(self, [node.uuid])

    if self.op.secondary_ip:
      node.secondary_ip = self.op.secondary_ip
      result.append(("secondary_ip", self.op.secondary_ip))

    # this will trigger configuration file update, if needed
    self.cfg.Update(node, feedback_fn)

    # this will trigger job queue propagation or cleanup if the mc
    # flag changed
    if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
      self.context.ReaddNode(node)

    return result
class LUNodePowercycle(NoHooksLU):
  """Powercycles a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    # powercycling the master is only allowed with --force
    if self.op.node_uuid == self.cfg.GetMasterNode() and not self.op.force:
      raise errors.OpPrereqError("The node is the master and the force"
                                 " parameter was not set",
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    """Locking for PowercycleNode.

    This is a last-resort option and shouldn't block on other
    jobs. Therefore, we grab no locks.

    """
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Reboots a node.

    """
    default_hypervisor = self.cfg.GetHypervisorType()
    hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
    result = self.rpc.call_node_powercycle(self.op.node_uuid,
                                           default_hypervisor,
                                           hvparams)
    result.Raise("Failed to schedule the reboot")
    return result.payload
800 def _GetNodeInstancesInner(cfg, fn):
801 return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
def _GetNodePrimaryInstances(cfg, node_uuid):
  """Returns primary instances on a node.

  @param cfg: the cluster configuration object
  @param node_uuid: UUID of the node to inspect
  @return: list of instances whose primary node is C{node_uuid}

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid == inst.primary_node)
def _GetNodeSecondaryInstances(cfg, node_uuid):
  """Returns secondary instances on a node.

  @param cfg: the cluster configuration object
  @param node_uuid: UUID of the node to inspect
  @return: list of instances having C{node_uuid} among their secondary nodes

  """
  return _GetNodeInstancesInner(cfg,
                                lambda inst: node_uuid in inst.secondary_nodes)
def _GetNodeInstances(cfg, node_uuid):
  """Returns a list of all primary and secondary instances on a node.

  @param cfg: the cluster configuration object
  @param node_uuid: UUID of the node to inspect
  @return: list of instances having C{node_uuid} among all their nodes

  """
  return _GetNodeInstancesInner(cfg, lambda inst: node_uuid in inst.all_nodes)
class LUNodeEvacuate(NoHooksLU):
  """Evacuates instances off a list of nodes.

  """
  REQ_BGL = False

  # mapping from opcode evacuation mode to the iallocator evacuation mode
  _MODE2IALLOCATOR = {
    constants.NODE_EVAC_PRI: constants.IALLOCATOR_NEVAC_PRI,
    constants.NODE_EVAC_SEC: constants.IALLOCATOR_NEVAC_SEC,
    constants.NODE_EVAC_ALL: constants.IALLOCATOR_NEVAC_ALL,
    }
  assert frozenset(_MODE2IALLOCATOR.keys()) == constants.NODE_EVAC_MODES
  assert (frozenset(_MODE2IALLOCATOR.values()) ==
          constants.IALLOCATOR_NEVAC_MODES)

  def CheckArguments(self):
    CheckIAllocatorOrNode(self, "iallocator", "remote_node")

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    if self.op.remote_node is not None:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)
      assert self.op.remote_node

      if self.op.node_uuid == self.op.remote_node_uuid:
        raise errors.OpPrereqError("Can not use evacuated node as a new"
                                   " secondary node", errors.ECODE_INVAL)

      if self.op.mode != constants.NODE_EVAC_SEC:
        raise errors.OpPrereqError("Without the use of an iallocator only"
                                   " secondary instances can be evacuated",
                                   errors.ECODE_INVAL)

    # Declare locks
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_INSTANCE: [],
      locking.LEVEL_NODEGROUP: [],
      locking.LEVEL_NODE: [],
      }

    # Determine nodes (via group) optimistically, needs verification once locks
    # have been acquired
    self.lock_nodes = self._DetermineNodes()

  def _DetermineNodes(self):
    """Gets the list of node UUIDs to operate on.

    """
    if self.op.remote_node is None:
      # Iallocator will choose any node(s) in the same group
      group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
    else:
      group_nodes = frozenset([self.op.remote_node_uuid])

    # Determine nodes to be locked
    return set([self.op.node_uuid]) | group_nodes

  def _DetermineInstances(self):
    """Builds list of instances to operate on.

    """
    assert self.op.mode in constants.NODE_EVAC_MODES

    if self.op.mode == constants.NODE_EVAC_PRI:
      # Primary instances only
      inst_fn = _GetNodePrimaryInstances
      assert self.op.remote_node is None, \
        "Evacuating primary instances requires iallocator"
    elif self.op.mode == constants.NODE_EVAC_SEC:
      # Secondary instances only
      inst_fn = _GetNodeSecondaryInstances
    else:
      # All instances
      assert self.op.mode == constants.NODE_EVAC_ALL
      inst_fn = _GetNodeInstances
      # TODO: In 2.6, change the iallocator interface to take an evacuation mode
      # per instance
      raise errors.OpPrereqError("Due to an issue with the iallocator"
                                 " interface it is not possible to evacuate"
                                 " all instances at once; specify explicitly"
                                 " whether to evacuate primary or secondary"
                                 " instances",
                                 errors.ECODE_INVAL)

    return inst_fn(self.cfg, self.op.node_uuid)

  def DeclareLocks(self, level):
    if level == locking.LEVEL_INSTANCE:
      # Lock instances optimistically, needs verification once node and group
      # locks have been acquired
      self.needed_locks[locking.LEVEL_INSTANCE] = \
        set(i.name for i in self._DetermineInstances())

    elif level == locking.LEVEL_NODEGROUP:
      # Lock node groups for all potential target nodes optimistically, needs
      # verification once nodes have been acquired
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)

    elif level == locking.LEVEL_NODE:
      self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes

  def CheckPrereq(self):
    # Verify locks
    owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
    owned_nodes = self.owned_locks(locking.LEVEL_NODE)
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)

    need_nodes = self._DetermineNodes()

    if not owned_nodes.issuperset(need_nodes):
      raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
                                 " locks were acquired, current nodes are"
                                 " are '%s', used to be '%s'; retry the"
                                 " operation" %
                                 (self.op.node_name,
                                  utils.CommaJoin(need_nodes),
                                  utils.CommaJoin(owned_nodes)),
                                 errors.ECODE_STATE)

    wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
    if owned_groups != wanted_groups:
      raise errors.OpExecError("Node groups changed since locks were acquired,"
                               " current groups are '%s', used to be '%s';"
                               " retry the operation" %
                               (utils.CommaJoin(wanted_groups),
                                utils.CommaJoin(owned_groups)))

    # Determine affected instances
    self.instances = self._DetermineInstances()
    self.instance_names = [i.name for i in self.instances]

    if set(self.instance_names) != owned_instance_names:
      raise errors.OpExecError("Instances on node '%s' changed since locks"
                               " were acquired, current instances are '%s',"
                               " used to be '%s'; retry the operation" %
                               (self.op.node_name,
                                utils.CommaJoin(self.instance_names),
                                utils.CommaJoin(owned_instance_names)))

    if self.instance_names:
      self.LogInfo("Evacuating instances from node '%s': %s",
                   self.op.node_name,
                   utils.CommaJoin(utils.NiceSort(self.instance_names)))
    else:
      self.LogInfo("No instances to evacuate from node '%s'",
                   self.op.node_name)

    if self.op.remote_node is not None:
      for i in self.instances:
        if i.primary_node == self.op.remote_node_uuid:
          raise errors.OpPrereqError("Node %s is the primary node of"
                                     " instance %s, cannot use it as"
                                     " secondary" %
                                     (self.op.remote_node, i.name),
                                     errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)

    if not self.instance_names:
      # No instances to evacuate
      jobs = []

    elif self.op.iallocator is not None:
      # TODO: Implement relocation to other group
      evac_mode = self._MODE2IALLOCATOR[self.op.mode]
      req = iallocator.IAReqNodeEvac(evac_mode=evac_mode,
                                     instances=list(self.instance_names))
      ial = iallocator.IAllocator(self.cfg, self.rpc, req)

      ial.Run(self.op.iallocator)

      if not ial.success:
        raise errors.OpPrereqError("Can't compute node evacuation using"
                                   " iallocator '%s': %s" %
                                   (self.op.iallocator, ial.info),
                                   errors.ECODE_NORES)

      jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)

    elif self.op.remote_node is not None:
      assert self.op.mode == constants.NODE_EVAC_SEC
      jobs = [
        [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
                                        remote_node=self.op.remote_node,
                                        disks=[],
                                        mode=constants.REPLACE_DISK_CHG,
                                        early_release=self.op.early_release)]
        for instance_name in self.instance_names]

    else:
      raise errors.ProgrammerError("No iallocator or remote node")

    return ResultWithJobs(jobs)
class LUNodeMigrate(LogicalUnit):
  """Migrate all instances from a node.

  """
  HPATH = "node-migrate"
  HTYPE = constants.HTYPE_NODE
  REQ_BGL = False

  def CheckArguments(self):
    pass

  def ExpandNames(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    # All locks shared: the submitted per-instance jobs take their own locks
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],
      }

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    return {
      "NODE_NAME": self.op.node_name,
      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()]
    return (nl, nl)

  def CheckPrereq(self):
    pass

  def Exec(self, feedback_fn):
    # Prepare jobs for migration instances: one single-opcode job per
    # instance having this node as its primary
    jobs = [
      [opcodes.OpInstanceMigrate(
        instance_name=inst.name,
        mode=self.op.mode,
        live=self.op.live,
        iallocator=self.op.iallocator,
        target_node=self.op.target_node,
        allow_runtime_changes=self.op.allow_runtime_changes,
        ignore_ipolicy=self.op.ignore_ipolicy)]
      for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]

    # TODO: Run iallocator in this opcode and pass correct placement options to
    # OpInstanceMigrate. Since other jobs can modify the cluster between
    # running the iallocator and the actual migration, a good consistency model
    # will have to be found.

    assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
            frozenset([self.op.node_uuid]))

    return ResultWithJobs(jobs)
def _GetStorageTypeArgs(cfg, storage_type):
  """Returns the arguments for a storage type.

  @param cfg: the cluster configuration, used to look up storage directories
  @param storage_type: one of the C{constants.ST_*} storage types
  @return: list of extra argument lists for the storage backend RPCs; an
    empty list for storage types that need no extra arguments

  """
  # Special case for file storage
  if storage_type == constants.ST_FILE:
    # storage.FileStorage wants a list of storage directories
    return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]

  # All other storage types take no extra arguments; return an empty list
  # (not None) so callers can always pass the result to the storage RPCs
  return []
class LUNodeModifyStorage(NoHooksLU):
  """Logical unit for modifying a storage volume on a node.

  """
  REQ_BGL = False

  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    # Unknown storage types have no modifiable fields at all
    try:
      modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
    except KeyError:
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " modified" % storage_type,
                                 errors.ECODE_INVAL)

    # Reject any requested change outside the modifiable field set
    diff = set(self.op.changes.keys()) - modifiable
    if diff:
      raise errors.OpPrereqError("The following fields can not be modified for"
                                 " storage units of type '%s': %r" %
                                 (storage_type, list(diff)),
                                 errors.ECODE_INVAL)

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: self.op.node_uuid,
      }

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_modify(self.op.node_uuid,
                                          self.op.storage_type, st_args,
                                          self.op.name, self.op.changes)
    result.Raise("Failed to modify storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))
class NodeQuery(QueryBase):
  """Query implementation for nodes, used through L{LUNodeQuery} below."""

  # Field definitions are shared with the generic query layer.
  FIELDS = query.NODE_FIELDS

  def ExpandNames(self, lu):
    # Locks are declared on behalf of the owning LU; everything is shared.
    lu.needed_locks = {}
    lu.share_locks = ShareAll()

    # NOTE(review): an if/else on self.names appears truncated in this copy;
    # presumably ALL_SET is only used when no specific names were requested.
    (self.wanted, _) = GetWantedNodes(lu, self.names)
    self.wanted = locking.ALL_SET

    # Live data can only be gathered while actually holding the node locks.
    self.do_locking = (self.use_locking and
                       query.NQ_LIVE in self.requested_data)
    # If any non-static field is requested we need to lock the nodes
    lu.needed_locks[locking.LEVEL_NODE] = self.wanted
    lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

  def DeclareLocks(self, lu, level):
    # NOTE(review): body appears truncated in this copy; no per-level lock
    # refinement is visible here.

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    all_info = lu.cfg.GetAllNodesInfo()

    # Restrict the result to the node names/uuids actually selected.
    node_uuids = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)

    # Gather data as requested
    if query.NQ_LIVE in self.requested_data:
      # filter out non-vm_capable nodes
      toquery_node_uuids = [node.uuid for node in all_info.values()
                            if node.vm_capable and node.uuid in node_uuids]
      # FIXME: this per default asks for storage space information for all
      # enabled disk templates. Fix this by making it possible to specify
      # space report fields for specific disk templates.
      raw_storage_units = utils.storage.GetStorageUnitsOfCluster(
          lu.cfg, include_spindles=True)
      storage_units = rpc.PrepareStorageUnitsForNodes(
          lu.cfg, raw_storage_units, toquery_node_uuids)
      lvm_enabled = utils.storage.IsLvmEnabled(
          lu.cfg.GetClusterInfo().enabled_disk_templates)
      # Live data is queried for the cluster's default hypervisor only.
      default_hypervisor = lu.cfg.GetHypervisorType()
      hvparams = lu.cfg.GetClusterInfo().hvparams[default_hypervisor]
      hvspecs = [(default_hypervisor, hvparams)]
      node_data = lu.rpc.call_node_info(toquery_node_uuids, storage_units,
          # NOTE(review): the remaining call arguments and the surrounding
          # live_data dict construction appear truncated in this copy; nodes
          # with a failed RPC or empty payload are filtered out below.
          (uuid, rpc.MakeLegacyNodeInfo(nresult.payload,
                                        require_vg_info=lvm_enabled))
          for (uuid, nresult) in node_data.items()
          if not nresult.fail_msg and nresult.payload)

    if query.NQ_INST in self.requested_data:
      # Map each queried node to the UUID sets of its primary and secondary
      # instances.
      node_to_primary = dict([(uuid, set()) for uuid in node_uuids])
      node_to_secondary = dict([(uuid, set()) for uuid in node_uuids])

      inst_data = lu.cfg.GetAllInstancesInfo()
      inst_uuid_to_inst_name = {}

      for inst in inst_data.values():
        inst_uuid_to_inst_name[inst.uuid] = inst.name
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.uuid)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.uuid)
      # NOTE(review): an "else:" appears truncated here; presumably these are
      # only reset to None when instance data was not requested.
      node_to_primary = None
      node_to_secondary = None
      inst_uuid_to_inst_name = None

    if query.NQ_OOB in self.requested_data:
      # Python 2 idiom (iteritems) -- consistent with the rest of this file.
      oob_support = dict((uuid, bool(SupportsOob(lu.cfg, node)))
                         for uuid, node in all_info.iteritems())

    if query.NQ_GROUP in self.requested_data:
      groups = lu.cfg.GetAllNodeGroupsInfo()

    return query.NodeQueryData([all_info[uuid] for uuid in node_uuids],
                               live_data, lu.cfg.GetMasterNode(),
                               node_to_primary, node_to_secondary,
                               inst_uuid_to_inst_name, groups, oob_support,
                               lu.cfg.GetClusterInfo())
class LUNodeQuery(NoHooksLU):
  """Logical unit for querying nodes.

  All real work is delegated to the L{NodeQuery} implementation; this LU
  only wires it into the LU life cycle.

  """
  # pylint: disable=W0142
  REQ_BGL = False

  def CheckArguments(self):
    self.nq = NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
                        self.op.output_fields, self.op.use_locking)

  def ExpandNames(self):
    self.nq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.nq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    return self.nq.OldStyleQuery(self)
def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  @type static: L{utils.FieldSet}
  @param static: static fields set
  @type dynamic: L{utils.FieldSet}
  @param dynamic: dynamic fields set
  @param selected: the requested list of output field names
  @raise errors.OpPrereqError: if any selected field is neither static
    nor dynamic

  """
  # Merge both field sets before matching; otherwise every selection
  # would be rejected as unknown
  f = utils.FieldSet()
  f.Extend(static)
  f.Extend(dynamic)

  delta = f.NonMatching(selected)
  if delta:
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(delta), errors.ECODE_INVAL)
class LUNodeQueryvols(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  # "node" is the only field answerable without contacting the node.
  _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
  _FIELDS_STATIC = utils.FieldSet("node")

  def CheckArguments(self):
    # Reject any requested output field outside the static/dynamic sets.
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=self._FIELDS_DYNAMIC,
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    # NOTE(review): an if/else on self.op.nodes appears truncated in this
    # copy: presumably specific nodes are locked when given, otherwise
    # ALL_SET (plus the node-allocation lock).
    self.needed_locks = {
      locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    volumes = self.rpc.call_node_volumes(node_uuids)

    # Map (node uuid, "vg/lv") -> instance, to report LV ownership below.
    ilist = self.cfg.GetAllInstancesInfo()
    vol2inst = MapInstanceLvsToNodes(ilist.values())

    for node_uuid in node_uuids:
      nresult = volumes[node_uuid]
      msg = nresult.fail_msg
      # Per-node RPC failures are only warned about, not fatal.
      self.LogWarning("Can't compute volume data on node %s: %s",
                      self.cfg.GetNodeName(node_uuid), msg)

      # Sort volumes by device path for stable output ordering.
      node_vols = sorted(nresult.payload,
                         key=operator.itemgetter("dev"))

      for vol in node_vols:
        for field in self.op.output_fields:
          # NOTE(review): several "if field == ..." branches (node, phys,
          # vg, name values) appear truncated in this copy.
          val = self.cfg.GetNodeName(node_uuid)
          elif field == "phys":
          elif field == "name":
          elif field == "size":
            # Sizes arrive as strings, possibly fractional; report whole units
            val = int(float(vol["size"]))
          elif field == "instance":
            inst = vol2inst.get((node_uuid, vol["vg"] + "/" + vol["name"]),
            if inst is not None:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)
class LUNodeQueryStorage(NoHooksLU):
  """Logical unit for getting information on storage units on node(s).

  """
  # Only the node name is known without an RPC to the node.
  _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)

  def CheckArguments(self):
    # Validate requested fields against static + valid storage fields.
    _CheckOutputFields(static=self._FIELDS_STATIC,
                       dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                       selected=self.op.output_fields)

  def ExpandNames(self):
    self.share_locks = ShareAll()

    # NOTE(review): an if/else on self.op.nodes appears truncated in this
    # copy: presumably specific nodes are locked when given, otherwise
    # ALL_SET (plus the node-allocation lock).
    self.needed_locks = {
      locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    self.node_uuids = self.owned_locks(locking.LEVEL_NODE)

    # Always get name to sort by
    if constants.SF_NAME in self.op.output_fields:
      fields = self.op.output_fields[:]
      fields = [constants.SF_NAME] + self.op.output_fields

    # Never ask for node or type as it's only known to the LU
    for extra in [constants.SF_NODE, constants.SF_TYPE]:
      while extra in fields:
        fields.remove(extra)

    # Index of each requested field within an RPC result row; rows are
    # keyed (and sorted) by the name field.
    field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
    name_idx = field_idx[constants.SF_NAME]

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    data = self.rpc.call_storage_list(self.node_uuids,
                                      self.op.storage_type, st_args,
                                      self.op.name, fields)

    for node_uuid in utils.NiceSort(self.node_uuids):
      node_name = self.cfg.GetNodeName(node_uuid)
      nresult = data[node_uuid]

      msg = nresult.fail_msg
      # Per-node RPC failures produce a warning instead of aborting.
      self.LogWarning("Can't get storage data from node %s: %s",

      rows = dict([(row[name_idx], row) for row in nresult.payload])

      for name in utils.NiceSort(rows.keys()):

        for field in self.op.output_fields:
          # NOTE(review): the SF_NODE value assignment and the final "else"
          # line before the raise appear truncated in this copy.
          if field == constants.SF_NODE:
          elif field == constants.SF_TYPE:
            val = self.op.storage_type
          elif field in field_idx:
            val = row[field_idx[field]]
            raise errors.ParameterError(field)
class LUNodeRemove(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE

  def BuildHooksEnv(self):
    # NOTE(review): the docstring and the "return {" line appear truncated
    # in this copy; the environment exposes the node being removed.
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,

  def BuildHooksNodes(self):
    """Build hooks nodes.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_uuid)
    return (all_nodes, all_nodes)

  def CheckPrereq(self):
    """Check prerequisites.

    - the node exists in the configuration
    - it does not have primary or secondary instances
    - it's not the master

    Any errors are signaled by raising errors.OpPrereqError.

    """
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
    node = self.cfg.GetNodeInfo(self.op.node_uuid)
    assert node is not None

    # The master removes itself only via failover to another node first.
    masternode = self.cfg.GetMasterNode()
    if node.uuid == masternode:
      raise errors.OpPrereqError("Node is the master node, failover to another"
                                 " node is required", errors.ECODE_INVAL)

    # Refuse removal while any instance still uses this node.
    for _, instance in self.cfg.GetAllInstancesInfo().items():
      if node.uuid in instance.all_nodes:
        raise errors.OpPrereqError("Instance %s is still running on the node,"
                                   " please remove first" % instance.name,

    self.op.node_name = node.name

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    logging.info("Stopping the node daemon and removing configs from node %s",

    modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup

    assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \

    # Promote nodes to master candidate as needed
    AdjustCandidatePool(self, exceptions=[self.node.uuid])
    self.context.RemoveNode(self.node)

    # Run post hooks on the node before it's removed
    RunPostHook(self, self.node.name)

    # we have to call this by name rather than by UUID, as the node is no longer
    result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup)
    msg = result.fail_msg
    # Best-effort: a failure to leave cleanly is only warned about.
    self.LogWarning("Errors encountered on the remote node while leaving"
                    " the cluster: %s", msg)

    # Remove node from our /etc/hosts
    if self.cfg.GetClusterInfo().modify_etc_hosts:
      master_node_uuid = self.cfg.GetMasterNode()
      result = self.rpc.call_etc_hosts_modify(master_node_uuid,
                                              constants.ETC_HOSTS_REMOVE,
                                              self.node.name, None)
      result.Raise("Can't update hosts file with new host data")

    RedistributeAncillaryFiles(self)
class LURepairNodeStorage(NoHooksLU):
  """Repairs the volume group on a node.

  """
  def CheckArguments(self):
    (self.op.node_uuid, self.op.node_name) = \
      ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)

    storage_type = self.op.storage_type

    # Only storage types supporting the "fix consistency" operation qualify.
    if (constants.SO_FIX_CONSISTENCY not in
        constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
      raise errors.OpPrereqError("Storage units of type '%s' can not be"
                                 " repaired" % storage_type,

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: [self.op.node_uuid],

  def _CheckFaultyDisks(self, instance, node_uuid):
    """Ensure faulty disks abort the opcode or at least warn."""
    # NOTE(review): the "try:" line and part of the call arguments appear
    # truncated in this copy.
    if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
      raise errors.OpPrereqError("Instance '%s' has faulty disks on"
                                 self.cfg.GetNodeName(node_uuid)),
    # Python 2 except syntax -- consistent with the rest of this file.
    except errors.OpPrereqError, err:
      if self.op.ignore_consistency:
        # Demote the failure to a warning when consistency is ignored.
        self.LogWarning(str(err.args[0]))

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # Check whether any instance on this node has faulty disks
    for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
      # Instances with inactive disks cannot report faulty disks anyway.
      if not inst.disks_active:
      check_nodes = set(inst.all_nodes)
      check_nodes.discard(self.op.node_uuid)
      for inst_node_uuid in check_nodes:
        self._CheckFaultyDisks(inst, inst_node_uuid)

  def Exec(self, feedback_fn):
    feedback_fn("Repairing storage unit '%s' on %s ..." %
                (self.op.name, self.op.node_name))

    st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
    result = self.rpc.call_storage_execute(self.op.node_uuid,
                                           self.op.storage_type, st_args,
                                           constants.SO_FIX_CONSISTENCY)
    result.Raise("Failed to repair storage unit '%s' on %s" %
                 (self.op.name, self.op.node_name))