gnt-cluster modify: factor out ipolicy check
[ganeti-local] / lib / cmdlib / node.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with nodes."""
23
24 import logging
25 import operator
26
27 from ganeti import constants
28 from ganeti import errors
29 from ganeti import locking
30 from ganeti import netutils
31 from ganeti import objects
32 from ganeti import opcodes
33 from ganeti import qlang
34 from ganeti import query
35 from ganeti import rpc
36 from ganeti import utils
37 from ganeti.masterd import iallocator
38
39 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, QueryBase, \
40   ResultWithJobs
41 from ganeti.cmdlib.common import CheckParamsNotGlobal, \
42   MergeAndVerifyHvState, MergeAndVerifyDiskState, \
43   IsExclusiveStorageEnabledNode, CheckNodePVs, \
44   RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
45   CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
46   AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
47   GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \
48   FindFaultyInstanceDisks, CheckStorageTypeEnabled
49
50
51 def _DecideSelfPromotion(lu, exceptions=None):
52   """Decide whether I should promote myself as a master candidate.
53
54   """
55   cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
56   mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
57   # the new node will increase mc_max with one, so:
58   mc_should = min(mc_should + 1, cp_size)
59   return mc_now < mc_should
60
61
62 def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
63   """Ensure that a node has the given secondary ip.
64
65   @type lu: L{LogicalUnit}
66   @param lu: the LU on behalf of which we make the check
67   @type node: L{objects.Node}
68   @param node: the node to check
69   @type secondary_ip: string
70   @param secondary_ip: the ip to check
71   @type prereq: boolean
72   @param prereq: whether to throw a prerequisite or an execute error
73   @raise errors.OpPrereqError: if the node doesn't have the ip,
74   and prereq=True
75   @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
76
77   """
78   # this can be called with a new node, which has no UUID yet, so perform the
79   # RPC call using its name
80   result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
81   result.Raise("Failure checking secondary ip on node %s" % node.name,
82                prereq=prereq, ecode=errors.ECODE_ENVIRON)
83   if not result.payload:
84     msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
85            " please fix and re-run this command" % secondary_ip)
86     if prereq:
87       raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
88     else:
89       raise errors.OpExecError(msg)
90
91
92 class LUNodeAdd(LogicalUnit):
93   """Logical unit for adding node to the cluster.
94
95   """
96   HPATH = "node-add"
97   HTYPE = constants.HTYPE_NODE
98   _NFLAGS = ["master_capable", "vm_capable"]
99
100   def CheckArguments(self):
101     self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
102     # validate/normalize the node name
103     self.hostname = netutils.GetHostname(name=self.op.node_name,
104                                          family=self.primary_ip_family)
105     self.op.node_name = self.hostname.name
106
107     if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName():
108       raise errors.OpPrereqError("Cannot readd the master node",
109                                  errors.ECODE_STATE)
110
111     if self.op.readd and self.op.group:
112       raise errors.OpPrereqError("Cannot pass a node group when a node is"
113                                  " being readded", errors.ECODE_INVAL)
114
115   def BuildHooksEnv(self):
116     """Build hooks env.
117
118     This will run on all nodes before, and on all nodes + the new node after.
119
120     """
121     return {
122       "OP_TARGET": self.op.node_name,
123       "NODE_NAME": self.op.node_name,
124       "NODE_PIP": self.op.primary_ip,
125       "NODE_SIP": self.op.secondary_ip,
126       "MASTER_CAPABLE": str(self.op.master_capable),
127       "VM_CAPABLE": str(self.op.vm_capable),
128       }
129
130   def BuildHooksNodes(self):
131     """Build hooks nodes.
132
133     """
134     hook_nodes = self.cfg.GetNodeList()
135     new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
136     if new_node_info is not None:
137       # Exclude added node
138       hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))
139
140     # add the new node as post hook node by name; it does not have an UUID yet
141     return (hook_nodes, hook_nodes, [self.op.node_name, ])
142
143   def CheckPrereq(self):
144     """Check prerequisites.
145
146     This checks:
147      - the new node is not already in the config
148      - it is resolvable
149      - its parameters (single/dual homed) matches the cluster
150
151     Any errors are signaled by raising errors.OpPrereqError.
152
153     """
154     node_name = self.hostname.name
155     self.op.primary_ip = self.hostname.ip
156     if self.op.secondary_ip is None:
157       if self.primary_ip_family == netutils.IP6Address.family:
158         raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
159                                    " IPv4 address must be given as secondary",
160                                    errors.ECODE_INVAL)
161       self.op.secondary_ip = self.op.primary_ip
162
163     secondary_ip = self.op.secondary_ip
164     if not netutils.IP4Address.IsValid(secondary_ip):
165       raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
166                                  " address" % secondary_ip, errors.ECODE_INVAL)
167
168     existing_node_info = self.cfg.GetNodeInfoByName(node_name)
169     if not self.op.readd and existing_node_info is not None:
170       raise errors.OpPrereqError("Node %s is already in the configuration" %
171                                  node_name, errors.ECODE_EXISTS)
172     elif self.op.readd and existing_node_info is None:
173       raise errors.OpPrereqError("Node %s is not in the configuration" %
174                                  node_name, errors.ECODE_NOENT)
175
176     self.changed_primary_ip = False
177
178     for existing_node in self.cfg.GetAllNodesInfo().values():
179       if self.op.readd and node_name == existing_node.name:
180         if existing_node.secondary_ip != secondary_ip:
181           raise errors.OpPrereqError("Readded node doesn't have the same IP"
182                                      " address configuration as before",
183                                      errors.ECODE_INVAL)
184         if existing_node.primary_ip != self.op.primary_ip:
185           self.changed_primary_ip = True
186
187         continue
188
189       if (existing_node.primary_ip == self.op.primary_ip or
190           existing_node.secondary_ip == self.op.primary_ip or
191           existing_node.primary_ip == secondary_ip or
192           existing_node.secondary_ip == secondary_ip):
193         raise errors.OpPrereqError("New node ip address(es) conflict with"
194                                    " existing node %s" % existing_node.name,
195                                    errors.ECODE_NOTUNIQUE)
196
197     # After this 'if' block, None is no longer a valid value for the
198     # _capable op attributes
199     if self.op.readd:
200       assert existing_node_info is not None, \
201         "Can't retrieve locked node %s" % node_name
202       for attr in self._NFLAGS:
203         if getattr(self.op, attr) is None:
204           setattr(self.op, attr, getattr(existing_node_info, attr))
205     else:
206       for attr in self._NFLAGS:
207         if getattr(self.op, attr) is None:
208           setattr(self.op, attr, True)
209
210     if self.op.readd and not self.op.vm_capable:
211       pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
212       if pri or sec:
213         raise errors.OpPrereqError("Node %s being re-added with vm_capable"
214                                    " flag set to false, but it already holds"
215                                    " instances" % node_name,
216                                    errors.ECODE_STATE)
217
218     # check that the type of the node (single versus dual homed) is the
219     # same as for the master
220     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
221     master_singlehomed = myself.secondary_ip == myself.primary_ip
222     newbie_singlehomed = secondary_ip == self.op.primary_ip
223     if master_singlehomed != newbie_singlehomed:
224       if master_singlehomed:
225         raise errors.OpPrereqError("The master has no secondary ip but the"
226                                    " new node has one",
227                                    errors.ECODE_INVAL)
228       else:
229         raise errors.OpPrereqError("The master has a secondary ip but the"
230                                    " new node doesn't have one",
231                                    errors.ECODE_INVAL)
232
233     # checks reachability
234     if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
235       raise errors.OpPrereqError("Node not reachable by ping",
236                                  errors.ECODE_ENVIRON)
237
238     if not newbie_singlehomed:
239       # check reachability from my secondary ip to newbie's secondary ip
240       if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
241                               source=myself.secondary_ip):
242         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
243                                    " based ping to node daemon port",
244                                    errors.ECODE_ENVIRON)
245
246     if self.op.readd:
247       exceptions = [existing_node_info.uuid]
248     else:
249       exceptions = []
250
251     if self.op.master_capable:
252       self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
253     else:
254       self.master_candidate = False
255
256     if self.op.readd:
257       self.new_node = existing_node_info
258     else:
259       node_group = self.cfg.LookupNodeGroup(self.op.group)
260       self.new_node = objects.Node(name=node_name,
261                                    primary_ip=self.op.primary_ip,
262                                    secondary_ip=secondary_ip,
263                                    master_candidate=self.master_candidate,
264                                    offline=False, drained=False,
265                                    group=node_group, ndparams={})
266
267     if self.op.ndparams:
268       utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
269       CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
270                            "node", "cluster or group")
271
272     if self.op.hv_state:
273       self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)
274
275     if self.op.disk_state:
276       self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)
277
278     # TODO: If we need to have multiple DnsOnlyRunner we probably should make
279     #       it a property on the base class.
280     rpcrunner = rpc.DnsOnlyRunner()
281     result = rpcrunner.call_version([node_name])[node_name]
282     result.Raise("Can't get version information from node %s" % node_name)
283     if constants.PROTOCOL_VERSION == result.payload:
284       logging.info("Communication to node %s fine, sw version %s match",
285                    node_name, result.payload)
286     else:
287       raise errors.OpPrereqError("Version mismatch master version %s,"
288                                  " node version %s" %
289                                  (constants.PROTOCOL_VERSION, result.payload),
290                                  errors.ECODE_ENVIRON)
291
292     vg_name = self.cfg.GetVGName()
293     if vg_name is not None:
294       vparams = {constants.NV_PVLIST: [vg_name]}
295       excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
296       cname = self.cfg.GetClusterName()
297       result = rpcrunner.call_node_verify_light(
298           [node_name], vparams, cname,
299           self.cfg.GetClusterInfo().hvparams)[node_name]
300       (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
301       if errmsgs:
302         raise errors.OpPrereqError("Checks on node PVs failed: %s" %
303                                    "; ".join(errmsgs), errors.ECODE_ENVIRON)
304
305   def Exec(self, feedback_fn):
306     """Adds the new node to the cluster.
307
308     """
309     assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
310       "Not owning BGL"
311
312     # We adding a new node so we assume it's powered
313     self.new_node.powered = True
314
315     # for re-adds, reset the offline/drained/master-candidate flags;
316     # we need to reset here, otherwise offline would prevent RPC calls
317     # later in the procedure; this also means that if the re-add
318     # fails, we are left with a non-offlined, broken node
319     if self.op.readd:
320       self.new_node.drained = False
321       self.LogInfo("Readding a node, the offline/drained flags were reset")
322       # if we demote the node, we do cleanup later in the procedure
323       self.new_node.master_candidate = self.master_candidate
324       if self.changed_primary_ip:
325         self.new_node.primary_ip = self.op.primary_ip
326
327     # copy the master/vm_capable flags
328     for attr in self._NFLAGS:
329       setattr(self.new_node, attr, getattr(self.op, attr))
330
331     # notify the user about any possible mc promotion
332     if self.new_node.master_candidate:
333       self.LogInfo("Node will be a master candidate")
334
335     if self.op.ndparams:
336       self.new_node.ndparams = self.op.ndparams
337     else:
338       self.new_node.ndparams = {}
339
340     if self.op.hv_state:
341       self.new_node.hv_state_static = self.new_hv_state
342
343     if self.op.disk_state:
344       self.new_node.disk_state_static = self.new_disk_state
345
346     # Add node to our /etc/hosts, and add key to known_hosts
347     if self.cfg.GetClusterInfo().modify_etc_hosts:
348       master_node = self.cfg.GetMasterNode()
349       result = self.rpc.call_etc_hosts_modify(
350                  master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
351                  self.hostname.ip)
352       result.Raise("Can't update hosts file with new host data")
353
354     if self.new_node.secondary_ip != self.new_node.primary_ip:
355       _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
356                                False)
357
358     node_verifier_uuids = [self.cfg.GetMasterNode()]
359     node_verify_param = {
360       constants.NV_NODELIST: ([self.new_node.name], {}),
361       # TODO: do a node-net-test as well?
362     }
363
364     result = self.rpc.call_node_verify(
365                node_verifier_uuids, node_verify_param,
366                self.cfg.GetClusterName(),
367                self.cfg.GetClusterInfo().hvparams)
368     for verifier in node_verifier_uuids:
369       result[verifier].Raise("Cannot communicate with node %s" % verifier)
370       nl_payload = result[verifier].payload[constants.NV_NODELIST]
371       if nl_payload:
372         for failed in nl_payload:
373           feedback_fn("ssh/hostname verification failed"
374                       " (checking from %s): %s" %
375                       (verifier, nl_payload[failed]))
376         raise errors.OpExecError("ssh/hostname verification failed")
377
378     if self.op.readd:
379       self.context.ReaddNode(self.new_node)
380       RedistributeAncillaryFiles(self)
381       # make sure we redistribute the config
382       self.cfg.Update(self.new_node, feedback_fn)
383       # and make sure the new node will not have old files around
384       if not self.new_node.master_candidate:
385         result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
386         result.Warn("Node failed to demote itself from master candidate status",
387                     self.LogWarning)
388     else:
389       self.context.AddNode(self.new_node, self.proc.GetECId())
390       RedistributeAncillaryFiles(self)
391
392
393 class LUNodeSetParams(LogicalUnit):
394   """Modifies the parameters of a node.
395
396   @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
397       to the node role (as _ROLE_*)
398   @cvar _R2F: a dictionary from node role to tuples of flags
399   @cvar _FLAGS: a list of attribute names corresponding to the flags
400
401   """
402   HPATH = "node-modify"
403   HTYPE = constants.HTYPE_NODE
404   REQ_BGL = False
405   (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
406   _F2R = {
407     (True, False, False): _ROLE_CANDIDATE,
408     (False, True, False): _ROLE_DRAINED,
409     (False, False, True): _ROLE_OFFLINE,
410     (False, False, False): _ROLE_REGULAR,
411     }
412   _R2F = dict((v, k) for k, v in _F2R.items())
413   _FLAGS = ["master_candidate", "drained", "offline"]
414
415   def CheckArguments(self):
416     (self.op.node_uuid, self.op.node_name) = \
417       ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
418     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
419                 self.op.master_capable, self.op.vm_capable,
420                 self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
421                 self.op.disk_state]
422     if all_mods.count(None) == len(all_mods):
423       raise errors.OpPrereqError("Please pass at least one modification",
424                                  errors.ECODE_INVAL)
425     if all_mods.count(True) > 1:
426       raise errors.OpPrereqError("Can't set the node into more than one"
427                                  " state at the same time",
428                                  errors.ECODE_INVAL)
429
430     # Boolean value that tells us whether we might be demoting from MC
431     self.might_demote = (self.op.master_candidate is False or
432                          self.op.offline is True or
433                          self.op.drained is True or
434                          self.op.master_capable is False)
435
436     if self.op.secondary_ip:
437       if not netutils.IP4Address.IsValid(self.op.secondary_ip):
438         raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
439                                    " address" % self.op.secondary_ip,
440                                    errors.ECODE_INVAL)
441
442     self.lock_all = self.op.auto_promote and self.might_demote
443     self.lock_instances = self.op.secondary_ip is not None
444
445   def _InstanceFilter(self, instance):
446     """Filter for getting affected instances.
447
448     """
449     return (instance.disk_template in constants.DTS_INT_MIRROR and
450             self.op.node_uuid in instance.all_nodes)
451
452   def ExpandNames(self):
453     if self.lock_all:
454       self.needed_locks = {
455         locking.LEVEL_NODE: locking.ALL_SET,
456
457         # Block allocations when all nodes are locked
458         locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
459         }
460     else:
461       self.needed_locks = {
462         locking.LEVEL_NODE: self.op.node_uuid,
463         }
464
465     # Since modifying a node can have severe effects on currently running
466     # operations the resource lock is at least acquired in shared mode
467     self.needed_locks[locking.LEVEL_NODE_RES] = \
468       self.needed_locks[locking.LEVEL_NODE]
469
470     # Get all locks except nodes in shared mode; they are not used for anything
471     # but read-only access
472     self.share_locks = ShareAll()
473     self.share_locks[locking.LEVEL_NODE] = 0
474     self.share_locks[locking.LEVEL_NODE_RES] = 0
475     self.share_locks[locking.LEVEL_NODE_ALLOC] = 0
476
477     if self.lock_instances:
478       self.needed_locks[locking.LEVEL_INSTANCE] = \
479         self.cfg.GetInstanceNames(
480           self.cfg.GetInstancesInfoByFilter(self._InstanceFilter).keys())
481
482   def BuildHooksEnv(self):
483     """Build hooks env.
484
485     This runs on the master node.
486
487     """
488     return {
489       "OP_TARGET": self.op.node_name,
490       "MASTER_CANDIDATE": str(self.op.master_candidate),
491       "OFFLINE": str(self.op.offline),
492       "DRAINED": str(self.op.drained),
493       "MASTER_CAPABLE": str(self.op.master_capable),
494       "VM_CAPABLE": str(self.op.vm_capable),
495       }
496
497   def BuildHooksNodes(self):
498     """Build hooks nodes.
499
500     """
501     nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
502     return (nl, nl)
503
504   def CheckPrereq(self):
505     """Check prerequisites.
506
507     This only checks the instance list against the existing names.
508
509     """
510     node = self.cfg.GetNodeInfo(self.op.node_uuid)
511     if self.lock_instances:
512       affected_instances = \
513         self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)
514
515       # Verify instance locks
516       owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
517       wanted_instance_names = frozenset([inst.name for inst in
518                                          affected_instances.values()])
519       if wanted_instance_names - owned_instance_names:
520         raise errors.OpPrereqError("Instances affected by changing node %s's"
521                                    " secondary IP address have changed since"
522                                    " locks were acquired, wanted '%s', have"
523                                    " '%s'; retry the operation" %
524                                    (node.name,
525                                     utils.CommaJoin(wanted_instance_names),
526                                     utils.CommaJoin(owned_instance_names)),
527                                    errors.ECODE_STATE)
528     else:
529       affected_instances = None
530
531     if (self.op.master_candidate is not None or
532         self.op.drained is not None or
533         self.op.offline is not None):
534       # we can't change the master's node flags
535       if node.uuid == self.cfg.GetMasterNode():
536         raise errors.OpPrereqError("The master role can be changed"
537                                    " only via master-failover",
538                                    errors.ECODE_INVAL)
539
540     if self.op.master_candidate and not node.master_capable:
541       raise errors.OpPrereqError("Node %s is not master capable, cannot make"
542                                  " it a master candidate" % node.name,
543                                  errors.ECODE_STATE)
544
545     if self.op.vm_capable is False:
546       (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
547       if ipri or isec:
548         raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
549                                    " the vm_capable flag" % node.name,
550                                    errors.ECODE_STATE)
551
552     if node.master_candidate and self.might_demote and not self.lock_all:
553       assert not self.op.auto_promote, "auto_promote set but lock_all not"
554       # check if after removing the current node, we're missing master
555       # candidates
556       (mc_remaining, mc_should, _) = \
557           self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
558       if mc_remaining < mc_should:
559         raise errors.OpPrereqError("Not enough master candidates, please"
560                                    " pass auto promote option to allow"
561                                    " promotion (--auto-promote or RAPI"
562                                    " auto_promote=True)", errors.ECODE_STATE)
563
564     self.old_flags = old_flags = (node.master_candidate,
565                                   node.drained, node.offline)
566     assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
567     self.old_role = old_role = self._F2R[old_flags]
568
569     # Check for ineffective changes
570     for attr in self._FLAGS:
571       if getattr(self.op, attr) is False and getattr(node, attr) is False:
572         self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
573         setattr(self.op, attr, None)
574
575     # Past this point, any flag change to False means a transition
576     # away from the respective state, as only real changes are kept
577
578     # TODO: We might query the real power state if it supports OOB
579     if SupportsOob(self.cfg, node):
580       if self.op.offline is False and not (node.powered or
581                                            self.op.powered is True):
582         raise errors.OpPrereqError(("Node %s needs to be turned on before its"
583                                     " offline status can be reset") %
584                                    self.op.node_name, errors.ECODE_STATE)
585     elif self.op.powered is not None:
586       raise errors.OpPrereqError(("Unable to change powered state for node %s"
587                                   " as it does not support out-of-band"
588                                   " handling") % self.op.node_name,
589                                  errors.ECODE_STATE)
590
591     # If we're being deofflined/drained, we'll MC ourself if needed
592     if (self.op.drained is False or self.op.offline is False or
593         (self.op.master_capable and not node.master_capable)):
594       if _DecideSelfPromotion(self):
595         self.op.master_candidate = True
596         self.LogInfo("Auto-promoting node to master candidate")
597
598     # If we're no longer master capable, we'll demote ourselves from MC
599     if self.op.master_capable is False and node.master_candidate:
600       self.LogInfo("Demoting from master candidate")
601       self.op.master_candidate = False
602
603     # Compute new role
604     assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
605     if self.op.master_candidate:
606       new_role = self._ROLE_CANDIDATE
607     elif self.op.drained:
608       new_role = self._ROLE_DRAINED
609     elif self.op.offline:
610       new_role = self._ROLE_OFFLINE
611     elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
612       # False is still in new flags, which means we're un-setting (the
613       # only) True flag
614       new_role = self._ROLE_REGULAR
615     else: # no new flags, nothing, keep old role
616       new_role = old_role
617
618     self.new_role = new_role
619
620     if old_role == self._ROLE_OFFLINE and new_role != old_role:
621       # Trying to transition out of offline status
622       result = self.rpc.call_version([node.uuid])[node.uuid]
623       if result.fail_msg:
624         raise errors.OpPrereqError("Node %s is being de-offlined but fails"
625                                    " to report its version: %s" %
626                                    (node.name, result.fail_msg),
627                                    errors.ECODE_STATE)
628       else:
629         self.LogWarning("Transitioning node from offline to online state"
630                         " without using re-add. Please make sure the node"
631                         " is healthy!")
632
633     # When changing the secondary ip, verify if this is a single-homed to
634     # multi-homed transition or vice versa, and apply the relevant
635     # restrictions.
636     if self.op.secondary_ip:
637       # Ok even without locking, because this can't be changed by any LU
638       master = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
639       master_singlehomed = master.secondary_ip == master.primary_ip
640       if master_singlehomed and self.op.secondary_ip != node.primary_ip:
641         if self.op.force and node.uuid == master.uuid:
642           self.LogWarning("Transitioning from single-homed to multi-homed"
643                           " cluster; all nodes will require a secondary IP"
644                           " address")
645         else:
646           raise errors.OpPrereqError("Changing the secondary ip on a"
647                                      " single-homed cluster requires the"
648                                      " --force option to be passed, and the"
649                                      " target node to be the master",
650                                      errors.ECODE_INVAL)
651       elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
652         if self.op.force and node.uuid == master.uuid:
653           self.LogWarning("Transitioning from multi-homed to single-homed"
654                           " cluster; secondary IP addresses will have to be"
655                           " removed")
656         else:
657           raise errors.OpPrereqError("Cannot set the secondary IP to be the"
658                                      " same as the primary IP on a multi-homed"
659                                      " cluster, unless the --force option is"
660                                      " passed, and the target node is the"
661                                      " master", errors.ECODE_INVAL)
662
663       assert not (set([inst.name for inst in affected_instances.values()]) -
664                   self.owned_locks(locking.LEVEL_INSTANCE))
665
666       if node.offline:
667         if affected_instances:
668           msg = ("Cannot change secondary IP address: offline node has"
669                  " instances (%s) configured to use it" %
670                  utils.CommaJoin(
671                    [inst.name for inst in affected_instances.values()]))
672           raise errors.OpPrereqError(msg, errors.ECODE_STATE)
673       else:
674         # On online nodes, check that no instances are running, and that
675         # the node has the new ip and we can reach it.
676         for instance in affected_instances.values():
677           CheckInstanceState(self, instance, INSTANCE_DOWN,
678                              msg="cannot change secondary ip")
679
680         _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
681         if master.uuid != node.uuid:
682           # check reachability from master secondary ip to new secondary ip
683           if not netutils.TcpPing(self.op.secondary_ip,
684                                   constants.DEFAULT_NODED_PORT,
685                                   source=master.secondary_ip):
686             raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
687                                        " based ping to node daemon port",
688                                        errors.ECODE_ENVIRON)
689
690     if self.op.ndparams:
691       new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
692       utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
693       CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
694                            "node", "cluster or group")
695       self.new_ndparams = new_ndparams
696
697     if self.op.hv_state:
698       self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
699                                                 node.hv_state_static)
700
701     if self.op.disk_state:
702       self.new_disk_state = \
703         MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)
704
705   def Exec(self, feedback_fn):
706     """Modifies a node.
707
708     """
709     node = self.cfg.GetNodeInfo(self.op.node_uuid)
710     result = []
711
712     if self.op.ndparams:
713       node.ndparams = self.new_ndparams
714
715     if self.op.powered is not None:
716       node.powered = self.op.powered
717
718     if self.op.hv_state:
719       node.hv_state_static = self.new_hv_state
720
721     if self.op.disk_state:
722       node.disk_state_static = self.new_disk_state
723
724     for attr in ["master_capable", "vm_capable"]:
725       val = getattr(self.op, attr)
726       if val is not None:
727         setattr(node, attr, val)
728         result.append((attr, str(val)))
729
730     if self.new_role != self.old_role:
731       # Tell the node to demote itself, if no longer MC and not offline
732       if self.old_role == self._ROLE_CANDIDATE and \
733           self.new_role != self._ROLE_OFFLINE:
734         msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
735         if msg:
736           self.LogWarning("Node failed to demote itself: %s", msg)
737
738       new_flags = self._R2F[self.new_role]
739       for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
740         if of != nf:
741           result.append((desc, str(nf)))
742       (node.master_candidate, node.drained, node.offline) = new_flags
743
744       # we locked all nodes, we adjust the CP before updating this node
745       if self.lock_all:
746         AdjustCandidatePool(self, [node.uuid])
747
748     if self.op.secondary_ip:
749       node.secondary_ip = self.op.secondary_ip
750       result.append(("secondary_ip", self.op.secondary_ip))
751
752     # this will trigger configuration file update, if needed
753     self.cfg.Update(node, feedback_fn)
754
755     # this will trigger job queue propagation or cleanup if the mc
756     # flag changed
757     if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
758       self.context.ReaddNode(node)
759
760     return result
761
762
763 class LUNodePowercycle(NoHooksLU):
764   """Powercycles a node.
765
766   """
767   REQ_BGL = False
768
769   def CheckArguments(self):
770     (self.op.node_uuid, self.op.node_name) = \
771       ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
772
773     if self.op.node_uuid == self.cfg.GetMasterNode() and not self.op.force:
774       raise errors.OpPrereqError("The node is the master and the force"
775                                  " parameter was not set",
776                                  errors.ECODE_INVAL)
777
778   def ExpandNames(self):
779     """Locking for PowercycleNode.
780
781     This is a last-resort option and shouldn't block on other
782     jobs. Therefore, we grab no locks.
783
784     """
785     self.needed_locks = {}
786
787   def Exec(self, feedback_fn):
788     """Reboots a node.
789
790     """
791     default_hypervisor = self.cfg.GetHypervisorType()
792     hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
793     result = self.rpc.call_node_powercycle(self.op.node_uuid,
794                                            default_hypervisor,
795                                            hvparams)
796     result.Raise("Failed to schedule the reboot")
797     return result.payload
798
799
800 def _GetNodeInstancesInner(cfg, fn):
801   return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
802
803
804 def _GetNodePrimaryInstances(cfg, node_uuid):
805   """Returns primary instances on a node.
806
807   """
808   return _GetNodeInstancesInner(cfg,
809                                 lambda inst: node_uuid == inst.primary_node)
810
811
812 def _GetNodeSecondaryInstances(cfg, node_uuid):
813   """Returns secondary instances on a node.
814
815   """
816   return _GetNodeInstancesInner(cfg,
817                                 lambda inst: node_uuid in inst.secondary_nodes)
818
819
820 def _GetNodeInstances(cfg, node_uuid):
821   """Returns a list of all primary and secondary instances on a node.
822
823   """
824
825   return _GetNodeInstancesInner(cfg, lambda inst: node_uuid in inst.all_nodes)
826
827
828 class LUNodeEvacuate(NoHooksLU):
829   """Evacuates instances off a list of nodes.
830
831   """
832   REQ_BGL = False
833
834   _MODE2IALLOCATOR = {
835     constants.NODE_EVAC_PRI: constants.IALLOCATOR_NEVAC_PRI,
836     constants.NODE_EVAC_SEC: constants.IALLOCATOR_NEVAC_SEC,
837     constants.NODE_EVAC_ALL: constants.IALLOCATOR_NEVAC_ALL,
838     }
839   assert frozenset(_MODE2IALLOCATOR.keys()) == constants.NODE_EVAC_MODES
840   assert (frozenset(_MODE2IALLOCATOR.values()) ==
841           constants.IALLOCATOR_NEVAC_MODES)
842
843   def CheckArguments(self):
844     CheckIAllocatorOrNode(self, "iallocator", "remote_node")
845
846   def ExpandNames(self):
847     (self.op.node_uuid, self.op.node_name) = \
848       ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
849
850     if self.op.remote_node is not None:
851       (self.op.remote_node_uuid, self.op.remote_node) = \
852         ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
853                               self.op.remote_node)
854       assert self.op.remote_node
855
856       if self.op.node_uuid == self.op.remote_node_uuid:
857         raise errors.OpPrereqError("Can not use evacuated node as a new"
858                                    " secondary node", errors.ECODE_INVAL)
859
860       if self.op.mode != constants.NODE_EVAC_SEC:
861         raise errors.OpPrereqError("Without the use of an iallocator only"
862                                    " secondary instances can be evacuated",
863                                    errors.ECODE_INVAL)
864
865     # Declare locks
866     self.share_locks = ShareAll()
867     self.needed_locks = {
868       locking.LEVEL_INSTANCE: [],
869       locking.LEVEL_NODEGROUP: [],
870       locking.LEVEL_NODE: [],
871       }
872
873     # Determine nodes (via group) optimistically, needs verification once locks
874     # have been acquired
875     self.lock_nodes = self._DetermineNodes()
876
877   def _DetermineNodes(self):
878     """Gets the list of node UUIDs to operate on.
879
880     """
881     if self.op.remote_node is None:
882       # Iallocator will choose any node(s) in the same group
883       group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
884     else:
885       group_nodes = frozenset([self.op.remote_node_uuid])
886
887     # Determine nodes to be locked
888     return set([self.op.node_uuid]) | group_nodes
889
890   def _DetermineInstances(self):
891     """Builds list of instances to operate on.
892
893     """
894     assert self.op.mode in constants.NODE_EVAC_MODES
895
896     if self.op.mode == constants.NODE_EVAC_PRI:
897       # Primary instances only
898       inst_fn = _GetNodePrimaryInstances
899       assert self.op.remote_node is None, \
900         "Evacuating primary instances requires iallocator"
901     elif self.op.mode == constants.NODE_EVAC_SEC:
902       # Secondary instances only
903       inst_fn = _GetNodeSecondaryInstances
904     else:
905       # All instances
906       assert self.op.mode == constants.NODE_EVAC_ALL
907       inst_fn = _GetNodeInstances
908       # TODO: In 2.6, change the iallocator interface to take an evacuation mode
909       # per instance
910       raise errors.OpPrereqError("Due to an issue with the iallocator"
911                                  " interface it is not possible to evacuate"
912                                  " all instances at once; specify explicitly"
913                                  " whether to evacuate primary or secondary"
914                                  " instances",
915                                  errors.ECODE_INVAL)
916
917     return inst_fn(self.cfg, self.op.node_uuid)
918
919   def DeclareLocks(self, level):
920     if level == locking.LEVEL_INSTANCE:
921       # Lock instances optimistically, needs verification once node and group
922       # locks have been acquired
923       self.needed_locks[locking.LEVEL_INSTANCE] = \
924         set(i.name for i in self._DetermineInstances())
925
926     elif level == locking.LEVEL_NODEGROUP:
927       # Lock node groups for all potential target nodes optimistically, needs
928       # verification once nodes have been acquired
929       self.needed_locks[locking.LEVEL_NODEGROUP] = \
930         self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)
931
932     elif level == locking.LEVEL_NODE:
933       self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes
934
935   def CheckPrereq(self):
936     # Verify locks
937     owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
938     owned_nodes = self.owned_locks(locking.LEVEL_NODE)
939     owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
940
941     need_nodes = self._DetermineNodes()
942
943     if not owned_nodes.issuperset(need_nodes):
944       raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
945                                  " locks were acquired, current nodes are"
946                                  " are '%s', used to be '%s'; retry the"
947                                  " operation" %
948                                  (self.op.node_name,
949                                   utils.CommaJoin(need_nodes),
950                                   utils.CommaJoin(owned_nodes)),
951                                  errors.ECODE_STATE)
952
953     wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
954     if owned_groups != wanted_groups:
955       raise errors.OpExecError("Node groups changed since locks were acquired,"
956                                " current groups are '%s', used to be '%s';"
957                                " retry the operation" %
958                                (utils.CommaJoin(wanted_groups),
959                                 utils.CommaJoin(owned_groups)))
960
961     # Determine affected instances
962     self.instances = self._DetermineInstances()
963     self.instance_names = [i.name for i in self.instances]
964
965     if set(self.instance_names) != owned_instance_names:
966       raise errors.OpExecError("Instances on node '%s' changed since locks"
967                                " were acquired, current instances are '%s',"
968                                " used to be '%s'; retry the operation" %
969                                (self.op.node_name,
970                                 utils.CommaJoin(self.instance_names),
971                                 utils.CommaJoin(owned_instance_names)))
972
973     if self.instance_names:
974       self.LogInfo("Evacuating instances from node '%s': %s",
975                    self.op.node_name,
976                    utils.CommaJoin(utils.NiceSort(self.instance_names)))
977     else:
978       self.LogInfo("No instances to evacuate from node '%s'",
979                    self.op.node_name)
980
981     if self.op.remote_node is not None:
982       for i in self.instances:
983         if i.primary_node == self.op.remote_node_uuid:
984           raise errors.OpPrereqError("Node %s is the primary node of"
985                                      " instance %s, cannot use it as"
986                                      " secondary" %
987                                      (self.op.remote_node, i.name),
988                                      errors.ECODE_INVAL)
989
990   def Exec(self, feedback_fn):
991     assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)
992
993     if not self.instance_names:
994       # No instances to evacuate
995       jobs = []
996
997     elif self.op.iallocator is not None:
998       # TODO: Implement relocation to other group
999       evac_mode = self._MODE2IALLOCATOR[self.op.mode]
1000       req = iallocator.IAReqNodeEvac(evac_mode=evac_mode,
1001                                      instances=list(self.instance_names))
1002       ial = iallocator.IAllocator(self.cfg, self.rpc, req)
1003
1004       ial.Run(self.op.iallocator)
1005
1006       if not ial.success:
1007         raise errors.OpPrereqError("Can't compute node evacuation using"
1008                                    " iallocator '%s': %s" %
1009                                    (self.op.iallocator, ial.info),
1010                                    errors.ECODE_NORES)
1011
1012       jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)
1013
1014     elif self.op.remote_node is not None:
1015       assert self.op.mode == constants.NODE_EVAC_SEC
1016       jobs = [
1017         [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
1018                                         remote_node=self.op.remote_node,
1019                                         disks=[],
1020                                         mode=constants.REPLACE_DISK_CHG,
1021                                         early_release=self.op.early_release)]
1022         for instance_name in self.instance_names]
1023
1024     else:
1025       raise errors.ProgrammerError("No iallocator or remote node")
1026
1027     return ResultWithJobs(jobs)
1028
1029
1030 class LUNodeMigrate(LogicalUnit):
1031   """Migrate all instances from a node.
1032
1033   """
1034   HPATH = "node-migrate"
1035   HTYPE = constants.HTYPE_NODE
1036   REQ_BGL = False
1037
1038   def CheckArguments(self):
1039     pass
1040
1041   def ExpandNames(self):
1042     (self.op.node_uuid, self.op.node_name) = \
1043       ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1044
1045     self.share_locks = ShareAll()
1046     self.needed_locks = {
1047       locking.LEVEL_NODE: [self.op.node_uuid],
1048       }
1049
1050   def BuildHooksEnv(self):
1051     """Build hooks env.
1052
1053     This runs on the master, the primary and all the secondaries.
1054
1055     """
1056     return {
1057       "NODE_NAME": self.op.node_name,
1058       "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
1059       }
1060
1061   def BuildHooksNodes(self):
1062     """Build hooks nodes.
1063
1064     """
1065     nl = [self.cfg.GetMasterNode()]
1066     return (nl, nl)
1067
1068   def CheckPrereq(self):
1069     pass
1070
1071   def Exec(self, feedback_fn):
1072     # Prepare jobs for migration instances
1073     jobs = [
1074       [opcodes.OpInstanceMigrate(
1075         instance_name=inst.name,
1076         mode=self.op.mode,
1077         live=self.op.live,
1078         iallocator=self.op.iallocator,
1079         target_node=self.op.target_node,
1080         allow_runtime_changes=self.op.allow_runtime_changes,
1081         ignore_ipolicy=self.op.ignore_ipolicy)]
1082       for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]
1083
1084     # TODO: Run iallocator in this opcode and pass correct placement options to
1085     # OpInstanceMigrate. Since other jobs can modify the cluster between
1086     # running the iallocator and the actual migration, a good consistency model
1087     # will have to be found.
1088
1089     assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
1090             frozenset([self.op.node_uuid]))
1091
1092     return ResultWithJobs(jobs)
1093
1094
1095 def _GetStorageTypeArgs(cfg, storage_type):
1096   """Returns the arguments for a storage type.
1097
1098   """
1099   # Special case for file storage
1100   if storage_type == constants.ST_FILE:
1101     # storage.FileStorage wants a list of storage directories
1102     return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
1103
1104   return []
1105
1106
1107 class LUNodeModifyStorage(NoHooksLU):
1108   """Logical unit for modifying a storage volume on a node.
1109
1110   """
1111   REQ_BGL = False
1112
1113   def CheckArguments(self):
1114     (self.op.node_uuid, self.op.node_name) = \
1115       ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1116
1117     storage_type = self.op.storage_type
1118
1119     try:
1120       modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
1121     except KeyError:
1122       raise errors.OpPrereqError("Storage units of type '%s' can not be"
1123                                  " modified" % storage_type,
1124                                  errors.ECODE_INVAL)
1125
1126     diff = set(self.op.changes.keys()) - modifiable
1127     if diff:
1128       raise errors.OpPrereqError("The following fields can not be modified for"
1129                                  " storage units of type '%s': %r" %
1130                                  (storage_type, list(diff)),
1131                                  errors.ECODE_INVAL)
1132
1133   def CheckPrereq(self):
1134     """Check prerequisites.
1135
1136     """
1137     CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1138
1139   def ExpandNames(self):
1140     self.needed_locks = {
1141       locking.LEVEL_NODE: self.op.node_uuid,
1142       }
1143
1144   def Exec(self, feedback_fn):
1145     """Computes the list of nodes and their attributes.
1146
1147     """
1148     st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1149     result = self.rpc.call_storage_modify(self.op.node_uuid,
1150                                           self.op.storage_type, st_args,
1151                                           self.op.name, self.op.changes)
1152     result.Raise("Failed to modify storage unit '%s' on %s" %
1153                  (self.op.name, self.op.node_name))
1154
1155
1156 class NodeQuery(QueryBase):
1157   FIELDS = query.NODE_FIELDS
1158
1159   def ExpandNames(self, lu):
1160     lu.needed_locks = {}
1161     lu.share_locks = ShareAll()
1162
1163     if self.names:
1164       (self.wanted, _) = GetWantedNodes(lu, self.names)
1165     else:
1166       self.wanted = locking.ALL_SET
1167
1168     self.do_locking = (self.use_locking and
1169                        query.NQ_LIVE in self.requested_data)
1170
1171     if self.do_locking:
1172       # If any non-static field is requested we need to lock the nodes
1173       lu.needed_locks[locking.LEVEL_NODE] = self.wanted
1174       lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1175
1176   def DeclareLocks(self, lu, level):
1177     pass
1178
1179   def _GetQueryData(self, lu):
1180     """Computes the list of nodes and their attributes.
1181
1182     """
1183     all_info = lu.cfg.GetAllNodesInfo()
1184
1185     node_uuids = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)
1186
1187     # Gather data as requested
1188     if query.NQ_LIVE in self.requested_data:
1189       # filter out non-vm_capable nodes
1190       toquery_node_uuids = [node.uuid for node in all_info.values()
1191                             if node.vm_capable and node.uuid in node_uuids]
1192       lvm_enabled = utils.storage.IsLvmEnabled(
1193           lu.cfg.GetClusterInfo().enabled_disk_templates)
1194       # FIXME: this per default asks for storage space information for all
1195       # enabled disk templates. Fix this by making it possible to specify
1196       # space report fields for specific disk templates.
1197       raw_storage_units = utils.storage.GetStorageUnitsOfCluster(
1198           lu.cfg, include_spindles=lvm_enabled)
1199       storage_units = rpc.PrepareStorageUnitsForNodes(
1200           lu.cfg, raw_storage_units, toquery_node_uuids)
1201       default_hypervisor = lu.cfg.GetHypervisorType()
1202       hvparams = lu.cfg.GetClusterInfo().hvparams[default_hypervisor]
1203       hvspecs = [(default_hypervisor, hvparams)]
1204       node_data = lu.rpc.call_node_info(toquery_node_uuids, storage_units,
1205                                         hvspecs)
1206       live_data = dict(
1207           (uuid, rpc.MakeLegacyNodeInfo(nresult.payload,
1208                                         require_spindles=lvm_enabled))
1209           for (uuid, nresult) in node_data.items()
1210           if not nresult.fail_msg and nresult.payload)
1211     else:
1212       live_data = None
1213
1214     if query.NQ_INST in self.requested_data:
1215       node_to_primary = dict([(uuid, set()) for uuid in node_uuids])
1216       node_to_secondary = dict([(uuid, set()) for uuid in node_uuids])
1217
1218       inst_data = lu.cfg.GetAllInstancesInfo()
1219       inst_uuid_to_inst_name = {}
1220
1221       for inst in inst_data.values():
1222         inst_uuid_to_inst_name[inst.uuid] = inst.name
1223         if inst.primary_node in node_to_primary:
1224           node_to_primary[inst.primary_node].add(inst.uuid)
1225         for secnode in inst.secondary_nodes:
1226           if secnode in node_to_secondary:
1227             node_to_secondary[secnode].add(inst.uuid)
1228     else:
1229       node_to_primary = None
1230       node_to_secondary = None
1231       inst_uuid_to_inst_name = None
1232
1233     if query.NQ_OOB in self.requested_data:
1234       oob_support = dict((uuid, bool(SupportsOob(lu.cfg, node)))
1235                          for uuid, node in all_info.iteritems())
1236     else:
1237       oob_support = None
1238
1239     if query.NQ_GROUP in self.requested_data:
1240       groups = lu.cfg.GetAllNodeGroupsInfo()
1241     else:
1242       groups = {}
1243
1244     return query.NodeQueryData([all_info[uuid] for uuid in node_uuids],
1245                                live_data, lu.cfg.GetMasterNode(),
1246                                node_to_primary, node_to_secondary,
1247                                inst_uuid_to_inst_name, groups, oob_support,
1248                                lu.cfg.GetClusterInfo())
1249
1250
1251 class LUNodeQuery(NoHooksLU):
1252   """Logical unit for querying nodes.
1253
1254   """
1255   # pylint: disable=W0142
1256   REQ_BGL = False
1257
1258   def CheckArguments(self):
1259     self.nq = NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
1260                          self.op.output_fields, self.op.use_locking)
1261
1262   def ExpandNames(self):
1263     self.nq.ExpandNames(self)
1264
1265   def DeclareLocks(self, level):
1266     self.nq.DeclareLocks(self, level)
1267
1268   def Exec(self, feedback_fn):
1269     return self.nq.OldStyleQuery(self)
1270
1271
1272 def _CheckOutputFields(static, dynamic, selected):
1273   """Checks whether all selected fields are valid.
1274
1275   @type static: L{utils.FieldSet}
1276   @param static: static fields set
1277   @type dynamic: L{utils.FieldSet}
1278   @param dynamic: dynamic fields set
1279
1280   """
1281   f = utils.FieldSet()
1282   f.Extend(static)
1283   f.Extend(dynamic)
1284
1285   delta = f.NonMatching(selected)
1286   if delta:
1287     raise errors.OpPrereqError("Unknown output fields selected: %s"
1288                                % ",".join(delta), errors.ECODE_INVAL)
1289
1290
1291 class LUNodeQueryvols(NoHooksLU):
1292   """Logical unit for getting volumes on node(s).
1293
1294   """
1295   REQ_BGL = False
1296   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1297   _FIELDS_STATIC = utils.FieldSet("node")
1298
1299   def CheckArguments(self):
1300     _CheckOutputFields(static=self._FIELDS_STATIC,
1301                        dynamic=self._FIELDS_DYNAMIC,
1302                        selected=self.op.output_fields)
1303
1304   def ExpandNames(self):
1305     self.share_locks = ShareAll()
1306
1307     if self.op.nodes:
1308       self.needed_locks = {
1309         locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
1310         }
1311     else:
1312       self.needed_locks = {
1313         locking.LEVEL_NODE: locking.ALL_SET,
1314         locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1315         }
1316
1317   def Exec(self, feedback_fn):
1318     """Computes the list of nodes and their attributes.
1319
1320     """
1321     node_uuids = self.owned_locks(locking.LEVEL_NODE)
1322     volumes = self.rpc.call_node_volumes(node_uuids)
1323
1324     ilist = self.cfg.GetAllInstancesInfo()
1325     vol2inst = MapInstanceLvsToNodes(ilist.values())
1326
1327     output = []
1328     for node_uuid in node_uuids:
1329       nresult = volumes[node_uuid]
1330       if nresult.offline:
1331         continue
1332       msg = nresult.fail_msg
1333       if msg:
1334         self.LogWarning("Can't compute volume data on node %s: %s",
1335                         self.cfg.GetNodeName(node_uuid), msg)
1336         continue
1337
1338       node_vols = sorted(nresult.payload,
1339                          key=operator.itemgetter("dev"))
1340
1341       for vol in node_vols:
1342         node_output = []
1343         for field in self.op.output_fields:
1344           if field == "node":
1345             val = self.cfg.GetNodeName(node_uuid)
1346           elif field == "phys":
1347             val = vol["dev"]
1348           elif field == "vg":
1349             val = vol["vg"]
1350           elif field == "name":
1351             val = vol["name"]
1352           elif field == "size":
1353             val = int(float(vol["size"]))
1354           elif field == "instance":
1355             inst = vol2inst.get((node_uuid, vol["vg"] + "/" + vol["name"]),
1356                                 None)
1357             if inst is not None:
1358               val = inst.name
1359             else:
1360               val = "-"
1361           else:
1362             raise errors.ParameterError(field)
1363           node_output.append(str(val))
1364
1365         output.append(node_output)
1366
1367     return output
1368
1369
1370 class LUNodeQueryStorage(NoHooksLU):
1371   """Logical unit for getting information on storage units on node(s).
1372
1373   """
1374   _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
1375   REQ_BGL = False
1376
1377   def CheckArguments(self):
1378     _CheckOutputFields(static=self._FIELDS_STATIC,
1379                        dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
1380                        selected=self.op.output_fields)
1381
1382   def ExpandNames(self):
1383     self.share_locks = ShareAll()
1384
1385     if self.op.nodes:
1386       self.needed_locks = {
1387         locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
1388         }
1389     else:
1390       self.needed_locks = {
1391         locking.LEVEL_NODE: locking.ALL_SET,
1392         locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1393         }
1394
1395   def CheckPrereq(self):
1396     """Check prerequisites.
1397
1398     """
1399     CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1400
1401   def Exec(self, feedback_fn):
1402     """Computes the list of nodes and their attributes.
1403
1404     """
1405     self.node_uuids = self.owned_locks(locking.LEVEL_NODE)
1406
1407     # Always get name to sort by
1408     if constants.SF_NAME in self.op.output_fields:
1409       fields = self.op.output_fields[:]
1410     else:
1411       fields = [constants.SF_NAME] + self.op.output_fields
1412
1413     # Never ask for node or type as it's only known to the LU
1414     for extra in [constants.SF_NODE, constants.SF_TYPE]:
1415       while extra in fields:
1416         fields.remove(extra)
1417
1418     field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
1419     name_idx = field_idx[constants.SF_NAME]
1420
1421     st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1422     data = self.rpc.call_storage_list(self.node_uuids,
1423                                       self.op.storage_type, st_args,
1424                                       self.op.name, fields)
1425
1426     result = []
1427
1428     for node_uuid in utils.NiceSort(self.node_uuids):
1429       node_name = self.cfg.GetNodeName(node_uuid)
1430       nresult = data[node_uuid]
1431       if nresult.offline:
1432         continue
1433
1434       msg = nresult.fail_msg
1435       if msg:
1436         self.LogWarning("Can't get storage data from node %s: %s",
1437                         node_name, msg)
1438         continue
1439
1440       rows = dict([(row[name_idx], row) for row in nresult.payload])
1441
1442       for name in utils.NiceSort(rows.keys()):
1443         row = rows[name]
1444
1445         out = []
1446
1447         for field in self.op.output_fields:
1448           if field == constants.SF_NODE:
1449             val = node_name
1450           elif field == constants.SF_TYPE:
1451             val = self.op.storage_type
1452           elif field in field_idx:
1453             val = row[field_idx[field]]
1454           else:
1455             raise errors.ParameterError(field)
1456
1457           out.append(val)
1458
1459         result.append(out)
1460
1461     return result
1462
1463
1464 class LUNodeRemove(LogicalUnit):
1465   """Logical unit for removing a node.
1466
1467   """
1468   HPATH = "node-remove"
1469   HTYPE = constants.HTYPE_NODE
1470
1471   def BuildHooksEnv(self):
1472     """Build hooks env.
1473
1474     """
1475     return {
1476       "OP_TARGET": self.op.node_name,
1477       "NODE_NAME": self.op.node_name,
1478       }
1479
1480   def BuildHooksNodes(self):
1481     """Build hooks nodes.
1482
1483     This doesn't run on the target node in the pre phase as a failed
1484     node would then be impossible to remove.
1485
1486     """
1487     all_nodes = self.cfg.GetNodeList()
1488     try:
1489       all_nodes.remove(self.op.node_uuid)
1490     except ValueError:
1491       pass
1492     return (all_nodes, all_nodes)
1493
1494   def CheckPrereq(self):
1495     """Check prerequisites.
1496
1497     This checks:
1498      - the node exists in the configuration
1499      - it does not have primary or secondary instances
1500      - it's not the master
1501
1502     Any errors are signaled by raising errors.OpPrereqError.
1503
1504     """
1505     (self.op.node_uuid, self.op.node_name) = \
1506       ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1507     node = self.cfg.GetNodeInfo(self.op.node_uuid)
1508     assert node is not None
1509
1510     masternode = self.cfg.GetMasterNode()
1511     if node.uuid == masternode:
1512       raise errors.OpPrereqError("Node is the master node, failover to another"
1513                                  " node is required", errors.ECODE_INVAL)
1514
1515     for _, instance in self.cfg.GetAllInstancesInfo().items():
1516       if node.uuid in instance.all_nodes:
1517         raise errors.OpPrereqError("Instance %s is still running on the node,"
1518                                    " please remove first" % instance.name,
1519                                    errors.ECODE_INVAL)
1520     self.op.node_name = node.name
1521     self.node = node
1522
1523   def Exec(self, feedback_fn):
1524     """Removes the node from the cluster.
1525
1526     """
1527     logging.info("Stopping the node daemon and removing configs from node %s",
1528                  self.node.name)
1529
1530     modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
1531
1532     assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
1533       "Not owning BGL"
1534
1535     # Promote nodes to master candidate as needed
1536     AdjustCandidatePool(self, exceptions=[self.node.uuid])
1537     self.context.RemoveNode(self.node)
1538
1539     # Run post hooks on the node before it's removed
1540     RunPostHook(self, self.node.name)
1541
1542     # we have to call this by name rather than by UUID, as the node is no longer
1543     # in the config
1544     result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup)
1545     msg = result.fail_msg
1546     if msg:
1547       self.LogWarning("Errors encountered on the remote node while leaving"
1548                       " the cluster: %s", msg)
1549
1550     # Remove node from our /etc/hosts
1551     if self.cfg.GetClusterInfo().modify_etc_hosts:
1552       master_node_uuid = self.cfg.GetMasterNode()
1553       result = self.rpc.call_etc_hosts_modify(master_node_uuid,
1554                                               constants.ETC_HOSTS_REMOVE,
1555                                               self.node.name, None)
1556       result.Raise("Can't update hosts file with new host data")
1557       RedistributeAncillaryFiles(self)
1558
1559
1560 class LURepairNodeStorage(NoHooksLU):
1561   """Repairs the volume group on a node.
1562
1563   """
1564   REQ_BGL = False
1565
1566   def CheckArguments(self):
1567     (self.op.node_uuid, self.op.node_name) = \
1568       ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1569
1570     storage_type = self.op.storage_type
1571
1572     if (constants.SO_FIX_CONSISTENCY not in
1573         constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
1574       raise errors.OpPrereqError("Storage units of type '%s' can not be"
1575                                  " repaired" % storage_type,
1576                                  errors.ECODE_INVAL)
1577
1578   def ExpandNames(self):
1579     self.needed_locks = {
1580       locking.LEVEL_NODE: [self.op.node_uuid],
1581       }
1582
1583   def _CheckFaultyDisks(self, instance, node_uuid):
1584     """Ensure faulty disks abort the opcode or at least warn."""
1585     try:
1586       if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
1587                                  node_uuid, True):
1588         raise errors.OpPrereqError("Instance '%s' has faulty disks on"
1589                                    " node '%s'" %
1590                                    (instance.name,
1591                                     self.cfg.GetNodeName(node_uuid)),
1592                                    errors.ECODE_STATE)
1593     except errors.OpPrereqError, err:
1594       if self.op.ignore_consistency:
1595         self.LogWarning(str(err.args[0]))
1596       else:
1597         raise
1598
1599   def CheckPrereq(self):
1600     """Check prerequisites.
1601
1602     """
1603     CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1604
1605     # Check whether any instance on this node has faulty disks
1606     for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
1607       if not inst.disks_active:
1608         continue
1609       check_nodes = set(inst.all_nodes)
1610       check_nodes.discard(self.op.node_uuid)
1611       for inst_node_uuid in check_nodes:
1612         self._CheckFaultyDisks(inst, inst_node_uuid)
1613
1614   def Exec(self, feedback_fn):
1615     feedback_fn("Repairing storage unit '%s' on %s ..." %
1616                 (self.op.name, self.op.node_name))
1617
1618     st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1619     result = self.rpc.call_storage_execute(self.op.node_uuid,
1620                                            self.op.storage_type, st_args,
1621                                            self.op.name,
1622                                            constants.SO_FIX_CONSISTENCY)
1623     result.Raise("Failed to repair storage unit '%s' on %s" %
1624                  (self.op.name, self.op.node_name))