Replace literals with constants
[ganeti-local] / lib / cmdlib / node.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with nodes."""
23
24 import logging
25 import operator
26
27 from ganeti import constants
28 from ganeti import errors
29 from ganeti import locking
30 from ganeti import netutils
31 from ganeti import objects
32 from ganeti import opcodes
33 from ganeti import qlang
34 from ganeti import query
35 from ganeti import rpc
36 from ganeti import utils
37 from ganeti.masterd import iallocator
38
39 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, QueryBase, \
40   ResultWithJobs
41 from ganeti.cmdlib.common import CheckParamsNotGlobal, \
42   MergeAndVerifyHvState, MergeAndVerifyDiskState, \
43   IsExclusiveStorageEnabledNode, CheckNodePVs, \
44   RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
45   CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
46   AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
47   GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \
48   FindFaultyInstanceDisks, CheckStorageTypeEnabled
49
50
51 def _DecideSelfPromotion(lu, exceptions=None):
52   """Decide whether I should promote myself as a master candidate.
53
54   """
55   cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
56   mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
57   # the new node will increase mc_max with one, so:
58   mc_should = min(mc_should + 1, cp_size)
59   return mc_now < mc_should
60
61
62 def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
63   """Ensure that a node has the given secondary ip.
64
65   @type lu: L{LogicalUnit}
66   @param lu: the LU on behalf of which we make the check
67   @type node: L{objects.Node}
68   @param node: the node to check
69   @type secondary_ip: string
70   @param secondary_ip: the ip to check
71   @type prereq: boolean
72   @param prereq: whether to throw a prerequisite or an execute error
73   @raise errors.OpPrereqError: if the node doesn't have the ip,
74   and prereq=True
75   @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
76
77   """
78   # this can be called with a new node, which has no UUID yet, so perform the
79   # RPC call using its name
80   result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
81   result.Raise("Failure checking secondary ip on node %s" % node.name,
82                prereq=prereq, ecode=errors.ECODE_ENVIRON)
83   if not result.payload:
84     msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
85            " please fix and re-run this command" % secondary_ip)
86     if prereq:
87       raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
88     else:
89       raise errors.OpExecError(msg)
90
91
92 class LUNodeAdd(LogicalUnit):
93   """Logical unit for adding node to the cluster.
94
95   """
96   HPATH = "node-add"
97   HTYPE = constants.HTYPE_NODE
98   _NFLAGS = ["master_capable", "vm_capable"]
99
100   def CheckArguments(self):
101     self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
102     # validate/normalize the node name
103     self.hostname = netutils.GetHostname(name=self.op.node_name,
104                                          family=self.primary_ip_family)
105     self.op.node_name = self.hostname.name
106
107     if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName():
108       raise errors.OpPrereqError("Cannot readd the master node",
109                                  errors.ECODE_STATE)
110
111     if self.op.readd and self.op.group:
112       raise errors.OpPrereqError("Cannot pass a node group when a node is"
113                                  " being readded", errors.ECODE_INVAL)
114
115     # OpenvSwitch: Warn user if link is missing
116     if (self.op.ndparams[constants.ND_OVS] and not
117         self.op.ndparams[constants.ND_OVS_LINK]):
118       self.LogInfo("No physical interface for OpenvSwitch was given."
119                    " OpenvSwitch will not have an outside connection. This"
120                    " might not be what you want.")
121     # OpenvSwitch: Fail if parameters are given, but OVS is not enabled.
122     if (not self.op.ndparams[constants.ND_OVS] and
123         (self.op.ndparams[constants.ND_OVS_NAME] or
124          self.op.ndparams[constants.ND_OVS_LINK])):
125       raise errors.OpPrereqError("OpenvSwitch name or link were given, but"
126                                  " OpenvSwitch is not enabled. Please enable"
127                                  " OpenvSwitch with --ovs", errors.ECODE_INVAL)
128
129   def BuildHooksEnv(self):
130     """Build hooks env.
131
132     This will run on all nodes before, and on all nodes + the new node after.
133
134     """
135     return {
136       "OP_TARGET": self.op.node_name,
137       "NODE_NAME": self.op.node_name,
138       "NODE_PIP": self.op.primary_ip,
139       "NODE_SIP": self.op.secondary_ip,
140       "MASTER_CAPABLE": str(self.op.master_capable),
141       "VM_CAPABLE": str(self.op.vm_capable),
142       }
143
144   def BuildHooksNodes(self):
145     """Build hooks nodes.
146
147     """
148     hook_nodes = self.cfg.GetNodeList()
149     new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
150     if new_node_info is not None:
151       # Exclude added node
152       hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))
153
154     # add the new node as post hook node by name; it does not have an UUID yet
155     return (hook_nodes, hook_nodes, [self.op.node_name, ])
156
157   def CheckPrereq(self):
158     """Check prerequisites.
159
160     This checks:
161      - the new node is not already in the config
162      - it is resolvable
163      - its parameters (single/dual homed) matches the cluster
164
165     Any errors are signaled by raising errors.OpPrereqError.
166
167     """
168     node_name = self.hostname.name
169     self.op.primary_ip = self.hostname.ip
170     if self.op.secondary_ip is None:
171       if self.primary_ip_family == netutils.IP6Address.family:
172         raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
173                                    " IPv4 address must be given as secondary",
174                                    errors.ECODE_INVAL)
175       self.op.secondary_ip = self.op.primary_ip
176
177     secondary_ip = self.op.secondary_ip
178     if not netutils.IP4Address.IsValid(secondary_ip):
179       raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
180                                  " address" % secondary_ip, errors.ECODE_INVAL)
181
182     existing_node_info = self.cfg.GetNodeInfoByName(node_name)
183     if not self.op.readd and existing_node_info is not None:
184       raise errors.OpPrereqError("Node %s is already in the configuration" %
185                                  node_name, errors.ECODE_EXISTS)
186     elif self.op.readd and existing_node_info is None:
187       raise errors.OpPrereqError("Node %s is not in the configuration" %
188                                  node_name, errors.ECODE_NOENT)
189
190     self.changed_primary_ip = False
191
192     for existing_node in self.cfg.GetAllNodesInfo().values():
193       if self.op.readd and node_name == existing_node.name:
194         if existing_node.secondary_ip != secondary_ip:
195           raise errors.OpPrereqError("Readded node doesn't have the same IP"
196                                      " address configuration as before",
197                                      errors.ECODE_INVAL)
198         if existing_node.primary_ip != self.op.primary_ip:
199           self.changed_primary_ip = True
200
201         continue
202
203       if (existing_node.primary_ip == self.op.primary_ip or
204           existing_node.secondary_ip == self.op.primary_ip or
205           existing_node.primary_ip == secondary_ip or
206           existing_node.secondary_ip == secondary_ip):
207         raise errors.OpPrereqError("New node ip address(es) conflict with"
208                                    " existing node %s" % existing_node.name,
209                                    errors.ECODE_NOTUNIQUE)
210
211     # After this 'if' block, None is no longer a valid value for the
212     # _capable op attributes
213     if self.op.readd:
214       assert existing_node_info is not None, \
215         "Can't retrieve locked node %s" % node_name
216       for attr in self._NFLAGS:
217         if getattr(self.op, attr) is None:
218           setattr(self.op, attr, getattr(existing_node_info, attr))
219     else:
220       for attr in self._NFLAGS:
221         if getattr(self.op, attr) is None:
222           setattr(self.op, attr, True)
223
224     if self.op.readd and not self.op.vm_capable:
225       pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
226       if pri or sec:
227         raise errors.OpPrereqError("Node %s being re-added with vm_capable"
228                                    " flag set to false, but it already holds"
229                                    " instances" % node_name,
230                                    errors.ECODE_STATE)
231
232     # check that the type of the node (single versus dual homed) is the
233     # same as for the master
234     myself = self.cfg.GetMasterNodeInfo()
235     master_singlehomed = myself.secondary_ip == myself.primary_ip
236     newbie_singlehomed = secondary_ip == self.op.primary_ip
237     if master_singlehomed != newbie_singlehomed:
238       if master_singlehomed:
239         raise errors.OpPrereqError("The master has no secondary ip but the"
240                                    " new node has one",
241                                    errors.ECODE_INVAL)
242       else:
243         raise errors.OpPrereqError("The master has a secondary ip but the"
244                                    " new node doesn't have one",
245                                    errors.ECODE_INVAL)
246
247     # checks reachability
248     if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
249       raise errors.OpPrereqError("Node not reachable by ping",
250                                  errors.ECODE_ENVIRON)
251
252     if not newbie_singlehomed:
253       # check reachability from my secondary ip to newbie's secondary ip
254       if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
255                               source=myself.secondary_ip):
256         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
257                                    " based ping to node daemon port",
258                                    errors.ECODE_ENVIRON)
259
260     if self.op.readd:
261       exceptions = [existing_node_info.uuid]
262     else:
263       exceptions = []
264
265     if self.op.master_capable:
266       self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
267     else:
268       self.master_candidate = False
269
270     if self.op.readd:
271       self.new_node = existing_node_info
272     else:
273       node_group = self.cfg.LookupNodeGroup(self.op.group)
274       self.new_node = objects.Node(name=node_name,
275                                    primary_ip=self.op.primary_ip,
276                                    secondary_ip=secondary_ip,
277                                    master_candidate=self.master_candidate,
278                                    offline=False, drained=False,
279                                    group=node_group, ndparams={})
280
281     if self.op.ndparams:
282       utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
283       CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
284                            "node", "cluster or group")
285
286     if self.op.hv_state:
287       self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)
288
289     if self.op.disk_state:
290       self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)
291
292     # TODO: If we need to have multiple DnsOnlyRunner we probably should make
293     #       it a property on the base class.
294     rpcrunner = rpc.DnsOnlyRunner()
295     result = rpcrunner.call_version([node_name])[node_name]
296     result.Raise("Can't get version information from node %s" % node_name)
297     if constants.PROTOCOL_VERSION == result.payload:
298       logging.info("Communication to node %s fine, sw version %s match",
299                    node_name, result.payload)
300     else:
301       raise errors.OpPrereqError("Version mismatch master version %s,"
302                                  " node version %s" %
303                                  (constants.PROTOCOL_VERSION, result.payload),
304                                  errors.ECODE_ENVIRON)
305
306     vg_name = self.cfg.GetVGName()
307     if vg_name is not None:
308       vparams = {constants.NV_PVLIST: [vg_name]}
309       excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
310       cname = self.cfg.GetClusterName()
311       result = rpcrunner.call_node_verify_light(
312           [node_name], vparams, cname,
313           self.cfg.GetClusterInfo().hvparams)[node_name]
314       (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
315       if errmsgs:
316         raise errors.OpPrereqError("Checks on node PVs failed: %s" %
317                                    "; ".join(errmsgs), errors.ECODE_ENVIRON)
318
319   def Exec(self, feedback_fn):
320     """Adds the new node to the cluster.
321
322     """
323     assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
324       "Not owning BGL"
325
326     # We adding a new node so we assume it's powered
327     self.new_node.powered = True
328
329     # for re-adds, reset the offline/drained/master-candidate flags;
330     # we need to reset here, otherwise offline would prevent RPC calls
331     # later in the procedure; this also means that if the re-add
332     # fails, we are left with a non-offlined, broken node
333     if self.op.readd:
334       self.new_node.drained = False
335       self.LogInfo("Readding a node, the offline/drained flags were reset")
336       # if we demote the node, we do cleanup later in the procedure
337       self.new_node.master_candidate = self.master_candidate
338       if self.changed_primary_ip:
339         self.new_node.primary_ip = self.op.primary_ip
340
341     # copy the master/vm_capable flags
342     for attr in self._NFLAGS:
343       setattr(self.new_node, attr, getattr(self.op, attr))
344
345     # notify the user about any possible mc promotion
346     if self.new_node.master_candidate:
347       self.LogInfo("Node will be a master candidate")
348
349     if self.op.ndparams:
350       self.new_node.ndparams = self.op.ndparams
351     else:
352       self.new_node.ndparams = {}
353
354     if self.op.hv_state:
355       self.new_node.hv_state_static = self.new_hv_state
356
357     if self.op.disk_state:
358       self.new_node.disk_state_static = self.new_disk_state
359
360     # Add node to our /etc/hosts, and add key to known_hosts
361     if self.cfg.GetClusterInfo().modify_etc_hosts:
362       master_node = self.cfg.GetMasterNode()
363       result = self.rpc.call_etc_hosts_modify(
364                  master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
365                  self.hostname.ip)
366       result.Raise("Can't update hosts file with new host data")
367
368     if self.new_node.secondary_ip != self.new_node.primary_ip:
369       _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
370                                False)
371
372     node_verifier_uuids = [self.cfg.GetMasterNode()]
373     node_verify_param = {
374       constants.NV_NODELIST: ([self.new_node.name], {}),
375       # TODO: do a node-net-test as well?
376     }
377
378     result = self.rpc.call_node_verify(
379                node_verifier_uuids, node_verify_param,
380                self.cfg.GetClusterName(),
381                self.cfg.GetClusterInfo().hvparams)
382     for verifier in node_verifier_uuids:
383       result[verifier].Raise("Cannot communicate with node %s" % verifier)
384       nl_payload = result[verifier].payload[constants.NV_NODELIST]
385       if nl_payload:
386         for failed in nl_payload:
387           feedback_fn("ssh/hostname verification failed"
388                       " (checking from %s): %s" %
389                       (verifier, nl_payload[failed]))
390         raise errors.OpExecError("ssh/hostname verification failed")
391
392     # OpenvSwitch initialization on the node
393     if self.new_node.ndparams[constants.ND_OVS]:
394       result = self.rpc.call_node_configure_ovs(
395                  self.new_node.name,
396                  self.new_node.ndparams[constants.ND_OVS_NAME],
397                  self.new_node.ndparams[constants.ND_OVS_LINK])
398
399     if self.op.readd:
400       self.context.ReaddNode(self.new_node)
401       RedistributeAncillaryFiles(self)
402       # make sure we redistribute the config
403       self.cfg.Update(self.new_node, feedback_fn)
404       # and make sure the new node will not have old files around
405       if not self.new_node.master_candidate:
406         result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
407         result.Warn("Node failed to demote itself from master candidate status",
408                     self.LogWarning)
409     else:
410       self.context.AddNode(self.new_node, self.proc.GetECId())
411       RedistributeAncillaryFiles(self)
412
413
414 class LUNodeSetParams(LogicalUnit):
415   """Modifies the parameters of a node.
416
417   @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
418       to the node role (as _ROLE_*)
419   @cvar _R2F: a dictionary from node role to tuples of flags
420   @cvar _FLAGS: a list of attribute names corresponding to the flags
421
422   """
423   HPATH = "node-modify"
424   HTYPE = constants.HTYPE_NODE
425   REQ_BGL = False
426   (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
427   _F2R = {
428     (True, False, False): _ROLE_CANDIDATE,
429     (False, True, False): _ROLE_DRAINED,
430     (False, False, True): _ROLE_OFFLINE,
431     (False, False, False): _ROLE_REGULAR,
432     }
433   _R2F = dict((v, k) for k, v in _F2R.items())
434   _FLAGS = ["master_candidate", "drained", "offline"]
435
436   def CheckArguments(self):
437     (self.op.node_uuid, self.op.node_name) = \
438       ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
439     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
440                 self.op.master_capable, self.op.vm_capable,
441                 self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
442                 self.op.disk_state]
443     if all_mods.count(None) == len(all_mods):
444       raise errors.OpPrereqError("Please pass at least one modification",
445                                  errors.ECODE_INVAL)
446     if all_mods.count(True) > 1:
447       raise errors.OpPrereqError("Can't set the node into more than one"
448                                  " state at the same time",
449                                  errors.ECODE_INVAL)
450
451     # Boolean value that tells us whether we might be demoting from MC
452     self.might_demote = (self.op.master_candidate is False or
453                          self.op.offline is True or
454                          self.op.drained is True or
455                          self.op.master_capable is False)
456
457     if self.op.secondary_ip:
458       if not netutils.IP4Address.IsValid(self.op.secondary_ip):
459         raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
460                                    " address" % self.op.secondary_ip,
461                                    errors.ECODE_INVAL)
462
463     self.lock_all = self.op.auto_promote and self.might_demote
464     self.lock_instances = self.op.secondary_ip is not None
465
466   def _InstanceFilter(self, instance):
467     """Filter for getting affected instances.
468
469     """
470     return (instance.disk_template in constants.DTS_INT_MIRROR and
471             self.op.node_uuid in instance.all_nodes)
472
473   def ExpandNames(self):
474     if self.lock_all:
475       self.needed_locks = {
476         locking.LEVEL_NODE: locking.ALL_SET,
477
478         # Block allocations when all nodes are locked
479         locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
480         }
481     else:
482       self.needed_locks = {
483         locking.LEVEL_NODE: self.op.node_uuid,
484         }
485
486     # Since modifying a node can have severe effects on currently running
487     # operations the resource lock is at least acquired in shared mode
488     self.needed_locks[locking.LEVEL_NODE_RES] = \
489       self.needed_locks[locking.LEVEL_NODE]
490
491     # Get all locks except nodes in shared mode; they are not used for anything
492     # but read-only access
493     self.share_locks = ShareAll()
494     self.share_locks[locking.LEVEL_NODE] = 0
495     self.share_locks[locking.LEVEL_NODE_RES] = 0
496     self.share_locks[locking.LEVEL_NODE_ALLOC] = 0
497
498     if self.lock_instances:
499       self.needed_locks[locking.LEVEL_INSTANCE] = \
500         self.cfg.GetInstanceNames(
501           self.cfg.GetInstancesInfoByFilter(self._InstanceFilter).keys())
502
503   def BuildHooksEnv(self):
504     """Build hooks env.
505
506     This runs on the master node.
507
508     """
509     return {
510       "OP_TARGET": self.op.node_name,
511       "MASTER_CANDIDATE": str(self.op.master_candidate),
512       "OFFLINE": str(self.op.offline),
513       "DRAINED": str(self.op.drained),
514       "MASTER_CAPABLE": str(self.op.master_capable),
515       "VM_CAPABLE": str(self.op.vm_capable),
516       }
517
518   def BuildHooksNodes(self):
519     """Build hooks nodes.
520
521     """
522     nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
523     return (nl, nl)
524
525   def CheckPrereq(self):
526     """Check prerequisites.
527
528     This only checks the instance list against the existing names.
529
530     """
531     node = self.cfg.GetNodeInfo(self.op.node_uuid)
532     if self.lock_instances:
533       affected_instances = \
534         self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)
535
536       # Verify instance locks
537       owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
538       wanted_instance_names = frozenset([inst.name for inst in
539                                          affected_instances.values()])
540       if wanted_instance_names - owned_instance_names:
541         raise errors.OpPrereqError("Instances affected by changing node %s's"
542                                    " secondary IP address have changed since"
543                                    " locks were acquired, wanted '%s', have"
544                                    " '%s'; retry the operation" %
545                                    (node.name,
546                                     utils.CommaJoin(wanted_instance_names),
547                                     utils.CommaJoin(owned_instance_names)),
548                                    errors.ECODE_STATE)
549     else:
550       affected_instances = None
551
552     if (self.op.master_candidate is not None or
553         self.op.drained is not None or
554         self.op.offline is not None):
555       # we can't change the master's node flags
556       if node.uuid == self.cfg.GetMasterNode():
557         raise errors.OpPrereqError("The master role can be changed"
558                                    " only via master-failover",
559                                    errors.ECODE_INVAL)
560
561     if self.op.master_candidate and not node.master_capable:
562       raise errors.OpPrereqError("Node %s is not master capable, cannot make"
563                                  " it a master candidate" % node.name,
564                                  errors.ECODE_STATE)
565
566     if self.op.vm_capable is False:
567       (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
568       if ipri or isec:
569         raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
570                                    " the vm_capable flag" % node.name,
571                                    errors.ECODE_STATE)
572
573     if node.master_candidate and self.might_demote and not self.lock_all:
574       assert not self.op.auto_promote, "auto_promote set but lock_all not"
575       # check if after removing the current node, we're missing master
576       # candidates
577       (mc_remaining, mc_should, _) = \
578           self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
579       if mc_remaining < mc_should:
580         raise errors.OpPrereqError("Not enough master candidates, please"
581                                    " pass auto promote option to allow"
582                                    " promotion (--auto-promote or RAPI"
583                                    " auto_promote=True)", errors.ECODE_STATE)
584
585     self.old_flags = old_flags = (node.master_candidate,
586                                   node.drained, node.offline)
587     assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
588     self.old_role = old_role = self._F2R[old_flags]
589
590     # Check for ineffective changes
591     for attr in self._FLAGS:
592       if getattr(self.op, attr) is False and getattr(node, attr) is False:
593         self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
594         setattr(self.op, attr, None)
595
596     # Past this point, any flag change to False means a transition
597     # away from the respective state, as only real changes are kept
598
599     # TODO: We might query the real power state if it supports OOB
600     if SupportsOob(self.cfg, node):
601       if self.op.offline is False and not (node.powered or
602                                            self.op.powered is True):
603         raise errors.OpPrereqError(("Node %s needs to be turned on before its"
604                                     " offline status can be reset") %
605                                    self.op.node_name, errors.ECODE_STATE)
606     elif self.op.powered is not None:
607       raise errors.OpPrereqError(("Unable to change powered state for node %s"
608                                   " as it does not support out-of-band"
609                                   " handling") % self.op.node_name,
610                                  errors.ECODE_STATE)
611
612     # If we're being deofflined/drained, we'll MC ourself if needed
613     if (self.op.drained is False or self.op.offline is False or
614         (self.op.master_capable and not node.master_capable)):
615       if _DecideSelfPromotion(self):
616         self.op.master_candidate = True
617         self.LogInfo("Auto-promoting node to master candidate")
618
619     # If we're no longer master capable, we'll demote ourselves from MC
620     if self.op.master_capable is False and node.master_candidate:
621       self.LogInfo("Demoting from master candidate")
622       self.op.master_candidate = False
623
624     # Compute new role
625     assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
626     if self.op.master_candidate:
627       new_role = self._ROLE_CANDIDATE
628     elif self.op.drained:
629       new_role = self._ROLE_DRAINED
630     elif self.op.offline:
631       new_role = self._ROLE_OFFLINE
632     elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
633       # False is still in new flags, which means we're un-setting (the
634       # only) True flag
635       new_role = self._ROLE_REGULAR
636     else: # no new flags, nothing, keep old role
637       new_role = old_role
638
639     self.new_role = new_role
640
641     if old_role == self._ROLE_OFFLINE and new_role != old_role:
642       # Trying to transition out of offline status
643       result = self.rpc.call_version([node.uuid])[node.uuid]
644       if result.fail_msg:
645         raise errors.OpPrereqError("Node %s is being de-offlined but fails"
646                                    " to report its version: %s" %
647                                    (node.name, result.fail_msg),
648                                    errors.ECODE_STATE)
649       else:
650         self.LogWarning("Transitioning node from offline to online state"
651                         " without using re-add. Please make sure the node"
652                         " is healthy!")
653
654     # When changing the secondary ip, verify if this is a single-homed to
655     # multi-homed transition or vice versa, and apply the relevant
656     # restrictions.
657     if self.op.secondary_ip:
658       # Ok even without locking, because this can't be changed by any LU
659       master = self.cfg.GetMasterNodeInfo()
660       master_singlehomed = master.secondary_ip == master.primary_ip
661       if master_singlehomed and self.op.secondary_ip != node.primary_ip:
662         if self.op.force and node.uuid == master.uuid:
663           self.LogWarning("Transitioning from single-homed to multi-homed"
664                           " cluster; all nodes will require a secondary IP"
665                           " address")
666         else:
667           raise errors.OpPrereqError("Changing the secondary ip on a"
668                                      " single-homed cluster requires the"
669                                      " --force option to be passed, and the"
670                                      " target node to be the master",
671                                      errors.ECODE_INVAL)
672       elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
673         if self.op.force and node.uuid == master.uuid:
674           self.LogWarning("Transitioning from multi-homed to single-homed"
675                           " cluster; secondary IP addresses will have to be"
676                           " removed")
677         else:
678           raise errors.OpPrereqError("Cannot set the secondary IP to be the"
679                                      " same as the primary IP on a multi-homed"
680                                      " cluster, unless the --force option is"
681                                      " passed, and the target node is the"
682                                      " master", errors.ECODE_INVAL)
683
684       assert not (set([inst.name for inst in affected_instances.values()]) -
685                   self.owned_locks(locking.LEVEL_INSTANCE))
686
687       if node.offline:
688         if affected_instances:
689           msg = ("Cannot change secondary IP address: offline node has"
690                  " instances (%s) configured to use it" %
691                  utils.CommaJoin(
692                    [inst.name for inst in affected_instances.values()]))
693           raise errors.OpPrereqError(msg, errors.ECODE_STATE)
694       else:
695         # On online nodes, check that no instances are running, and that
696         # the node has the new ip and we can reach it.
697         for instance in affected_instances.values():
698           CheckInstanceState(self, instance, INSTANCE_DOWN,
699                              msg="cannot change secondary ip")
700
701         _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
702         if master.uuid != node.uuid:
703           # check reachability from master secondary ip to new secondary ip
704           if not netutils.TcpPing(self.op.secondary_ip,
705                                   constants.DEFAULT_NODED_PORT,
706                                   source=master.secondary_ip):
707             raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
708                                        " based ping to node daemon port",
709                                        errors.ECODE_ENVIRON)
710
711     if self.op.ndparams:
712       new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
713       utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
714       CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
715                            "node", "cluster or group")
716       self.new_ndparams = new_ndparams
717
718     if self.op.hv_state:
719       self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
720                                                 node.hv_state_static)
721
722     if self.op.disk_state:
723       self.new_disk_state = \
724         MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)
725
726   def Exec(self, feedback_fn):
727     """Modifies a node.
728
729     """
730     node = self.cfg.GetNodeInfo(self.op.node_uuid)
731     result = []
732
733     if self.op.ndparams:
734       node.ndparams = self.new_ndparams
735
736     if self.op.powered is not None:
737       node.powered = self.op.powered
738
739     if self.op.hv_state:
740       node.hv_state_static = self.new_hv_state
741
742     if self.op.disk_state:
743       node.disk_state_static = self.new_disk_state
744
745     for attr in ["master_capable", "vm_capable"]:
746       val = getattr(self.op, attr)
747       if val is not None:
748         setattr(node, attr, val)
749         result.append((attr, str(val)))
750
751     if self.new_role != self.old_role:
752       # Tell the node to demote itself, if no longer MC and not offline
753       if self.old_role == self._ROLE_CANDIDATE and \
754           self.new_role != self._ROLE_OFFLINE:
755         msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
756         if msg:
757           self.LogWarning("Node failed to demote itself: %s", msg)
758
759       new_flags = self._R2F[self.new_role]
760       for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
761         if of != nf:
762           result.append((desc, str(nf)))
763       (node.master_candidate, node.drained, node.offline) = new_flags
764
765       # we locked all nodes, we adjust the CP before updating this node
766       if self.lock_all:
767         AdjustCandidatePool(self, [node.uuid])
768
769     if self.op.secondary_ip:
770       node.secondary_ip = self.op.secondary_ip
771       result.append(("secondary_ip", self.op.secondary_ip))
772
773     # this will trigger configuration file update, if needed
774     self.cfg.Update(node, feedback_fn)
775
776     # this will trigger job queue propagation or cleanup if the mc
777     # flag changed
778     if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
779       self.context.ReaddNode(node)
780
781     return result
782
783
784 class LUNodePowercycle(NoHooksLU):
785   """Powercycles a node.
786
787   """
788   REQ_BGL = False
789
790   def CheckArguments(self):
791     (self.op.node_uuid, self.op.node_name) = \
792       ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
793
794     if self.op.node_uuid == self.cfg.GetMasterNode() and not self.op.force:
795       raise errors.OpPrereqError("The node is the master and the force"
796                                  " parameter was not set",
797                                  errors.ECODE_INVAL)
798
799   def ExpandNames(self):
800     """Locking for PowercycleNode.
801
802     This is a last-resort option and shouldn't block on other
803     jobs. Therefore, we grab no locks.
804
805     """
806     self.needed_locks = {}
807
808   def Exec(self, feedback_fn):
809     """Reboots a node.
810
811     """
812     default_hypervisor = self.cfg.GetHypervisorType()
813     hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
814     result = self.rpc.call_node_powercycle(self.op.node_uuid,
815                                            default_hypervisor,
816                                            hvparams)
817     result.Raise("Failed to schedule the reboot")
818     return result.payload
819
820
821 def _GetNodeInstancesInner(cfg, fn):
822   return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
823
824
825 def _GetNodePrimaryInstances(cfg, node_uuid):
826   """Returns primary instances on a node.
827
828   """
829   return _GetNodeInstancesInner(cfg,
830                                 lambda inst: node_uuid == inst.primary_node)
831
832
833 def _GetNodeSecondaryInstances(cfg, node_uuid):
834   """Returns secondary instances on a node.
835
836   """
837   return _GetNodeInstancesInner(cfg,
838                                 lambda inst: node_uuid in inst.secondary_nodes)
839
840
841 def _GetNodeInstances(cfg, node_uuid):
842   """Returns a list of all primary and secondary instances on a node.
843
844   """
845
846   return _GetNodeInstancesInner(cfg, lambda inst: node_uuid in inst.all_nodes)
847
848
849 class LUNodeEvacuate(NoHooksLU):
850   """Evacuates instances off a list of nodes.
851
852   """
853   REQ_BGL = False
854
855   _MODE2IALLOCATOR = {
856     constants.NODE_EVAC_PRI: constants.IALLOCATOR_NEVAC_PRI,
857     constants.NODE_EVAC_SEC: constants.IALLOCATOR_NEVAC_SEC,
858     constants.NODE_EVAC_ALL: constants.IALLOCATOR_NEVAC_ALL,
859     }
860   assert frozenset(_MODE2IALLOCATOR.keys()) == constants.NODE_EVAC_MODES
861   assert (frozenset(_MODE2IALLOCATOR.values()) ==
862           constants.IALLOCATOR_NEVAC_MODES)
863
864   def CheckArguments(self):
865     CheckIAllocatorOrNode(self, "iallocator", "remote_node")
866
867   def ExpandNames(self):
868     (self.op.node_uuid, self.op.node_name) = \
869       ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
870
871     if self.op.remote_node is not None:
872       (self.op.remote_node_uuid, self.op.remote_node) = \
873         ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
874                               self.op.remote_node)
875       assert self.op.remote_node
876
877       if self.op.node_uuid == self.op.remote_node_uuid:
878         raise errors.OpPrereqError("Can not use evacuated node as a new"
879                                    " secondary node", errors.ECODE_INVAL)
880
881       if self.op.mode != constants.NODE_EVAC_SEC:
882         raise errors.OpPrereqError("Without the use of an iallocator only"
883                                    " secondary instances can be evacuated",
884                                    errors.ECODE_INVAL)
885
886     # Declare locks
887     self.share_locks = ShareAll()
888     self.needed_locks = {
889       locking.LEVEL_INSTANCE: [],
890       locking.LEVEL_NODEGROUP: [],
891       locking.LEVEL_NODE: [],
892       }
893
894     # Determine nodes (via group) optimistically, needs verification once locks
895     # have been acquired
896     self.lock_nodes = self._DetermineNodes()
897
898   def _DetermineNodes(self):
899     """Gets the list of node UUIDs to operate on.
900
901     """
902     if self.op.remote_node is None:
903       # Iallocator will choose any node(s) in the same group
904       group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
905     else:
906       group_nodes = frozenset([self.op.remote_node_uuid])
907
908     # Determine nodes to be locked
909     return set([self.op.node_uuid]) | group_nodes
910
911   def _DetermineInstances(self):
912     """Builds list of instances to operate on.
913
914     """
915     assert self.op.mode in constants.NODE_EVAC_MODES
916
917     if self.op.mode == constants.NODE_EVAC_PRI:
918       # Primary instances only
919       inst_fn = _GetNodePrimaryInstances
920       assert self.op.remote_node is None, \
921         "Evacuating primary instances requires iallocator"
922     elif self.op.mode == constants.NODE_EVAC_SEC:
923       # Secondary instances only
924       inst_fn = _GetNodeSecondaryInstances
925     else:
926       # All instances
927       assert self.op.mode == constants.NODE_EVAC_ALL
928       inst_fn = _GetNodeInstances
929       # TODO: In 2.6, change the iallocator interface to take an evacuation mode
930       # per instance
931       raise errors.OpPrereqError("Due to an issue with the iallocator"
932                                  " interface it is not possible to evacuate"
933                                  " all instances at once; specify explicitly"
934                                  " whether to evacuate primary or secondary"
935                                  " instances",
936                                  errors.ECODE_INVAL)
937
938     return inst_fn(self.cfg, self.op.node_uuid)
939
940   def DeclareLocks(self, level):
941     if level == locking.LEVEL_INSTANCE:
942       # Lock instances optimistically, needs verification once node and group
943       # locks have been acquired
944       self.needed_locks[locking.LEVEL_INSTANCE] = \
945         set(i.name for i in self._DetermineInstances())
946
947     elif level == locking.LEVEL_NODEGROUP:
948       # Lock node groups for all potential target nodes optimistically, needs
949       # verification once nodes have been acquired
950       self.needed_locks[locking.LEVEL_NODEGROUP] = \
951         self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)
952
953     elif level == locking.LEVEL_NODE:
954       self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes
955
956   def CheckPrereq(self):
957     # Verify locks
958     owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
959     owned_nodes = self.owned_locks(locking.LEVEL_NODE)
960     owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
961
962     need_nodes = self._DetermineNodes()
963
964     if not owned_nodes.issuperset(need_nodes):
965       raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
966                                  " locks were acquired, current nodes are"
967                                  " are '%s', used to be '%s'; retry the"
968                                  " operation" %
969                                  (self.op.node_name,
970                                   utils.CommaJoin(need_nodes),
971                                   utils.CommaJoin(owned_nodes)),
972                                  errors.ECODE_STATE)
973
974     wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
975     if owned_groups != wanted_groups:
976       raise errors.OpExecError("Node groups changed since locks were acquired,"
977                                " current groups are '%s', used to be '%s';"
978                                " retry the operation" %
979                                (utils.CommaJoin(wanted_groups),
980                                 utils.CommaJoin(owned_groups)))
981
982     # Determine affected instances
983     self.instances = self._DetermineInstances()
984     self.instance_names = [i.name for i in self.instances]
985
986     if set(self.instance_names) != owned_instance_names:
987       raise errors.OpExecError("Instances on node '%s' changed since locks"
988                                " were acquired, current instances are '%s',"
989                                " used to be '%s'; retry the operation" %
990                                (self.op.node_name,
991                                 utils.CommaJoin(self.instance_names),
992                                 utils.CommaJoin(owned_instance_names)))
993
994     if self.instance_names:
995       self.LogInfo("Evacuating instances from node '%s': %s",
996                    self.op.node_name,
997                    utils.CommaJoin(utils.NiceSort(self.instance_names)))
998     else:
999       self.LogInfo("No instances to evacuate from node '%s'",
1000                    self.op.node_name)
1001
1002     if self.op.remote_node is not None:
1003       for i in self.instances:
1004         if i.primary_node == self.op.remote_node_uuid:
1005           raise errors.OpPrereqError("Node %s is the primary node of"
1006                                      " instance %s, cannot use it as"
1007                                      " secondary" %
1008                                      (self.op.remote_node, i.name),
1009                                      errors.ECODE_INVAL)
1010
1011   def Exec(self, feedback_fn):
1012     assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)
1013
1014     if not self.instance_names:
1015       # No instances to evacuate
1016       jobs = []
1017
1018     elif self.op.iallocator is not None:
1019       # TODO: Implement relocation to other group
1020       evac_mode = self._MODE2IALLOCATOR[self.op.mode]
1021       req = iallocator.IAReqNodeEvac(evac_mode=evac_mode,
1022                                      instances=list(self.instance_names))
1023       ial = iallocator.IAllocator(self.cfg, self.rpc, req)
1024
1025       ial.Run(self.op.iallocator)
1026
1027       if not ial.success:
1028         raise errors.OpPrereqError("Can't compute node evacuation using"
1029                                    " iallocator '%s': %s" %
1030                                    (self.op.iallocator, ial.info),
1031                                    errors.ECODE_NORES)
1032
1033       jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)
1034
1035     elif self.op.remote_node is not None:
1036       assert self.op.mode == constants.NODE_EVAC_SEC
1037       jobs = [
1038         [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
1039                                         remote_node=self.op.remote_node,
1040                                         disks=[],
1041                                         mode=constants.REPLACE_DISK_CHG,
1042                                         early_release=self.op.early_release)]
1043         for instance_name in self.instance_names]
1044
1045     else:
1046       raise errors.ProgrammerError("No iallocator or remote node")
1047
1048     return ResultWithJobs(jobs)
1049
1050
1051 class LUNodeMigrate(LogicalUnit):
1052   """Migrate all instances from a node.
1053
1054   """
1055   HPATH = "node-migrate"
1056   HTYPE = constants.HTYPE_NODE
1057   REQ_BGL = False
1058
1059   def CheckArguments(self):
1060     pass
1061
1062   def ExpandNames(self):
1063     (self.op.node_uuid, self.op.node_name) = \
1064       ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1065
1066     self.share_locks = ShareAll()
1067     self.needed_locks = {
1068       locking.LEVEL_NODE: [self.op.node_uuid],
1069       }
1070
1071   def BuildHooksEnv(self):
1072     """Build hooks env.
1073
1074     This runs on the master, the primary and all the secondaries.
1075
1076     """
1077     return {
1078       "NODE_NAME": self.op.node_name,
1079       "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
1080       }
1081
1082   def BuildHooksNodes(self):
1083     """Build hooks nodes.
1084
1085     """
1086     nl = [self.cfg.GetMasterNode()]
1087     return (nl, nl)
1088
1089   def CheckPrereq(self):
1090     pass
1091
1092   def Exec(self, feedback_fn):
1093     # Prepare jobs for migration instances
1094     jobs = [
1095       [opcodes.OpInstanceMigrate(
1096         instance_name=inst.name,
1097         mode=self.op.mode,
1098         live=self.op.live,
1099         iallocator=self.op.iallocator,
1100         target_node=self.op.target_node,
1101         allow_runtime_changes=self.op.allow_runtime_changes,
1102         ignore_ipolicy=self.op.ignore_ipolicy)]
1103       for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]
1104
1105     # TODO: Run iallocator in this opcode and pass correct placement options to
1106     # OpInstanceMigrate. Since other jobs can modify the cluster between
1107     # running the iallocator and the actual migration, a good consistency model
1108     # will have to be found.
1109
1110     assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
1111             frozenset([self.op.node_uuid]))
1112
1113     return ResultWithJobs(jobs)
1114
1115
1116 def _GetStorageTypeArgs(cfg, storage_type):
1117   """Returns the arguments for a storage type.
1118
1119   """
1120   # Special case for file storage
1121   if storage_type == constants.ST_FILE:
1122     # storage.FileStorage wants a list of storage directories
1123     return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
1124
1125   return []
1126
1127
1128 class LUNodeModifyStorage(NoHooksLU):
1129   """Logical unit for modifying a storage volume on a node.
1130
1131   """
1132   REQ_BGL = False
1133
1134   def CheckArguments(self):
1135     (self.op.node_uuid, self.op.node_name) = \
1136       ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1137
1138     storage_type = self.op.storage_type
1139
1140     try:
1141       modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
1142     except KeyError:
1143       raise errors.OpPrereqError("Storage units of type '%s' can not be"
1144                                  " modified" % storage_type,
1145                                  errors.ECODE_INVAL)
1146
1147     diff = set(self.op.changes.keys()) - modifiable
1148     if diff:
1149       raise errors.OpPrereqError("The following fields can not be modified for"
1150                                  " storage units of type '%s': %r" %
1151                                  (storage_type, list(diff)),
1152                                  errors.ECODE_INVAL)
1153
1154   def CheckPrereq(self):
1155     """Check prerequisites.
1156
1157     """
1158     CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1159
1160   def ExpandNames(self):
1161     self.needed_locks = {
1162       locking.LEVEL_NODE: self.op.node_uuid,
1163       }
1164
1165   def Exec(self, feedback_fn):
1166     """Computes the list of nodes and their attributes.
1167
1168     """
1169     st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1170     result = self.rpc.call_storage_modify(self.op.node_uuid,
1171                                           self.op.storage_type, st_args,
1172                                           self.op.name, self.op.changes)
1173     result.Raise("Failed to modify storage unit '%s' on %s" %
1174                  (self.op.name, self.op.node_name))
1175
1176
1177 class NodeQuery(QueryBase):
1178   FIELDS = query.NODE_FIELDS
1179
1180   def ExpandNames(self, lu):
1181     lu.needed_locks = {}
1182     lu.share_locks = ShareAll()
1183
1184     if self.names:
1185       (self.wanted, _) = GetWantedNodes(lu, self.names)
1186     else:
1187       self.wanted = locking.ALL_SET
1188
1189     self.do_locking = (self.use_locking and
1190                        query.NQ_LIVE in self.requested_data)
1191
1192     if self.do_locking:
1193       # If any non-static field is requested we need to lock the nodes
1194       lu.needed_locks[locking.LEVEL_NODE] = self.wanted
1195       lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1196
1197   def DeclareLocks(self, lu, level):
1198     pass
1199
1200   def _GetQueryData(self, lu):
1201     """Computes the list of nodes and their attributes.
1202
1203     """
1204     all_info = lu.cfg.GetAllNodesInfo()
1205
1206     node_uuids = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)
1207
1208     # Gather data as requested
1209     if query.NQ_LIVE in self.requested_data:
1210       # filter out non-vm_capable nodes
1211       toquery_node_uuids = [node.uuid for node in all_info.values()
1212                             if node.vm_capable and node.uuid in node_uuids]
1213       lvm_enabled = utils.storage.IsLvmEnabled(
1214           lu.cfg.GetClusterInfo().enabled_disk_templates)
1215       # FIXME: this per default asks for storage space information for all
1216       # enabled disk templates. Fix this by making it possible to specify
1217       # space report fields for specific disk templates.
1218       raw_storage_units = utils.storage.GetStorageUnitsOfCluster(
1219           lu.cfg, include_spindles=lvm_enabled)
1220       storage_units = rpc.PrepareStorageUnitsForNodes(
1221           lu.cfg, raw_storage_units, toquery_node_uuids)
1222       default_hypervisor = lu.cfg.GetHypervisorType()
1223       hvparams = lu.cfg.GetClusterInfo().hvparams[default_hypervisor]
1224       hvspecs = [(default_hypervisor, hvparams)]
1225       node_data = lu.rpc.call_node_info(toquery_node_uuids, storage_units,
1226                                         hvspecs)
1227       live_data = dict(
1228           (uuid, rpc.MakeLegacyNodeInfo(nresult.payload,
1229                                         require_spindles=lvm_enabled))
1230           for (uuid, nresult) in node_data.items()
1231           if not nresult.fail_msg and nresult.payload)
1232     else:
1233       live_data = None
1234
1235     if query.NQ_INST in self.requested_data:
1236       node_to_primary = dict([(uuid, set()) for uuid in node_uuids])
1237       node_to_secondary = dict([(uuid, set()) for uuid in node_uuids])
1238
1239       inst_data = lu.cfg.GetAllInstancesInfo()
1240       inst_uuid_to_inst_name = {}
1241
1242       for inst in inst_data.values():
1243         inst_uuid_to_inst_name[inst.uuid] = inst.name
1244         if inst.primary_node in node_to_primary:
1245           node_to_primary[inst.primary_node].add(inst.uuid)
1246         for secnode in inst.secondary_nodes:
1247           if secnode in node_to_secondary:
1248             node_to_secondary[secnode].add(inst.uuid)
1249     else:
1250       node_to_primary = None
1251       node_to_secondary = None
1252       inst_uuid_to_inst_name = None
1253
1254     if query.NQ_OOB in self.requested_data:
1255       oob_support = dict((uuid, bool(SupportsOob(lu.cfg, node)))
1256                          for uuid, node in all_info.iteritems())
1257     else:
1258       oob_support = None
1259
1260     if query.NQ_GROUP in self.requested_data:
1261       groups = lu.cfg.GetAllNodeGroupsInfo()
1262     else:
1263       groups = {}
1264
1265     return query.NodeQueryData([all_info[uuid] for uuid in node_uuids],
1266                                live_data, lu.cfg.GetMasterNode(),
1267                                node_to_primary, node_to_secondary,
1268                                inst_uuid_to_inst_name, groups, oob_support,
1269                                lu.cfg.GetClusterInfo())
1270
1271
1272 class LUNodeQuery(NoHooksLU):
1273   """Logical unit for querying nodes.
1274
1275   """
1276   # pylint: disable=W0142
1277   REQ_BGL = False
1278
1279   def CheckArguments(self):
1280     self.nq = NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
1281                          self.op.output_fields, self.op.use_locking)
1282
1283   def ExpandNames(self):
1284     self.nq.ExpandNames(self)
1285
1286   def DeclareLocks(self, level):
1287     self.nq.DeclareLocks(self, level)
1288
1289   def Exec(self, feedback_fn):
1290     return self.nq.OldStyleQuery(self)
1291
1292
1293 def _CheckOutputFields(fields, selected):
1294   """Checks whether all selected fields are valid according to fields.
1295
1296   @type fields: L{utils.FieldSet}
1297   @param fields: fields set
1298   @type selected: L{utils.FieldSet}
1299   @param selected: fields set
1300
1301   """
1302   delta = fields.NonMatching(selected)
1303   if delta:
1304     raise errors.OpPrereqError("Unknown output fields selected: %s"
1305                                % ",".join(delta), errors.ECODE_INVAL)
1306
1307
1308 class LUNodeQueryvols(NoHooksLU):
1309   """Logical unit for getting volumes on node(s).
1310
1311   """
1312   REQ_BGL = False
1313
1314   def CheckArguments(self):
1315     _CheckOutputFields(utils.FieldSet(constants.VF_NODE, constants.VF_PHYS,
1316                                       constants.VF_VG, constants.VF_NAME,
1317                                       constants.VF_SIZE, constants.VF_INSTANCE),
1318                        self.op.output_fields)
1319
1320   def ExpandNames(self):
1321     self.share_locks = ShareAll()
1322
1323     if self.op.nodes:
1324       self.needed_locks = {
1325         locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
1326         }
1327     else:
1328       self.needed_locks = {
1329         locking.LEVEL_NODE: locking.ALL_SET,
1330         locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1331         }
1332
1333   def Exec(self, feedback_fn):
1334     """Computes the list of nodes and their attributes.
1335
1336     """
1337     node_uuids = self.owned_locks(locking.LEVEL_NODE)
1338     volumes = self.rpc.call_node_volumes(node_uuids)
1339
1340     ilist = self.cfg.GetAllInstancesInfo()
1341     vol2inst = MapInstanceLvsToNodes(ilist.values())
1342
1343     output = []
1344     for node_uuid in node_uuids:
1345       nresult = volumes[node_uuid]
1346       if nresult.offline:
1347         continue
1348       msg = nresult.fail_msg
1349       if msg:
1350         self.LogWarning("Can't compute volume data on node %s: %s",
1351                         self.cfg.GetNodeName(node_uuid), msg)
1352         continue
1353
1354       node_vols = sorted(nresult.payload,
1355                          key=operator.itemgetter(constants.VF_DEV))
1356
1357       for vol in node_vols:
1358         node_output = []
1359         for field in self.op.output_fields:
1360           if field == constants.VF_NODE:
1361             val = self.cfg.GetNodeName(node_uuid)
1362           elif field == constants.VF_PHYS:
1363             val = vol[constants.VF_DEV]
1364           elif field == constants.VF_VG:
1365             val = vol[constants.VF_VG]
1366           elif field == constants.VF_NAME:
1367             val = vol[constants.VF_NAME]
1368           elif field == constants.VF_SIZE:
1369             val = int(float(vol[constants.VF_SIZE]))
1370           elif field == constants.VF_INSTANCE:
1371             inst = vol2inst.get((node_uuid, vol[constants.VF_VG] + "/" +
1372                                  vol[constants.VF_NAME]), None)
1373             if inst is not None:
1374               val = inst.name
1375             else:
1376               val = "-"
1377           else:
1378             raise errors.ParameterError(field)
1379           node_output.append(str(val))
1380
1381         output.append(node_output)
1382
1383     return output
1384
1385
1386 class LUNodeQueryStorage(NoHooksLU):
1387   """Logical unit for getting information on storage units on node(s).
1388
1389   """
1390   REQ_BGL = False
1391
1392   def CheckArguments(self):
1393     _CheckOutputFields(utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
1394                        self.op.output_fields)
1395
1396   def ExpandNames(self):
1397     self.share_locks = ShareAll()
1398
1399     if self.op.nodes:
1400       self.needed_locks = {
1401         locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
1402         }
1403     else:
1404       self.needed_locks = {
1405         locking.LEVEL_NODE: locking.ALL_SET,
1406         locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1407         }
1408
1409   def CheckPrereq(self):
1410     """Check prerequisites.
1411
1412     """
1413     CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1414
1415   def Exec(self, feedback_fn):
1416     """Computes the list of nodes and their attributes.
1417
1418     """
1419     self.node_uuids = self.owned_locks(locking.LEVEL_NODE)
1420
1421     # Always get name to sort by
1422     if constants.SF_NAME in self.op.output_fields:
1423       fields = self.op.output_fields[:]
1424     else:
1425       fields = [constants.SF_NAME] + self.op.output_fields
1426
1427     # Never ask for node or type as it's only known to the LU
1428     for extra in [constants.SF_NODE, constants.SF_TYPE]:
1429       while extra in fields:
1430         fields.remove(extra)
1431
1432     field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
1433     name_idx = field_idx[constants.SF_NAME]
1434
1435     st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1436     data = self.rpc.call_storage_list(self.node_uuids,
1437                                       self.op.storage_type, st_args,
1438                                       self.op.name, fields)
1439
1440     result = []
1441
1442     for node_uuid in utils.NiceSort(self.node_uuids):
1443       node_name = self.cfg.GetNodeName(node_uuid)
1444       nresult = data[node_uuid]
1445       if nresult.offline:
1446         continue
1447
1448       msg = nresult.fail_msg
1449       if msg:
1450         self.LogWarning("Can't get storage data from node %s: %s",
1451                         node_name, msg)
1452         continue
1453
1454       rows = dict([(row[name_idx], row) for row in nresult.payload])
1455
1456       for name in utils.NiceSort(rows.keys()):
1457         row = rows[name]
1458
1459         out = []
1460
1461         for field in self.op.output_fields:
1462           if field == constants.SF_NODE:
1463             val = node_name
1464           elif field == constants.SF_TYPE:
1465             val = self.op.storage_type
1466           elif field in field_idx:
1467             val = row[field_idx[field]]
1468           else:
1469             raise errors.ParameterError(field)
1470
1471           out.append(val)
1472
1473         result.append(out)
1474
1475     return result
1476
1477
1478 class LUNodeRemove(LogicalUnit):
1479   """Logical unit for removing a node.
1480
1481   """
1482   HPATH = "node-remove"
1483   HTYPE = constants.HTYPE_NODE
1484
1485   def BuildHooksEnv(self):
1486     """Build hooks env.
1487
1488     """
1489     return {
1490       "OP_TARGET": self.op.node_name,
1491       "NODE_NAME": self.op.node_name,
1492       }
1493
1494   def BuildHooksNodes(self):
1495     """Build hooks nodes.
1496
1497     This doesn't run on the target node in the pre phase as a failed
1498     node would then be impossible to remove.
1499
1500     """
1501     all_nodes = self.cfg.GetNodeList()
1502     try:
1503       all_nodes.remove(self.op.node_uuid)
1504     except ValueError:
1505       pass
1506     return (all_nodes, all_nodes)
1507
1508   def CheckPrereq(self):
1509     """Check prerequisites.
1510
1511     This checks:
1512      - the node exists in the configuration
1513      - it does not have primary or secondary instances
1514      - it's not the master
1515
1516     Any errors are signaled by raising errors.OpPrereqError.
1517
1518     """
1519     (self.op.node_uuid, self.op.node_name) = \
1520       ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1521     node = self.cfg.GetNodeInfo(self.op.node_uuid)
1522     assert node is not None
1523
1524     masternode = self.cfg.GetMasterNode()
1525     if node.uuid == masternode:
1526       raise errors.OpPrereqError("Node is the master node, failover to another"
1527                                  " node is required", errors.ECODE_INVAL)
1528
1529     for _, instance in self.cfg.GetAllInstancesInfo().items():
1530       if node.uuid in instance.all_nodes:
1531         raise errors.OpPrereqError("Instance %s is still running on the node,"
1532                                    " please remove first" % instance.name,
1533                                    errors.ECODE_INVAL)
1534     self.op.node_name = node.name
1535     self.node = node
1536
1537   def Exec(self, feedback_fn):
1538     """Removes the node from the cluster.
1539
1540     """
1541     logging.info("Stopping the node daemon and removing configs from node %s",
1542                  self.node.name)
1543
1544     modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
1545
1546     assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
1547       "Not owning BGL"
1548
1549     # Promote nodes to master candidate as needed
1550     AdjustCandidatePool(self, exceptions=[self.node.uuid])
1551     self.context.RemoveNode(self.node)
1552
1553     # Run post hooks on the node before it's removed
1554     RunPostHook(self, self.node.name)
1555
1556     # we have to call this by name rather than by UUID, as the node is no longer
1557     # in the config
1558     result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup)
1559     msg = result.fail_msg
1560     if msg:
1561       self.LogWarning("Errors encountered on the remote node while leaving"
1562                       " the cluster: %s", msg)
1563
1564     # Remove node from our /etc/hosts
1565     if self.cfg.GetClusterInfo().modify_etc_hosts:
1566       master_node_uuid = self.cfg.GetMasterNode()
1567       result = self.rpc.call_etc_hosts_modify(master_node_uuid,
1568                                               constants.ETC_HOSTS_REMOVE,
1569                                               self.node.name, None)
1570       result.Raise("Can't update hosts file with new host data")
1571       RedistributeAncillaryFiles(self)
1572
1573
1574 class LURepairNodeStorage(NoHooksLU):
1575   """Repairs the volume group on a node.
1576
1577   """
1578   REQ_BGL = False
1579
1580   def CheckArguments(self):
1581     (self.op.node_uuid, self.op.node_name) = \
1582       ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1583
1584     storage_type = self.op.storage_type
1585
1586     if (constants.SO_FIX_CONSISTENCY not in
1587         constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
1588       raise errors.OpPrereqError("Storage units of type '%s' can not be"
1589                                  " repaired" % storage_type,
1590                                  errors.ECODE_INVAL)
1591
1592   def ExpandNames(self):
1593     self.needed_locks = {
1594       locking.LEVEL_NODE: [self.op.node_uuid],
1595       }
1596
1597   def _CheckFaultyDisks(self, instance, node_uuid):
1598     """Ensure faulty disks abort the opcode or at least warn."""
1599     try:
1600       if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
1601                                  node_uuid, True):
1602         raise errors.OpPrereqError("Instance '%s' has faulty disks on"
1603                                    " node '%s'" %
1604                                    (instance.name,
1605                                     self.cfg.GetNodeName(node_uuid)),
1606                                    errors.ECODE_STATE)
1607     except errors.OpPrereqError, err:
1608       if self.op.ignore_consistency:
1609         self.LogWarning(str(err.args[0]))
1610       else:
1611         raise
1612
1613   def CheckPrereq(self):
1614     """Check prerequisites.
1615
1616     """
1617     CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1618
1619     # Check whether any instance on this node has faulty disks
1620     for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
1621       if not inst.disks_active:
1622         continue
1623       check_nodes = set(inst.all_nodes)
1624       check_nodes.discard(self.op.node_uuid)
1625       for inst_node_uuid in check_nodes:
1626         self._CheckFaultyDisks(inst, inst_node_uuid)
1627
1628   def Exec(self, feedback_fn):
1629     feedback_fn("Repairing storage unit '%s' on %s ..." %
1630                 (self.op.name, self.op.node_name))
1631
1632     st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1633     result = self.rpc.call_storage_execute(self.op.node_uuid,
1634                                            self.op.storage_type, st_args,
1635                                            self.op.name,
1636                                            constants.SO_FIX_CONSISTENCY)
1637     result.Raise("Failed to repair storage unit '%s' on %s" %
1638                  (self.op.name, self.op.node_name))