iallocator: prepare RPC call 'node_info'
[ganeti-local] / lib / cmdlib / node.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with nodes."""
23
24 import logging
25 import operator
26
27 from ganeti import constants
28 from ganeti import errors
29 from ganeti import locking
30 from ganeti import netutils
31 from ganeti import objects
32 from ganeti import opcodes
33 from ganeti import qlang
34 from ganeti import query
35 from ganeti import rpc
36 from ganeti import utils
37 from ganeti.masterd import iallocator
38
39 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, QueryBase, \
40   ResultWithJobs
41 from ganeti.cmdlib.common import CheckParamsNotGlobal, \
42   MergeAndVerifyHvState, MergeAndVerifyDiskState, \
43   IsExclusiveStorageEnabledNode, CheckNodePVs, \
44   RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
45   CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
46   AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
47   GetWantedNodes, MapInstanceDisksToNodes, RunPostHook, \
48   FindFaultyInstanceDisks
49
50
51 def _DecideSelfPromotion(lu, exceptions=None):
52   """Decide whether I should promote myself as a master candidate.
53
54   """
55   cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
56   mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
57   # the new node will increase mc_max with one, so:
58   mc_should = min(mc_should + 1, cp_size)
59   return mc_now < mc_should
60
61
62 def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
63   """Ensure that a node has the given secondary ip.
64
65   @type lu: L{LogicalUnit}
66   @param lu: the LU on behalf of which we make the check
67   @type node: L{objects.Node}
68   @param node: the node to check
69   @type secondary_ip: string
70   @param secondary_ip: the ip to check
71   @type prereq: boolean
72   @param prereq: whether to throw a prerequisite or an execute error
73   @raise errors.OpPrereqError: if the node doesn't have the ip,
74   and prereq=True
75   @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
76
77   """
78   # this can be called with a new node, which has no UUID yet, so perform the
79   # RPC call using its name
80   result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
81   result.Raise("Failure checking secondary ip on node %s" % node.name,
82                prereq=prereq, ecode=errors.ECODE_ENVIRON)
83   if not result.payload:
84     msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
85            " please fix and re-run this command" % secondary_ip)
86     if prereq:
87       raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
88     else:
89       raise errors.OpExecError(msg)
90
91
92 class LUNodeAdd(LogicalUnit):
93   """Logical unit for adding node to the cluster.
94
95   """
96   HPATH = "node-add"
97   HTYPE = constants.HTYPE_NODE
98   _NFLAGS = ["master_capable", "vm_capable"]
99
100   def CheckArguments(self):
101     self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
102     # validate/normalize the node name
103     self.hostname = netutils.GetHostname(name=self.op.node_name,
104                                          family=self.primary_ip_family)
105     self.op.node_name = self.hostname.name
106
107     if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName():
108       raise errors.OpPrereqError("Cannot readd the master node",
109                                  errors.ECODE_STATE)
110
111     if self.op.readd and self.op.group:
112       raise errors.OpPrereqError("Cannot pass a node group when a node is"
113                                  " being readded", errors.ECODE_INVAL)
114
115   def BuildHooksEnv(self):
116     """Build hooks env.
117
118     This will run on all nodes before, and on all nodes + the new node after.
119
120     """
121     return {
122       "OP_TARGET": self.op.node_name,
123       "NODE_NAME": self.op.node_name,
124       "NODE_PIP": self.op.primary_ip,
125       "NODE_SIP": self.op.secondary_ip,
126       "MASTER_CAPABLE": str(self.op.master_capable),
127       "VM_CAPABLE": str(self.op.vm_capable),
128       }
129
130   def BuildHooksNodes(self):
131     """Build hooks nodes.
132
133     """
134     hook_nodes = self.cfg.GetNodeList()
135     new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
136     if new_node_info is not None:
137       # Exclude added node
138       hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))
139
140     # add the new node as post hook node by name; it does not have an UUID yet
141     return (hook_nodes, hook_nodes, [self.op.node_name, ])
142
143   def CheckPrereq(self):
144     """Check prerequisites.
145
146     This checks:
147      - the new node is not already in the config
148      - it is resolvable
149      - its parameters (single/dual homed) matches the cluster
150
151     Any errors are signaled by raising errors.OpPrereqError.
152
153     """
154     node_name = self.hostname.name
155     self.op.primary_ip = self.hostname.ip
156     if self.op.secondary_ip is None:
157       if self.primary_ip_family == netutils.IP6Address.family:
158         raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
159                                    " IPv4 address must be given as secondary",
160                                    errors.ECODE_INVAL)
161       self.op.secondary_ip = self.op.primary_ip
162
163     secondary_ip = self.op.secondary_ip
164     if not netutils.IP4Address.IsValid(secondary_ip):
165       raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
166                                  " address" % secondary_ip, errors.ECODE_INVAL)
167
168     existing_node_info = self.cfg.GetNodeInfoByName(node_name)
169     if not self.op.readd and existing_node_info is not None:
170       raise errors.OpPrereqError("Node %s is already in the configuration" %
171                                  node_name, errors.ECODE_EXISTS)
172     elif self.op.readd and existing_node_info is None:
173       raise errors.OpPrereqError("Node %s is not in the configuration" %
174                                  node_name, errors.ECODE_NOENT)
175
176     self.changed_primary_ip = False
177
178     for existing_node in self.cfg.GetAllNodesInfo().values():
179       if self.op.readd and node_name == existing_node.name:
180         if existing_node.secondary_ip != secondary_ip:
181           raise errors.OpPrereqError("Readded node doesn't have the same IP"
182                                      " address configuration as before",
183                                      errors.ECODE_INVAL)
184         if existing_node.primary_ip != self.op.primary_ip:
185           self.changed_primary_ip = True
186
187         continue
188
189       if (existing_node.primary_ip == self.op.primary_ip or
190           existing_node.secondary_ip == self.op.primary_ip or
191           existing_node.primary_ip == secondary_ip or
192           existing_node.secondary_ip == secondary_ip):
193         raise errors.OpPrereqError("New node ip address(es) conflict with"
194                                    " existing node %s" % existing_node.name,
195                                    errors.ECODE_NOTUNIQUE)
196
197     # After this 'if' block, None is no longer a valid value for the
198     # _capable op attributes
199     if self.op.readd:
200       assert existing_node_info is not None, \
201         "Can't retrieve locked node %s" % node_name
202       for attr in self._NFLAGS:
203         if getattr(self.op, attr) is None:
204           setattr(self.op, attr, getattr(existing_node_info, attr))
205     else:
206       for attr in self._NFLAGS:
207         if getattr(self.op, attr) is None:
208           setattr(self.op, attr, True)
209
210     if self.op.readd and not self.op.vm_capable:
211       pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
212       if pri or sec:
213         raise errors.OpPrereqError("Node %s being re-added with vm_capable"
214                                    " flag set to false, but it already holds"
215                                    " instances" % node_name,
216                                    errors.ECODE_STATE)
217
218     # check that the type of the node (single versus dual homed) is the
219     # same as for the master
220     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
221     master_singlehomed = myself.secondary_ip == myself.primary_ip
222     newbie_singlehomed = secondary_ip == self.op.primary_ip
223     if master_singlehomed != newbie_singlehomed:
224       if master_singlehomed:
225         raise errors.OpPrereqError("The master has no secondary ip but the"
226                                    " new node has one",
227                                    errors.ECODE_INVAL)
228       else:
229         raise errors.OpPrereqError("The master has a secondary ip but the"
230                                    " new node doesn't have one",
231                                    errors.ECODE_INVAL)
232
233     # checks reachability
234     if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
235       raise errors.OpPrereqError("Node not reachable by ping",
236                                  errors.ECODE_ENVIRON)
237
238     if not newbie_singlehomed:
239       # check reachability from my secondary ip to newbie's secondary ip
240       if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
241                               source=myself.secondary_ip):
242         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
243                                    " based ping to node daemon port",
244                                    errors.ECODE_ENVIRON)
245
246     if self.op.readd:
247       exceptions = [existing_node_info.uuid]
248     else:
249       exceptions = []
250
251     if self.op.master_capable:
252       self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
253     else:
254       self.master_candidate = False
255
256     if self.op.readd:
257       self.new_node = existing_node_info
258     else:
259       node_group = self.cfg.LookupNodeGroup(self.op.group)
260       self.new_node = objects.Node(name=node_name,
261                                    primary_ip=self.op.primary_ip,
262                                    secondary_ip=secondary_ip,
263                                    master_candidate=self.master_candidate,
264                                    offline=False, drained=False,
265                                    group=node_group, ndparams={})
266
267     if self.op.ndparams:
268       utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
269       CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
270                            "node", "cluster or group")
271
272     if self.op.hv_state:
273       self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)
274
275     if self.op.disk_state:
276       self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)
277
278     # TODO: If we need to have multiple DnsOnlyRunner we probably should make
279     #       it a property on the base class.
280     rpcrunner = rpc.DnsOnlyRunner()
281     result = rpcrunner.call_version([node_name])[node_name]
282     result.Raise("Can't get version information from node %s" % node_name)
283     if constants.PROTOCOL_VERSION == result.payload:
284       logging.info("Communication to node %s fine, sw version %s match",
285                    node_name, result.payload)
286     else:
287       raise errors.OpPrereqError("Version mismatch master version %s,"
288                                  " node version %s" %
289                                  (constants.PROTOCOL_VERSION, result.payload),
290                                  errors.ECODE_ENVIRON)
291
292     vg_name = self.cfg.GetVGName()
293     if vg_name is not None:
294       vparams = {constants.NV_PVLIST: [vg_name]}
295       excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
296       cname = self.cfg.GetClusterName()
297       result = rpcrunner.call_node_verify_light(
298           [node_name], vparams, cname,
299           self.cfg.GetClusterInfo().hvparams)[node_name]
300       (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
301       if errmsgs:
302         raise errors.OpPrereqError("Checks on node PVs failed: %s" %
303                                    "; ".join(errmsgs), errors.ECODE_ENVIRON)
304
305   def Exec(self, feedback_fn):
306     """Adds the new node to the cluster.
307
308     """
309     assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
310       "Not owning BGL"
311
312     # We adding a new node so we assume it's powered
313     self.new_node.powered = True
314
315     # for re-adds, reset the offline/drained/master-candidate flags;
316     # we need to reset here, otherwise offline would prevent RPC calls
317     # later in the procedure; this also means that if the re-add
318     # fails, we are left with a non-offlined, broken node
319     if self.op.readd:
320       self.new_node.drained = False
321       self.LogInfo("Readding a node, the offline/drained flags were reset")
322       # if we demote the node, we do cleanup later in the procedure
323       self.new_node.master_candidate = self.master_candidate
324       if self.changed_primary_ip:
325         self.new_node.primary_ip = self.op.primary_ip
326
327     # copy the master/vm_capable flags
328     for attr in self._NFLAGS:
329       setattr(self.new_node, attr, getattr(self.op, attr))
330
331     # notify the user about any possible mc promotion
332     if self.new_node.master_candidate:
333       self.LogInfo("Node will be a master candidate")
334
335     if self.op.ndparams:
336       self.new_node.ndparams = self.op.ndparams
337     else:
338       self.new_node.ndparams = {}
339
340     if self.op.hv_state:
341       self.new_node.hv_state_static = self.new_hv_state
342
343     if self.op.disk_state:
344       self.new_node.disk_state_static = self.new_disk_state
345
346     # Add node to our /etc/hosts, and add key to known_hosts
347     if self.cfg.GetClusterInfo().modify_etc_hosts:
348       master_node = self.cfg.GetMasterNode()
349       result = self.rpc.call_etc_hosts_modify(
350                  master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
351                  self.hostname.ip)
352       result.Raise("Can't update hosts file with new host data")
353
354     if self.new_node.secondary_ip != self.new_node.primary_ip:
355       _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
356                                False)
357
358     node_verifier_uuids = [self.cfg.GetMasterNode()]
359     node_verify_param = {
360       constants.NV_NODELIST: ([self.new_node.name], {}),
361       # TODO: do a node-net-test as well?
362     }
363
364     result = self.rpc.call_node_verify(
365                node_verifier_uuids, node_verify_param,
366                self.cfg.GetClusterName(),
367                self.cfg.GetClusterInfo().hvparams)
368     for verifier in node_verifier_uuids:
369       result[verifier].Raise("Cannot communicate with node %s" % verifier)
370       nl_payload = result[verifier].payload[constants.NV_NODELIST]
371       if nl_payload:
372         for failed in nl_payload:
373           feedback_fn("ssh/hostname verification failed"
374                       " (checking from %s): %s" %
375                       (verifier, nl_payload[failed]))
376         raise errors.OpExecError("ssh/hostname verification failed")
377
378     if self.op.readd:
379       self.context.ReaddNode(self.new_node)
380       RedistributeAncillaryFiles(self)
381       # make sure we redistribute the config
382       self.cfg.Update(self.new_node, feedback_fn)
383       # and make sure the new node will not have old files around
384       if not self.new_node.master_candidate:
385         result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
386         result.Warn("Node failed to demote itself from master candidate status",
387                     self.LogWarning)
388     else:
389       self.context.AddNode(self.new_node, self.proc.GetECId())
390       RedistributeAncillaryFiles(self)
391
392
393 class LUNodeSetParams(LogicalUnit):
394   """Modifies the parameters of a node.
395
396   @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
397       to the node role (as _ROLE_*)
398   @cvar _R2F: a dictionary from node role to tuples of flags
399   @cvar _FLAGS: a list of attribute names corresponding to the flags
400
401   """
402   HPATH = "node-modify"
403   HTYPE = constants.HTYPE_NODE
404   REQ_BGL = False
405   (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
406   _F2R = {
407     (True, False, False): _ROLE_CANDIDATE,
408     (False, True, False): _ROLE_DRAINED,
409     (False, False, True): _ROLE_OFFLINE,
410     (False, False, False): _ROLE_REGULAR,
411     }
412   _R2F = dict((v, k) for k, v in _F2R.items())
413   _FLAGS = ["master_candidate", "drained", "offline"]
414
415   def CheckArguments(self):
416     (self.op.node_uuid, self.op.node_name) = \
417       ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
418     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
419                 self.op.master_capable, self.op.vm_capable,
420                 self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
421                 self.op.disk_state]
422     if all_mods.count(None) == len(all_mods):
423       raise errors.OpPrereqError("Please pass at least one modification",
424                                  errors.ECODE_INVAL)
425     if all_mods.count(True) > 1:
426       raise errors.OpPrereqError("Can't set the node into more than one"
427                                  " state at the same time",
428                                  errors.ECODE_INVAL)
429
430     # Boolean value that tells us whether we might be demoting from MC
431     self.might_demote = (self.op.master_candidate is False or
432                          self.op.offline is True or
433                          self.op.drained is True or
434                          self.op.master_capable is False)
435
436     if self.op.secondary_ip:
437       if not netutils.IP4Address.IsValid(self.op.secondary_ip):
438         raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
439                                    " address" % self.op.secondary_ip,
440                                    errors.ECODE_INVAL)
441
442     self.lock_all = self.op.auto_promote and self.might_demote
443     self.lock_instances = self.op.secondary_ip is not None
444
445   def _InstanceFilter(self, instance):
446     """Filter for getting affected instances.
447
448     """
449     return (instance.disk_template in constants.DTS_INT_MIRROR and
450             self.op.node_uuid in instance.all_nodes)
451
452   def ExpandNames(self):
453     if self.lock_all:
454       self.needed_locks = {
455         locking.LEVEL_NODE: locking.ALL_SET,
456
457         # Block allocations when all nodes are locked
458         locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
459         }
460     else:
461       self.needed_locks = {
462         locking.LEVEL_NODE: self.op.node_uuid,
463         }
464
465     # Since modifying a node can have severe effects on currently running
466     # operations the resource lock is at least acquired in shared mode
467     self.needed_locks[locking.LEVEL_NODE_RES] = \
468       self.needed_locks[locking.LEVEL_NODE]
469
470     # Get all locks except nodes in shared mode; they are not used for anything
471     # but read-only access
472     self.share_locks = ShareAll()
473     self.share_locks[locking.LEVEL_NODE] = 0
474     self.share_locks[locking.LEVEL_NODE_RES] = 0
475     self.share_locks[locking.LEVEL_NODE_ALLOC] = 0
476
477     if self.lock_instances:
478       self.needed_locks[locking.LEVEL_INSTANCE] = \
479         self.cfg.GetInstanceNames(
480           self.cfg.GetInstancesInfoByFilter(self._InstanceFilter).keys())
481
482   def BuildHooksEnv(self):
483     """Build hooks env.
484
485     This runs on the master node.
486
487     """
488     return {
489       "OP_TARGET": self.op.node_name,
490       "MASTER_CANDIDATE": str(self.op.master_candidate),
491       "OFFLINE": str(self.op.offline),
492       "DRAINED": str(self.op.drained),
493       "MASTER_CAPABLE": str(self.op.master_capable),
494       "VM_CAPABLE": str(self.op.vm_capable),
495       }
496
497   def BuildHooksNodes(self):
498     """Build hooks nodes.
499
500     """
501     nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
502     return (nl, nl)
503
504   def CheckPrereq(self):
505     """Check prerequisites.
506
507     This only checks the instance list against the existing names.
508
509     """
510     node = self.cfg.GetNodeInfo(self.op.node_uuid)
511     if self.lock_instances:
512       affected_instances = \
513         self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)
514
515       # Verify instance locks
516       owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
517       wanted_instance_names = frozenset([inst.name for inst in
518                                          affected_instances.values()])
519       if wanted_instance_names - owned_instance_names:
520         raise errors.OpPrereqError("Instances affected by changing node %s's"
521                                    " secondary IP address have changed since"
522                                    " locks were acquired, wanted '%s', have"
523                                    " '%s'; retry the operation" %
524                                    (node.name,
525                                     utils.CommaJoin(wanted_instance_names),
526                                     utils.CommaJoin(owned_instance_names)),
527                                    errors.ECODE_STATE)
528     else:
529       affected_instances = None
530
531     if (self.op.master_candidate is not None or
532         self.op.drained is not None or
533         self.op.offline is not None):
534       # we can't change the master's node flags
535       if node.uuid == self.cfg.GetMasterNode():
536         raise errors.OpPrereqError("The master role can be changed"
537                                    " only via master-failover",
538                                    errors.ECODE_INVAL)
539
540     if self.op.master_candidate and not node.master_capable:
541       raise errors.OpPrereqError("Node %s is not master capable, cannot make"
542                                  " it a master candidate" % node.name,
543                                  errors.ECODE_STATE)
544
545     if self.op.vm_capable is False:
546       (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
547       if ipri or isec:
548         raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
549                                    " the vm_capable flag" % node.name,
550                                    errors.ECODE_STATE)
551
552     if node.master_candidate and self.might_demote and not self.lock_all:
553       assert not self.op.auto_promote, "auto_promote set but lock_all not"
554       # check if after removing the current node, we're missing master
555       # candidates
556       (mc_remaining, mc_should, _) = \
557           self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
558       if mc_remaining < mc_should:
559         raise errors.OpPrereqError("Not enough master candidates, please"
560                                    " pass auto promote option to allow"
561                                    " promotion (--auto-promote or RAPI"
562                                    " auto_promote=True)", errors.ECODE_STATE)
563
564     self.old_flags = old_flags = (node.master_candidate,
565                                   node.drained, node.offline)
566     assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
567     self.old_role = old_role = self._F2R[old_flags]
568
569     # Check for ineffective changes
570     for attr in self._FLAGS:
571       if getattr(self.op, attr) is False and getattr(node, attr) is False:
572         self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
573         setattr(self.op, attr, None)
574
575     # Past this point, any flag change to False means a transition
576     # away from the respective state, as only real changes are kept
577
578     # TODO: We might query the real power state if it supports OOB
579     if SupportsOob(self.cfg, node):
580       if self.op.offline is False and not (node.powered or
581                                            self.op.powered is True):
582         raise errors.OpPrereqError(("Node %s needs to be turned on before its"
583                                     " offline status can be reset") %
584                                    self.op.node_name, errors.ECODE_STATE)
585     elif self.op.powered is not None:
586       raise errors.OpPrereqError(("Unable to change powered state for node %s"
587                                   " as it does not support out-of-band"
588                                   " handling") % self.op.node_name,
589                                  errors.ECODE_STATE)
590
591     # If we're being deofflined/drained, we'll MC ourself if needed
592     if (self.op.drained is False or self.op.offline is False or
593         (self.op.master_capable and not node.master_capable)):
594       if _DecideSelfPromotion(self):
595         self.op.master_candidate = True
596         self.LogInfo("Auto-promoting node to master candidate")
597
598     # If we're no longer master capable, we'll demote ourselves from MC
599     if self.op.master_capable is False and node.master_candidate:
600       self.LogInfo("Demoting from master candidate")
601       self.op.master_candidate = False
602
603     # Compute new role
604     assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
605     if self.op.master_candidate:
606       new_role = self._ROLE_CANDIDATE
607     elif self.op.drained:
608       new_role = self._ROLE_DRAINED
609     elif self.op.offline:
610       new_role = self._ROLE_OFFLINE
611     elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
612       # False is still in new flags, which means we're un-setting (the
613       # only) True flag
614       new_role = self._ROLE_REGULAR
615     else: # no new flags, nothing, keep old role
616       new_role = old_role
617
618     self.new_role = new_role
619
620     if old_role == self._ROLE_OFFLINE and new_role != old_role:
621       # Trying to transition out of offline status
622       result = self.rpc.call_version([node.uuid])[node.uuid]
623       if result.fail_msg:
624         raise errors.OpPrereqError("Node %s is being de-offlined but fails"
625                                    " to report its version: %s" %
626                                    (node.name, result.fail_msg),
627                                    errors.ECODE_STATE)
628       else:
629         self.LogWarning("Transitioning node from offline to online state"
630                         " without using re-add. Please make sure the node"
631                         " is healthy!")
632
633     # When changing the secondary ip, verify if this is a single-homed to
634     # multi-homed transition or vice versa, and apply the relevant
635     # restrictions.
636     if self.op.secondary_ip:
637       # Ok even without locking, because this can't be changed by any LU
638       master = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
639       master_singlehomed = master.secondary_ip == master.primary_ip
640       if master_singlehomed and self.op.secondary_ip != node.primary_ip:
641         if self.op.force and node.uuid == master.uuid:
642           self.LogWarning("Transitioning from single-homed to multi-homed"
643                           " cluster; all nodes will require a secondary IP"
644                           " address")
645         else:
646           raise errors.OpPrereqError("Changing the secondary ip on a"
647                                      " single-homed cluster requires the"
648                                      " --force option to be passed, and the"
649                                      " target node to be the master",
650                                      errors.ECODE_INVAL)
651       elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
652         if self.op.force and node.uuid == master.uuid:
653           self.LogWarning("Transitioning from multi-homed to single-homed"
654                           " cluster; secondary IP addresses will have to be"
655                           " removed")
656         else:
657           raise errors.OpPrereqError("Cannot set the secondary IP to be the"
658                                      " same as the primary IP on a multi-homed"
659                                      " cluster, unless the --force option is"
660                                      " passed, and the target node is the"
661                                      " master", errors.ECODE_INVAL)
662
663       assert not (set([inst.name for inst in affected_instances.values()]) -
664                   self.owned_locks(locking.LEVEL_INSTANCE))
665
666       if node.offline:
667         if affected_instances:
668           msg = ("Cannot change secondary IP address: offline node has"
669                  " instances (%s) configured to use it" %
670                  utils.CommaJoin(
671                    [inst.name for inst in affected_instances.values()]))
672           raise errors.OpPrereqError(msg, errors.ECODE_STATE)
673       else:
674         # On online nodes, check that no instances are running, and that
675         # the node has the new ip and we can reach it.
676         for instance in affected_instances.values():
677           CheckInstanceState(self, instance, INSTANCE_DOWN,
678                              msg="cannot change secondary ip")
679
680         _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
681         if master.uuid != node.uuid:
682           # check reachability from master secondary ip to new secondary ip
683           if not netutils.TcpPing(self.op.secondary_ip,
684                                   constants.DEFAULT_NODED_PORT,
685                                   source=master.secondary_ip):
686             raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
687                                        " based ping to node daemon port",
688                                        errors.ECODE_ENVIRON)
689
690     if self.op.ndparams:
691       new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
692       utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
693       CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
694                            "node", "cluster or group")
695       self.new_ndparams = new_ndparams
696
697     if self.op.hv_state:
698       self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
699                                                 node.hv_state_static)
700
701     if self.op.disk_state:
702       self.new_disk_state = \
703         MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)
704
705   def Exec(self, feedback_fn):
706     """Modifies a node.
707
708     """
709     node = self.cfg.GetNodeInfo(self.op.node_uuid)
710     result = []
711
712     if self.op.ndparams:
713       node.ndparams = self.new_ndparams
714
715     if self.op.powered is not None:
716       node.powered = self.op.powered
717
718     if self.op.hv_state:
719       node.hv_state_static = self.new_hv_state
720
721     if self.op.disk_state:
722       node.disk_state_static = self.new_disk_state
723
724     for attr in ["master_capable", "vm_capable"]:
725       val = getattr(self.op, attr)
726       if val is not None:
727         setattr(node, attr, val)
728         result.append((attr, str(val)))
729
730     if self.new_role != self.old_role:
731       # Tell the node to demote itself, if no longer MC and not offline
732       if self.old_role == self._ROLE_CANDIDATE and \
733           self.new_role != self._ROLE_OFFLINE:
734         msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
735         if msg:
736           self.LogWarning("Node failed to demote itself: %s", msg)
737
738       new_flags = self._R2F[self.new_role]
739       for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
740         if of != nf:
741           result.append((desc, str(nf)))
742       (node.master_candidate, node.drained, node.offline) = new_flags
743
744       # we locked all nodes, we adjust the CP before updating this node
745       if self.lock_all:
746         AdjustCandidatePool(self, [node.uuid])
747
748     if self.op.secondary_ip:
749       node.secondary_ip = self.op.secondary_ip
750       result.append(("secondary_ip", self.op.secondary_ip))
751
752     # this will trigger configuration file update, if needed
753     self.cfg.Update(node, feedback_fn)
754
755     # this will trigger job queue propagation or cleanup if the mc
756     # flag changed
757     if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
758       self.context.ReaddNode(node)
759
760     return result
761
762
763 class LUNodePowercycle(NoHooksLU):
764   """Powercycles a node.
765
766   """
767   REQ_BGL = False
768
769   def CheckArguments(self):
770     (self.op.node_uuid, self.op.node_name) = \
771       ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
772
773     if self.op.node_uuid == self.cfg.GetMasterNode() and not self.op.force:
774       raise errors.OpPrereqError("The node is the master and the force"
775                                  " parameter was not set",
776                                  errors.ECODE_INVAL)
777
778   def ExpandNames(self):
779     """Locking for PowercycleNode.
780
781     This is a last-resort option and shouldn't block on other
782     jobs. Therefore, we grab no locks.
783
784     """
785     self.needed_locks = {}
786
787   def Exec(self, feedback_fn):
788     """Reboots a node.
789
790     """
791     default_hypervisor = self.cfg.GetHypervisorType()
792     hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
793     result = self.rpc.call_node_powercycle(self.op.node_uuid,
794                                            default_hypervisor,
795                                            hvparams)
796     result.Raise("Failed to schedule the reboot")
797     return result.payload
798
799
800 def _GetNodeInstancesInner(cfg, fn):
801   return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
802
803
804 def _GetNodePrimaryInstances(cfg, node_uuid):
805   """Returns primary instances on a node.
806
807   """
808   return _GetNodeInstancesInner(cfg,
809                                 lambda inst: node_uuid == inst.primary_node)
810
811
812 def _GetNodeSecondaryInstances(cfg, node_uuid):
813   """Returns secondary instances on a node.
814
815   """
816   return _GetNodeInstancesInner(cfg,
817                                 lambda inst: node_uuid in inst.secondary_nodes)
818
819
820 def _GetNodeInstances(cfg, node_uuid):
821   """Returns a list of all primary and secondary instances on a node.
822
823   """
824
825   return _GetNodeInstancesInner(cfg, lambda inst: node_uuid in inst.all_nodes)
826
827
828 class LUNodeEvacuate(NoHooksLU):
829   """Evacuates instances off a list of nodes.
830
831   """
832   REQ_BGL = False
833
834   _MODE2IALLOCATOR = {
835     constants.NODE_EVAC_PRI: constants.IALLOCATOR_NEVAC_PRI,
836     constants.NODE_EVAC_SEC: constants.IALLOCATOR_NEVAC_SEC,
837     constants.NODE_EVAC_ALL: constants.IALLOCATOR_NEVAC_ALL,
838     }
839   assert frozenset(_MODE2IALLOCATOR.keys()) == constants.NODE_EVAC_MODES
840   assert (frozenset(_MODE2IALLOCATOR.values()) ==
841           constants.IALLOCATOR_NEVAC_MODES)
842
843   def CheckArguments(self):
844     CheckIAllocatorOrNode(self, "iallocator", "remote_node")
845
846   def ExpandNames(self):
847     (self.op.node_uuid, self.op.node_name) = \
848       ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
849
850     if self.op.remote_node is not None:
851       (self.op.remote_node_uuid, self.op.remote_node) = \
852         ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
853                               self.op.remote_node)
854       assert self.op.remote_node
855
856       if self.op.node_uuid == self.op.remote_node_uuid:
857         raise errors.OpPrereqError("Can not use evacuated node as a new"
858                                    " secondary node", errors.ECODE_INVAL)
859
860       if self.op.mode != constants.NODE_EVAC_SEC:
861         raise errors.OpPrereqError("Without the use of an iallocator only"
862                                    " secondary instances can be evacuated",
863                                    errors.ECODE_INVAL)
864
865     # Declare locks
866     self.share_locks = ShareAll()
867     self.needed_locks = {
868       locking.LEVEL_INSTANCE: [],
869       locking.LEVEL_NODEGROUP: [],
870       locking.LEVEL_NODE: [],
871       }
872
873     # Determine nodes (via group) optimistically, needs verification once locks
874     # have been acquired
875     self.lock_nodes = self._DetermineNodes()
876
877   def _DetermineNodes(self):
878     """Gets the list of node UUIDs to operate on.
879
880     """
881     if self.op.remote_node is None:
882       # Iallocator will choose any node(s) in the same group
883       group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
884     else:
885       group_nodes = frozenset([self.op.remote_node_uuid])
886
887     # Determine nodes to be locked
888     return set([self.op.node_uuid]) | group_nodes
889
890   def _DetermineInstances(self):
891     """Builds list of instances to operate on.
892
893     """
894     assert self.op.mode in constants.NODE_EVAC_MODES
895
896     if self.op.mode == constants.NODE_EVAC_PRI:
897       # Primary instances only
898       inst_fn = _GetNodePrimaryInstances
899       assert self.op.remote_node is None, \
900         "Evacuating primary instances requires iallocator"
901     elif self.op.mode == constants.NODE_EVAC_SEC:
902       # Secondary instances only
903       inst_fn = _GetNodeSecondaryInstances
904     else:
905       # All instances
906       assert self.op.mode == constants.NODE_EVAC_ALL
907       inst_fn = _GetNodeInstances
908       # TODO: In 2.6, change the iallocator interface to take an evacuation mode
909       # per instance
910       raise errors.OpPrereqError("Due to an issue with the iallocator"
911                                  " interface it is not possible to evacuate"
912                                  " all instances at once; specify explicitly"
913                                  " whether to evacuate primary or secondary"
914                                  " instances",
915                                  errors.ECODE_INVAL)
916
917     return inst_fn(self.cfg, self.op.node_uuid)
918
919   def DeclareLocks(self, level):
920     if level == locking.LEVEL_INSTANCE:
921       # Lock instances optimistically, needs verification once node and group
922       # locks have been acquired
923       self.needed_locks[locking.LEVEL_INSTANCE] = \
924         set(i.name for i in self._DetermineInstances())
925
926     elif level == locking.LEVEL_NODEGROUP:
927       # Lock node groups for all potential target nodes optimistically, needs
928       # verification once nodes have been acquired
929       self.needed_locks[locking.LEVEL_NODEGROUP] = \
930         self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)
931
932     elif level == locking.LEVEL_NODE:
933       self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes
934
935   def CheckPrereq(self):
936     # Verify locks
937     owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
938     owned_nodes = self.owned_locks(locking.LEVEL_NODE)
939     owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
940
941     need_nodes = self._DetermineNodes()
942
943     if not owned_nodes.issuperset(need_nodes):
944       raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
945                                  " locks were acquired, current nodes are"
946                                  " are '%s', used to be '%s'; retry the"
947                                  " operation" %
948                                  (self.op.node_name,
949                                   utils.CommaJoin(need_nodes),
950                                   utils.CommaJoin(owned_nodes)),
951                                  errors.ECODE_STATE)
952
953     wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
954     if owned_groups != wanted_groups:
955       raise errors.OpExecError("Node groups changed since locks were acquired,"
956                                " current groups are '%s', used to be '%s';"
957                                " retry the operation" %
958                                (utils.CommaJoin(wanted_groups),
959                                 utils.CommaJoin(owned_groups)))
960
961     # Determine affected instances
962     self.instances = self._DetermineInstances()
963     self.instance_names = [i.name for i in self.instances]
964
965     if set(self.instance_names) != owned_instance_names:
966       raise errors.OpExecError("Instances on node '%s' changed since locks"
967                                " were acquired, current instances are '%s',"
968                                " used to be '%s'; retry the operation" %
969                                (self.op.node_name,
970                                 utils.CommaJoin(self.instance_names),
971                                 utils.CommaJoin(owned_instance_names)))
972
973     if self.instance_names:
974       self.LogInfo("Evacuating instances from node '%s': %s",
975                    self.op.node_name,
976                    utils.CommaJoin(utils.NiceSort(self.instance_names)))
977     else:
978       self.LogInfo("No instances to evacuate from node '%s'",
979                    self.op.node_name)
980
981     if self.op.remote_node is not None:
982       for i in self.instances:
983         if i.primary_node == self.op.remote_node_uuid:
984           raise errors.OpPrereqError("Node %s is the primary node of"
985                                      " instance %s, cannot use it as"
986                                      " secondary" %
987                                      (self.op.remote_node, i.name),
988                                      errors.ECODE_INVAL)
989
990   def Exec(self, feedback_fn):
991     assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)
992
993     if not self.instance_names:
994       # No instances to evacuate
995       jobs = []
996
997     elif self.op.iallocator is not None:
998       # TODO: Implement relocation to other group
999       evac_mode = self._MODE2IALLOCATOR[self.op.mode]
1000       req = iallocator.IAReqNodeEvac(evac_mode=evac_mode,
1001                                      instances=list(self.instance_names))
1002       ial = iallocator.IAllocator(self.cfg, self.rpc, req)
1003
1004       ial.Run(self.op.iallocator)
1005
1006       if not ial.success:
1007         raise errors.OpPrereqError("Can't compute node evacuation using"
1008                                    " iallocator '%s': %s" %
1009                                    (self.op.iallocator, ial.info),
1010                                    errors.ECODE_NORES)
1011
1012       jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)
1013
1014     elif self.op.remote_node is not None:
1015       assert self.op.mode == constants.NODE_EVAC_SEC
1016       jobs = [
1017         [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
1018                                         remote_node=self.op.remote_node,
1019                                         disks=[],
1020                                         mode=constants.REPLACE_DISK_CHG,
1021                                         early_release=self.op.early_release)]
1022         for instance_name in self.instance_names]
1023
1024     else:
1025       raise errors.ProgrammerError("No iallocator or remote node")
1026
1027     return ResultWithJobs(jobs)
1028
1029
1030 class LUNodeMigrate(LogicalUnit):
1031   """Migrate all instances from a node.
1032
1033   """
1034   HPATH = "node-migrate"
1035   HTYPE = constants.HTYPE_NODE
1036   REQ_BGL = False
1037
1038   def CheckArguments(self):
1039     pass
1040
1041   def ExpandNames(self):
1042     (self.op.node_uuid, self.op.node_name) = \
1043       ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1044
1045     self.share_locks = ShareAll()
1046     self.needed_locks = {
1047       locking.LEVEL_NODE: [self.op.node_uuid],
1048       }
1049
1050   def BuildHooksEnv(self):
1051     """Build hooks env.
1052
1053     This runs on the master, the primary and all the secondaries.
1054
1055     """
1056     return {
1057       "NODE_NAME": self.op.node_name,
1058       "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
1059       }
1060
1061   def BuildHooksNodes(self):
1062     """Build hooks nodes.
1063
1064     """
1065     nl = [self.cfg.GetMasterNode()]
1066     return (nl, nl)
1067
1068   def CheckPrereq(self):
1069     pass
1070
1071   def Exec(self, feedback_fn):
1072     # Prepare jobs for migration instances
1073     jobs = [
1074       [opcodes.OpInstanceMigrate(
1075         instance_name=inst.name,
1076         mode=self.op.mode,
1077         live=self.op.live,
1078         iallocator=self.op.iallocator,
1079         target_node=self.op.target_node,
1080         allow_runtime_changes=self.op.allow_runtime_changes,
1081         ignore_ipolicy=self.op.ignore_ipolicy)]
1082       for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]
1083
1084     # TODO: Run iallocator in this opcode and pass correct placement options to
1085     # OpInstanceMigrate. Since other jobs can modify the cluster between
1086     # running the iallocator and the actual migration, a good consistency model
1087     # will have to be found.
1088
1089     assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
1090             frozenset([self.op.node_uuid]))
1091
1092     return ResultWithJobs(jobs)
1093
1094
1095 def _GetStorageTypeArgs(cfg, storage_type):
1096   """Returns the arguments for a storage type.
1097
1098   """
1099   # Special case for file storage
1100   if storage_type == constants.ST_FILE:
1101     # storage.FileStorage wants a list of storage directories
1102     return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
1103
1104   return []
1105
1106
1107 class LUNodeModifyStorage(NoHooksLU):
1108   """Logical unit for modifying a storage volume on a node.
1109
1110   """
1111   REQ_BGL = False
1112
1113   def CheckArguments(self):
1114     (self.op.node_uuid, self.op.node_name) = \
1115       ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1116
1117     storage_type = self.op.storage_type
1118
1119     try:
1120       modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
1121     except KeyError:
1122       raise errors.OpPrereqError("Storage units of type '%s' can not be"
1123                                  " modified" % storage_type,
1124                                  errors.ECODE_INVAL)
1125
1126     diff = set(self.op.changes.keys()) - modifiable
1127     if diff:
1128       raise errors.OpPrereqError("The following fields can not be modified for"
1129                                  " storage units of type '%s': %r" %
1130                                  (storage_type, list(diff)),
1131                                  errors.ECODE_INVAL)
1132
1133   def ExpandNames(self):
1134     self.needed_locks = {
1135       locking.LEVEL_NODE: self.op.node_uuid,
1136       }
1137
1138   def Exec(self, feedback_fn):
1139     """Computes the list of nodes and their attributes.
1140
1141     """
1142     st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1143     result = self.rpc.call_storage_modify(self.op.node_uuid,
1144                                           self.op.storage_type, st_args,
1145                                           self.op.name, self.op.changes)
1146     result.Raise("Failed to modify storage unit '%s' on %s" %
1147                  (self.op.name, self.op.node_name))
1148
1149
1150 class NodeQuery(QueryBase):
1151   FIELDS = query.NODE_FIELDS
1152
1153   def ExpandNames(self, lu):
1154     lu.needed_locks = {}
1155     lu.share_locks = ShareAll()
1156
1157     if self.names:
1158       (self.wanted, _) = GetWantedNodes(lu, self.names)
1159     else:
1160       self.wanted = locking.ALL_SET
1161
1162     self.do_locking = (self.use_locking and
1163                        query.NQ_LIVE in self.requested_data)
1164
1165     if self.do_locking:
1166       # If any non-static field is requested we need to lock the nodes
1167       lu.needed_locks[locking.LEVEL_NODE] = self.wanted
1168       lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1169
1170   def DeclareLocks(self, lu, level):
1171     pass
1172
1173   def _GetQueryData(self, lu):
1174     """Computes the list of nodes and their attributes.
1175
1176     """
1177     all_info = lu.cfg.GetAllNodesInfo()
1178
1179     node_uuids = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)
1180
1181     # Gather data as requested
1182     if query.NQ_LIVE in self.requested_data:
1183       # filter out non-vm_capable nodes
1184       toquery_node_uuids = [node.uuid for node in all_info.values()
1185                             if node.vm_capable and node.uuid in node_uuids]
1186
1187       es_flags = rpc.GetExclusiveStorageForNodes(lu.cfg, toquery_node_uuids)
1188       # FIXME: This currently maps everything to lvm, this should be more
1189       # flexible
1190       lvm_enabled = utils.storage.IsLvmEnabled(
1191           lu.cfg.GetClusterInfo().enabled_disk_templates)
1192       storage_units = utils.storage.GetStorageUnitsOfCluster(
1193           lu.cfg, include_spindles=True)
1194       default_hypervisor = lu.cfg.GetHypervisorType()
1195       hvparams = lu.cfg.GetClusterInfo().hvparams[default_hypervisor]
1196       hvspecs = [(default_hypervisor, hvparams)]
1197       node_data = lu.rpc.call_node_info(toquery_node_uuids, storage_units,
1198                                         hvspecs, es_flags)
1199       live_data = dict(
1200           (uuid, rpc.MakeLegacyNodeInfo(nresult.payload,
1201                                         require_vg_info=lvm_enabled))
1202           for (uuid, nresult) in node_data.items()
1203           if not nresult.fail_msg and nresult.payload)
1204     else:
1205       live_data = None
1206
1207     if query.NQ_INST in self.requested_data:
1208       node_to_primary = dict([(uuid, set()) for uuid in node_uuids])
1209       node_to_secondary = dict([(uuid, set()) for uuid in node_uuids])
1210
1211       inst_data = lu.cfg.GetAllInstancesInfo()
1212       inst_uuid_to_inst_name = {}
1213
1214       for inst in inst_data.values():
1215         inst_uuid_to_inst_name[inst.uuid] = inst.name
1216         if inst.primary_node in node_to_primary:
1217           node_to_primary[inst.primary_node].add(inst.uuid)
1218         for secnode in inst.secondary_nodes:
1219           if secnode in node_to_secondary:
1220             node_to_secondary[secnode].add(inst.uuid)
1221     else:
1222       node_to_primary = None
1223       node_to_secondary = None
1224       inst_uuid_to_inst_name = None
1225
1226     if query.NQ_OOB in self.requested_data:
1227       oob_support = dict((uuid, bool(SupportsOob(lu.cfg, node)))
1228                          for uuid, node in all_info.iteritems())
1229     else:
1230       oob_support = None
1231
1232     if query.NQ_GROUP in self.requested_data:
1233       groups = lu.cfg.GetAllNodeGroupsInfo()
1234     else:
1235       groups = {}
1236
1237     return query.NodeQueryData([all_info[uuid] for uuid in node_uuids],
1238                                live_data, lu.cfg.GetMasterNode(),
1239                                node_to_primary, node_to_secondary,
1240                                inst_uuid_to_inst_name, groups, oob_support,
1241                                lu.cfg.GetClusterInfo())
1242
1243
1244 class LUNodeQuery(NoHooksLU):
1245   """Logical unit for querying nodes.
1246
1247   """
1248   # pylint: disable=W0142
1249   REQ_BGL = False
1250
1251   def CheckArguments(self):
1252     self.nq = NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
1253                          self.op.output_fields, self.op.use_locking)
1254
1255   def ExpandNames(self):
1256     self.nq.ExpandNames(self)
1257
1258   def DeclareLocks(self, level):
1259     self.nq.DeclareLocks(self, level)
1260
1261   def Exec(self, feedback_fn):
1262     return self.nq.OldStyleQuery(self)
1263
1264
1265 def _CheckOutputFields(static, dynamic, selected):
1266   """Checks whether all selected fields are valid.
1267
1268   @type static: L{utils.FieldSet}
1269   @param static: static fields set
1270   @type dynamic: L{utils.FieldSet}
1271   @param dynamic: dynamic fields set
1272
1273   """
1274   f = utils.FieldSet()
1275   f.Extend(static)
1276   f.Extend(dynamic)
1277
1278   delta = f.NonMatching(selected)
1279   if delta:
1280     raise errors.OpPrereqError("Unknown output fields selected: %s"
1281                                % ",".join(delta), errors.ECODE_INVAL)
1282
1283
1284 class LUNodeQueryvols(NoHooksLU):
1285   """Logical unit for getting volumes on node(s).
1286
1287   """
1288   REQ_BGL = False
1289   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1290   _FIELDS_STATIC = utils.FieldSet("node")
1291
1292   def CheckArguments(self):
1293     _CheckOutputFields(static=self._FIELDS_STATIC,
1294                        dynamic=self._FIELDS_DYNAMIC,
1295                        selected=self.op.output_fields)
1296
1297   def ExpandNames(self):
1298     self.share_locks = ShareAll()
1299
1300     if self.op.nodes:
1301       self.needed_locks = {
1302         locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
1303         }
1304     else:
1305       self.needed_locks = {
1306         locking.LEVEL_NODE: locking.ALL_SET,
1307         locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1308         }
1309
1310   def Exec(self, feedback_fn):
1311     """Computes the list of nodes and their attributes.
1312
1313     """
1314     node_uuids = self.owned_locks(locking.LEVEL_NODE)
1315     volumes = self.rpc.call_node_volumes(node_uuids)
1316
1317     ilist = self.cfg.GetAllInstancesInfo()
1318     vol2inst = MapInstanceDisksToNodes(ilist.values())
1319
1320     output = []
1321     for node_uuid in node_uuids:
1322       nresult = volumes[node_uuid]
1323       if nresult.offline:
1324         continue
1325       msg = nresult.fail_msg
1326       if msg:
1327         self.LogWarning("Can't compute volume data on node %s: %s",
1328                         self.cfg.GetNodeName(node_uuid), msg)
1329         continue
1330
1331       node_vols = sorted(nresult.payload,
1332                          key=operator.itemgetter("dev"))
1333
1334       for vol in node_vols:
1335         node_output = []
1336         for field in self.op.output_fields:
1337           if field == "node":
1338             val = self.cfg.GetNodeName(node_uuid)
1339           elif field == "phys":
1340             val = vol["dev"]
1341           elif field == "vg":
1342             val = vol["vg"]
1343           elif field == "name":
1344             val = vol["name"]
1345           elif field == "size":
1346             val = int(float(vol["size"]))
1347           elif field == "instance":
1348             val = vol2inst.get((node_uuid, vol["vg"] + "/" + vol["name"]), "-")
1349           else:
1350             raise errors.ParameterError(field)
1351           node_output.append(str(val))
1352
1353         output.append(node_output)
1354
1355     return output
1356
1357
1358 class LUNodeQueryStorage(NoHooksLU):
1359   """Logical unit for getting information on storage units on node(s).
1360
1361   """
1362   _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
1363   REQ_BGL = False
1364
1365   def CheckArguments(self):
1366     _CheckOutputFields(static=self._FIELDS_STATIC,
1367                        dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
1368                        selected=self.op.output_fields)
1369
1370   def ExpandNames(self):
1371     self.share_locks = ShareAll()
1372
1373     if self.op.nodes:
1374       self.needed_locks = {
1375         locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
1376         }
1377     else:
1378       self.needed_locks = {
1379         locking.LEVEL_NODE: locking.ALL_SET,
1380         locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1381         }
1382
1383   def Exec(self, feedback_fn):
1384     """Computes the list of nodes and their attributes.
1385
1386     """
1387     self.node_uuids = self.owned_locks(locking.LEVEL_NODE)
1388
1389     # Always get name to sort by
1390     if constants.SF_NAME in self.op.output_fields:
1391       fields = self.op.output_fields[:]
1392     else:
1393       fields = [constants.SF_NAME] + self.op.output_fields
1394
1395     # Never ask for node or type as it's only known to the LU
1396     for extra in [constants.SF_NODE, constants.SF_TYPE]:
1397       while extra in fields:
1398         fields.remove(extra)
1399
1400     field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
1401     name_idx = field_idx[constants.SF_NAME]
1402
1403     st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1404     data = self.rpc.call_storage_list(self.node_uuids,
1405                                       self.op.storage_type, st_args,
1406                                       self.op.name, fields)
1407
1408     result = []
1409
1410     for node_uuid in utils.NiceSort(self.node_uuids):
1411       node_name = self.cfg.GetNodeName(node_uuid)
1412       nresult = data[node_uuid]
1413       if nresult.offline:
1414         continue
1415
1416       msg = nresult.fail_msg
1417       if msg:
1418         self.LogWarning("Can't get storage data from node %s: %s",
1419                         node_name, msg)
1420         continue
1421
1422       rows = dict([(row[name_idx], row) for row in nresult.payload])
1423
1424       for name in utils.NiceSort(rows.keys()):
1425         row = rows[name]
1426
1427         out = []
1428
1429         for field in self.op.output_fields:
1430           if field == constants.SF_NODE:
1431             val = node_name
1432           elif field == constants.SF_TYPE:
1433             val = self.op.storage_type
1434           elif field in field_idx:
1435             val = row[field_idx[field]]
1436           else:
1437             raise errors.ParameterError(field)
1438
1439           out.append(val)
1440
1441         result.append(out)
1442
1443     return result
1444
1445
1446 class LUNodeRemove(LogicalUnit):
1447   """Logical unit for removing a node.
1448
1449   """
1450   HPATH = "node-remove"
1451   HTYPE = constants.HTYPE_NODE
1452
1453   def BuildHooksEnv(self):
1454     """Build hooks env.
1455
1456     """
1457     return {
1458       "OP_TARGET": self.op.node_name,
1459       "NODE_NAME": self.op.node_name,
1460       }
1461
1462   def BuildHooksNodes(self):
1463     """Build hooks nodes.
1464
1465     This doesn't run on the target node in the pre phase as a failed
1466     node would then be impossible to remove.
1467
1468     """
1469     all_nodes = self.cfg.GetNodeList()
1470     try:
1471       all_nodes.remove(self.op.node_uuid)
1472     except ValueError:
1473       pass
1474     return (all_nodes, all_nodes)
1475
1476   def CheckPrereq(self):
1477     """Check prerequisites.
1478
1479     This checks:
1480      - the node exists in the configuration
1481      - it does not have primary or secondary instances
1482      - it's not the master
1483
1484     Any errors are signaled by raising errors.OpPrereqError.
1485
1486     """
1487     (self.op.node_uuid, self.op.node_name) = \
1488       ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1489     node = self.cfg.GetNodeInfo(self.op.node_uuid)
1490     assert node is not None
1491
1492     masternode = self.cfg.GetMasterNode()
1493     if node.uuid == masternode:
1494       raise errors.OpPrereqError("Node is the master node, failover to another"
1495                                  " node is required", errors.ECODE_INVAL)
1496
1497     for _, instance in self.cfg.GetAllInstancesInfo().items():
1498       if node.uuid in instance.all_nodes:
1499         raise errors.OpPrereqError("Instance %s is still running on the node,"
1500                                    " please remove first" % instance.name,
1501                                    errors.ECODE_INVAL)
1502     self.op.node_name = node.name
1503     self.node = node
1504
1505   def Exec(self, feedback_fn):
1506     """Removes the node from the cluster.
1507
1508     """
1509     logging.info("Stopping the node daemon and removing configs from node %s",
1510                  self.node.name)
1511
1512     modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
1513
1514     assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
1515       "Not owning BGL"
1516
1517     # Promote nodes to master candidate as needed
1518     AdjustCandidatePool(self, exceptions=[self.node.uuid])
1519     self.context.RemoveNode(self.node)
1520
1521     # Run post hooks on the node before it's removed
1522     RunPostHook(self, self.node.name)
1523
1524     # we have to call this by name rather than by UUID, as the node is no longer
1525     # in the config
1526     result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup)
1527     msg = result.fail_msg
1528     if msg:
1529       self.LogWarning("Errors encountered on the remote node while leaving"
1530                       " the cluster: %s", msg)
1531
1532     # Remove node from our /etc/hosts
1533     if self.cfg.GetClusterInfo().modify_etc_hosts:
1534       master_node_uuid = self.cfg.GetMasterNode()
1535       result = self.rpc.call_etc_hosts_modify(master_node_uuid,
1536                                               constants.ETC_HOSTS_REMOVE,
1537                                               self.node.name, None)
1538       result.Raise("Can't update hosts file with new host data")
1539       RedistributeAncillaryFiles(self)
1540
1541
1542 class LURepairNodeStorage(NoHooksLU):
1543   """Repairs the volume group on a node.
1544
1545   """
1546   REQ_BGL = False
1547
1548   def CheckArguments(self):
1549     (self.op.node_uuid, self.op.node_name) = \
1550       ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1551
1552     storage_type = self.op.storage_type
1553
1554     if (constants.SO_FIX_CONSISTENCY not in
1555         constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
1556       raise errors.OpPrereqError("Storage units of type '%s' can not be"
1557                                  " repaired" % storage_type,
1558                                  errors.ECODE_INVAL)
1559
1560   def ExpandNames(self):
1561     self.needed_locks = {
1562       locking.LEVEL_NODE: [self.op.node_uuid],
1563       }
1564
1565   def _CheckFaultyDisks(self, instance, node_uuid):
1566     """Ensure faulty disks abort the opcode or at least warn."""
1567     try:
1568       if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
1569                                  node_uuid, True):
1570         raise errors.OpPrereqError("Instance '%s' has faulty disks on"
1571                                    " node '%s'" %
1572                                    (instance.name,
1573                                     self.cfg.GetNodeName(node_uuid)),
1574                                    errors.ECODE_STATE)
1575     except errors.OpPrereqError, err:
1576       if self.op.ignore_consistency:
1577         self.LogWarning(str(err.args[0]))
1578       else:
1579         raise
1580
1581   def CheckPrereq(self):
1582     """Check prerequisites.
1583
1584     """
1585     # Check whether any instance on this node has faulty disks
1586     for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
1587       if not inst.disks_active:
1588         continue
1589       check_nodes = set(inst.all_nodes)
1590       check_nodes.discard(self.op.node_uuid)
1591       for inst_node_uuid in check_nodes:
1592         self._CheckFaultyDisks(inst, inst_node_uuid)
1593
1594   def Exec(self, feedback_fn):
1595     feedback_fn("Repairing storage unit '%s' on %s ..." %
1596                 (self.op.name, self.op.node_name))
1597
1598     st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1599     result = self.rpc.call_storage_execute(self.op.node_uuid,
1600                                            self.op.storage_type, st_args,
1601                                            self.op.name,
1602                                            constants.SO_FIX_CONSISTENCY)
1603     result.Raise("Failed to repair storage unit '%s' on %s" %
1604                  (self.op.name, self.op.node_name))