Merge branch 'stable-2.8' into stable-2.9
[ganeti-local] / lib / cmdlib / node.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with nodes."""
23
24 import logging
25 import operator
26
27 from ganeti import constants
28 from ganeti import errors
29 from ganeti import locking
30 from ganeti import netutils
31 from ganeti import objects
32 from ganeti import opcodes
33 from ganeti import qlang
34 from ganeti import query
35 from ganeti import rpc
36 from ganeti import utils
37 from ganeti.masterd import iallocator
38
39 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, QueryBase, \
40   ResultWithJobs
41 from ganeti.cmdlib.common import CheckParamsNotGlobal, \
42   MergeAndVerifyHvState, MergeAndVerifyDiskState, \
43   IsExclusiveStorageEnabledNode, CheckNodePVs, \
44   RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
45   CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
46   AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
47   GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \
48   FindFaultyInstanceDisks, CheckStorageTypeEnabled
49
50
51 def _DecideSelfPromotion(lu, exceptions=None):
52   """Decide whether I should promote myself as a master candidate.
53
54   """
55   cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
56   mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
57   # the new node will increase mc_max with one, so:
58   mc_should = min(mc_should + 1, cp_size)
59   return mc_now < mc_should
60
61
62 def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
63   """Ensure that a node has the given secondary ip.
64
65   @type lu: L{LogicalUnit}
66   @param lu: the LU on behalf of which we make the check
67   @type node: L{objects.Node}
68   @param node: the node to check
69   @type secondary_ip: string
70   @param secondary_ip: the ip to check
71   @type prereq: boolean
72   @param prereq: whether to throw a prerequisite or an execute error
73   @raise errors.OpPrereqError: if the node doesn't have the ip,
74   and prereq=True
75   @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
76
77   """
78   # this can be called with a new node, which has no UUID yet, so perform the
79   # RPC call using its name
80   result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
81   result.Raise("Failure checking secondary ip on node %s" % node.name,
82                prereq=prereq, ecode=errors.ECODE_ENVIRON)
83   if not result.payload:
84     msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
85            " please fix and re-run this command" % secondary_ip)
86     if prereq:
87       raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
88     else:
89       raise errors.OpExecError(msg)
90
91
92 class LUNodeAdd(LogicalUnit):
93   """Logical unit for adding node to the cluster.
94
95   """
96   HPATH = "node-add"
97   HTYPE = constants.HTYPE_NODE
98   _NFLAGS = ["master_capable", "vm_capable"]
99
100   def CheckArguments(self):
101     self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
102     # validate/normalize the node name
103     self.hostname = netutils.GetHostname(name=self.op.node_name,
104                                          family=self.primary_ip_family)
105     self.op.node_name = self.hostname.name
106
107     if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName():
108       raise errors.OpPrereqError("Cannot readd the master node",
109                                  errors.ECODE_STATE)
110
111     if self.op.readd and self.op.group:
112       raise errors.OpPrereqError("Cannot pass a node group when a node is"
113                                  " being readded", errors.ECODE_INVAL)
114
115   def BuildHooksEnv(self):
116     """Build hooks env.
117
118     This will run on all nodes before, and on all nodes + the new node after.
119
120     """
121     return {
122       "OP_TARGET": self.op.node_name,
123       "NODE_NAME": self.op.node_name,
124       "NODE_PIP": self.op.primary_ip,
125       "NODE_SIP": self.op.secondary_ip,
126       "MASTER_CAPABLE": str(self.op.master_capable),
127       "VM_CAPABLE": str(self.op.vm_capable),
128       }
129
130   def BuildHooksNodes(self):
131     """Build hooks nodes.
132
133     """
134     hook_nodes = self.cfg.GetNodeList()
135     new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
136     if new_node_info is not None:
137       # Exclude added node
138       hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))
139
140     # add the new node as post hook node by name; it does not have an UUID yet
141     return (hook_nodes, hook_nodes, [self.op.node_name, ])
142
143   def CheckPrereq(self):
144     """Check prerequisites.
145
146     This checks:
147      - the new node is not already in the config
148      - it is resolvable
149      - its parameters (single/dual homed) matches the cluster
150
151     Any errors are signaled by raising errors.OpPrereqError.
152
153     """
154     node_name = self.hostname.name
155     self.op.primary_ip = self.hostname.ip
156     if self.op.secondary_ip is None:
157       if self.primary_ip_family == netutils.IP6Address.family:
158         raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
159                                    " IPv4 address must be given as secondary",
160                                    errors.ECODE_INVAL)
161       self.op.secondary_ip = self.op.primary_ip
162
163     secondary_ip = self.op.secondary_ip
164     if not netutils.IP4Address.IsValid(secondary_ip):
165       raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
166                                  " address" % secondary_ip, errors.ECODE_INVAL)
167
168     existing_node_info = self.cfg.GetNodeInfoByName(node_name)
169     if not self.op.readd and existing_node_info is not None:
170       raise errors.OpPrereqError("Node %s is already in the configuration" %
171                                  node_name, errors.ECODE_EXISTS)
172     elif self.op.readd and existing_node_info is None:
173       raise errors.OpPrereqError("Node %s is not in the configuration" %
174                                  node_name, errors.ECODE_NOENT)
175
176     self.changed_primary_ip = False
177
178     for existing_node in self.cfg.GetAllNodesInfo().values():
179       if self.op.readd and node_name == existing_node.name:
180         if existing_node.secondary_ip != secondary_ip:
181           raise errors.OpPrereqError("Readded node doesn't have the same IP"
182                                      " address configuration as before",
183                                      errors.ECODE_INVAL)
184         if existing_node.primary_ip != self.op.primary_ip:
185           self.changed_primary_ip = True
186
187         continue
188
189       if (existing_node.primary_ip == self.op.primary_ip or
190           existing_node.secondary_ip == self.op.primary_ip or
191           existing_node.primary_ip == secondary_ip or
192           existing_node.secondary_ip == secondary_ip):
193         raise errors.OpPrereqError("New node ip address(es) conflict with"
194                                    " existing node %s" % existing_node.name,
195                                    errors.ECODE_NOTUNIQUE)
196
197     # After this 'if' block, None is no longer a valid value for the
198     # _capable op attributes
199     if self.op.readd:
200       assert existing_node_info is not None, \
201         "Can't retrieve locked node %s" % node_name
202       for attr in self._NFLAGS:
203         if getattr(self.op, attr) is None:
204           setattr(self.op, attr, getattr(existing_node_info, attr))
205     else:
206       for attr in self._NFLAGS:
207         if getattr(self.op, attr) is None:
208           setattr(self.op, attr, True)
209
210     if self.op.readd and not self.op.vm_capable:
211       pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
212       if pri or sec:
213         raise errors.OpPrereqError("Node %s being re-added with vm_capable"
214                                    " flag set to false, but it already holds"
215                                    " instances" % node_name,
216                                    errors.ECODE_STATE)
217
218     # check that the type of the node (single versus dual homed) is the
219     # same as for the master
220     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
221     master_singlehomed = myself.secondary_ip == myself.primary_ip
222     newbie_singlehomed = secondary_ip == self.op.primary_ip
223     if master_singlehomed != newbie_singlehomed:
224       if master_singlehomed:
225         raise errors.OpPrereqError("The master has no secondary ip but the"
226                                    " new node has one",
227                                    errors.ECODE_INVAL)
228       else:
229         raise errors.OpPrereqError("The master has a secondary ip but the"
230                                    " new node doesn't have one",
231                                    errors.ECODE_INVAL)
232
233     # checks reachability
234     if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
235       raise errors.OpPrereqError("Node not reachable by ping",
236                                  errors.ECODE_ENVIRON)
237
238     if not newbie_singlehomed:
239       # check reachability from my secondary ip to newbie's secondary ip
240       if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
241                               source=myself.secondary_ip):
242         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
243                                    " based ping to node daemon port",
244                                    errors.ECODE_ENVIRON)
245
246     if self.op.readd:
247       exceptions = [existing_node_info.uuid]
248     else:
249       exceptions = []
250
251     if self.op.master_capable:
252       self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
253     else:
254       self.master_candidate = False
255
256     if self.op.readd:
257       self.new_node = existing_node_info
258     else:
259       node_group = self.cfg.LookupNodeGroup(self.op.group)
260       self.new_node = objects.Node(name=node_name,
261                                    primary_ip=self.op.primary_ip,
262                                    secondary_ip=secondary_ip,
263                                    master_candidate=self.master_candidate,
264                                    offline=False, drained=False,
265                                    group=node_group, ndparams={})
266
267     if self.op.ndparams:
268       utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
269       CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
270                            "node", "cluster or group")
271
272     if self.op.hv_state:
273       self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)
274
275     if self.op.disk_state:
276       self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)
277
278     # TODO: If we need to have multiple DnsOnlyRunner we probably should make
279     #       it a property on the base class.
280     rpcrunner = rpc.DnsOnlyRunner()
281     result = rpcrunner.call_version([node_name])[node_name]
282     result.Raise("Can't get version information from node %s" % node_name)
283     if constants.PROTOCOL_VERSION == result.payload:
284       logging.info("Communication to node %s fine, sw version %s match",
285                    node_name, result.payload)
286     else:
287       raise errors.OpPrereqError("Version mismatch master version %s,"
288                                  " node version %s" %
289                                  (constants.PROTOCOL_VERSION, result.payload),
290                                  errors.ECODE_ENVIRON)
291
292     vg_name = self.cfg.GetVGName()
293     if vg_name is not None:
294       vparams = {constants.NV_PVLIST: [vg_name]}
295       excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
296       cname = self.cfg.GetClusterName()
297       result = rpcrunner.call_node_verify_light(
298           [node_name], vparams, cname,
299           self.cfg.GetClusterInfo().hvparams)[node_name]
300       (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
301       if errmsgs:
302         raise errors.OpPrereqError("Checks on node PVs failed: %s" %
303                                    "; ".join(errmsgs), errors.ECODE_ENVIRON)
304
305   def Exec(self, feedback_fn):
306     """Adds the new node to the cluster.
307
308     """
309     assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
310       "Not owning BGL"
311
312     # We adding a new node so we assume it's powered
313     self.new_node.powered = True
314
315     # for re-adds, reset the offline/drained/master-candidate flags;
316     # we need to reset here, otherwise offline would prevent RPC calls
317     # later in the procedure; this also means that if the re-add
318     # fails, we are left with a non-offlined, broken node
319     if self.op.readd:
320       self.new_node.offline = False
321       self.new_node.drained = False
322       self.LogInfo("Readding a node, the offline/drained flags were reset")
323       # if we demote the node, we do cleanup later in the procedure
324       self.new_node.master_candidate = self.master_candidate
325       if self.changed_primary_ip:
326         self.new_node.primary_ip = self.op.primary_ip
327
328     # copy the master/vm_capable flags
329     for attr in self._NFLAGS:
330       setattr(self.new_node, attr, getattr(self.op, attr))
331
332     # notify the user about any possible mc promotion
333     if self.new_node.master_candidate:
334       self.LogInfo("Node will be a master candidate")
335
336     if self.op.ndparams:
337       self.new_node.ndparams = self.op.ndparams
338     else:
339       self.new_node.ndparams = {}
340
341     if self.op.hv_state:
342       self.new_node.hv_state_static = self.new_hv_state
343
344     if self.op.disk_state:
345       self.new_node.disk_state_static = self.new_disk_state
346
347     # Add node to our /etc/hosts, and add key to known_hosts
348     if self.cfg.GetClusterInfo().modify_etc_hosts:
349       master_node = self.cfg.GetMasterNode()
350       result = self.rpc.call_etc_hosts_modify(
351                  master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
352                  self.hostname.ip)
353       result.Raise("Can't update hosts file with new host data")
354
355     if self.new_node.secondary_ip != self.new_node.primary_ip:
356       _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
357                                False)
358
359     node_verifier_uuids = [self.cfg.GetMasterNode()]
360     node_verify_param = {
361       constants.NV_NODELIST: ([self.new_node.name], {}),
362       # TODO: do a node-net-test as well?
363     }
364
365     result = self.rpc.call_node_verify(
366                node_verifier_uuids, node_verify_param,
367                self.cfg.GetClusterName(),
368                self.cfg.GetClusterInfo().hvparams)
369     for verifier in node_verifier_uuids:
370       result[verifier].Raise("Cannot communicate with node %s" % verifier)
371       nl_payload = result[verifier].payload[constants.NV_NODELIST]
372       if nl_payload:
373         for failed in nl_payload:
374           feedback_fn("ssh/hostname verification failed"
375                       " (checking from %s): %s" %
376                       (verifier, nl_payload[failed]))
377         raise errors.OpExecError("ssh/hostname verification failed")
378
379     if self.op.readd:
380       self.context.ReaddNode(self.new_node)
381       RedistributeAncillaryFiles(self)
382       # make sure we redistribute the config
383       self.cfg.Update(self.new_node, feedback_fn)
384       # and make sure the new node will not have old files around
385       if not self.new_node.master_candidate:
386         result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
387         result.Warn("Node failed to demote itself from master candidate status",
388                     self.LogWarning)
389     else:
390       self.context.AddNode(self.new_node, self.proc.GetECId())
391       RedistributeAncillaryFiles(self)
392
393
394 class LUNodeSetParams(LogicalUnit):
395   """Modifies the parameters of a node.
396
397   @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
398       to the node role (as _ROLE_*)
399   @cvar _R2F: a dictionary from node role to tuples of flags
400   @cvar _FLAGS: a list of attribute names corresponding to the flags
401
402   """
403   HPATH = "node-modify"
404   HTYPE = constants.HTYPE_NODE
405   REQ_BGL = False
406   (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
407   _F2R = {
408     (True, False, False): _ROLE_CANDIDATE,
409     (False, True, False): _ROLE_DRAINED,
410     (False, False, True): _ROLE_OFFLINE,
411     (False, False, False): _ROLE_REGULAR,
412     }
413   _R2F = dict((v, k) for k, v in _F2R.items())
414   _FLAGS = ["master_candidate", "drained", "offline"]
415
416   def CheckArguments(self):
417     (self.op.node_uuid, self.op.node_name) = \
418       ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
419     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
420                 self.op.master_capable, self.op.vm_capable,
421                 self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
422                 self.op.disk_state]
423     if all_mods.count(None) == len(all_mods):
424       raise errors.OpPrereqError("Please pass at least one modification",
425                                  errors.ECODE_INVAL)
426     if all_mods.count(True) > 1:
427       raise errors.OpPrereqError("Can't set the node into more than one"
428                                  " state at the same time",
429                                  errors.ECODE_INVAL)
430
431     # Boolean value that tells us whether we might be demoting from MC
432     self.might_demote = (self.op.master_candidate is False or
433                          self.op.offline is True or
434                          self.op.drained is True or
435                          self.op.master_capable is False)
436
437     if self.op.secondary_ip:
438       if not netutils.IP4Address.IsValid(self.op.secondary_ip):
439         raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
440                                    " address" % self.op.secondary_ip,
441                                    errors.ECODE_INVAL)
442
443     self.lock_all = self.op.auto_promote and self.might_demote
444     self.lock_instances = self.op.secondary_ip is not None
445
446   def _InstanceFilter(self, instance):
447     """Filter for getting affected instances.
448
449     """
450     return (instance.disk_template in constants.DTS_INT_MIRROR and
451             self.op.node_uuid in instance.all_nodes)
452
453   def ExpandNames(self):
454     if self.lock_all:
455       self.needed_locks = {
456         locking.LEVEL_NODE: locking.ALL_SET,
457
458         # Block allocations when all nodes are locked
459         locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
460         }
461     else:
462       self.needed_locks = {
463         locking.LEVEL_NODE: self.op.node_uuid,
464         }
465
466     # Since modifying a node can have severe effects on currently running
467     # operations the resource lock is at least acquired in shared mode
468     self.needed_locks[locking.LEVEL_NODE_RES] = \
469       self.needed_locks[locking.LEVEL_NODE]
470
471     # Get all locks except nodes in shared mode; they are not used for anything
472     # but read-only access
473     self.share_locks = ShareAll()
474     self.share_locks[locking.LEVEL_NODE] = 0
475     self.share_locks[locking.LEVEL_NODE_RES] = 0
476     self.share_locks[locking.LEVEL_NODE_ALLOC] = 0
477
478     if self.lock_instances:
479       self.needed_locks[locking.LEVEL_INSTANCE] = \
480         self.cfg.GetInstanceNames(
481           self.cfg.GetInstancesInfoByFilter(self._InstanceFilter).keys())
482
483   def BuildHooksEnv(self):
484     """Build hooks env.
485
486     This runs on the master node.
487
488     """
489     return {
490       "OP_TARGET": self.op.node_name,
491       "MASTER_CANDIDATE": str(self.op.master_candidate),
492       "OFFLINE": str(self.op.offline),
493       "DRAINED": str(self.op.drained),
494       "MASTER_CAPABLE": str(self.op.master_capable),
495       "VM_CAPABLE": str(self.op.vm_capable),
496       }
497
498   def BuildHooksNodes(self):
499     """Build hooks nodes.
500
501     """
502     nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
503     return (nl, nl)
504
505   def CheckPrereq(self):
506     """Check prerequisites.
507
508     This only checks the instance list against the existing names.
509
510     """
511     node = self.cfg.GetNodeInfo(self.op.node_uuid)
512     if self.lock_instances:
513       affected_instances = \
514         self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)
515
516       # Verify instance locks
517       owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
518       wanted_instance_names = frozenset([inst.name for inst in
519                                          affected_instances.values()])
520       if wanted_instance_names - owned_instance_names:
521         raise errors.OpPrereqError("Instances affected by changing node %s's"
522                                    " secondary IP address have changed since"
523                                    " locks were acquired, wanted '%s', have"
524                                    " '%s'; retry the operation" %
525                                    (node.name,
526                                     utils.CommaJoin(wanted_instance_names),
527                                     utils.CommaJoin(owned_instance_names)),
528                                    errors.ECODE_STATE)
529     else:
530       affected_instances = None
531
532     if (self.op.master_candidate is not None or
533         self.op.drained is not None or
534         self.op.offline is not None):
535       # we can't change the master's node flags
536       if node.uuid == self.cfg.GetMasterNode():
537         raise errors.OpPrereqError("The master role can be changed"
538                                    " only via master-failover",
539                                    errors.ECODE_INVAL)
540
541     if self.op.master_candidate and not node.master_capable:
542       raise errors.OpPrereqError("Node %s is not master capable, cannot make"
543                                  " it a master candidate" % node.name,
544                                  errors.ECODE_STATE)
545
546     if self.op.vm_capable is False:
547       (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
548       if ipri or isec:
549         raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
550                                    " the vm_capable flag" % node.name,
551                                    errors.ECODE_STATE)
552
553     if node.master_candidate and self.might_demote and not self.lock_all:
554       assert not self.op.auto_promote, "auto_promote set but lock_all not"
555       # check if after removing the current node, we're missing master
556       # candidates
557       (mc_remaining, mc_should, _) = \
558           self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
559       if mc_remaining < mc_should:
560         raise errors.OpPrereqError("Not enough master candidates, please"
561                                    " pass auto promote option to allow"
562                                    " promotion (--auto-promote or RAPI"
563                                    " auto_promote=True)", errors.ECODE_STATE)
564
565     self.old_flags = old_flags = (node.master_candidate,
566                                   node.drained, node.offline)
567     assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
568     self.old_role = old_role = self._F2R[old_flags]
569
570     # Check for ineffective changes
571     for attr in self._FLAGS:
572       if getattr(self.op, attr) is False and getattr(node, attr) is False:
573         self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
574         setattr(self.op, attr, None)
575
576     # Past this point, any flag change to False means a transition
577     # away from the respective state, as only real changes are kept
578
579     # TODO: We might query the real power state if it supports OOB
580     if SupportsOob(self.cfg, node):
581       if self.op.offline is False and not (node.powered or
582                                            self.op.powered is True):
583         raise errors.OpPrereqError(("Node %s needs to be turned on before its"
584                                     " offline status can be reset") %
585                                    self.op.node_name, errors.ECODE_STATE)
586     elif self.op.powered is not None:
587       raise errors.OpPrereqError(("Unable to change powered state for node %s"
588                                   " as it does not support out-of-band"
589                                   " handling") % self.op.node_name,
590                                  errors.ECODE_STATE)
591
592     # If we're being deofflined/drained, we'll MC ourself if needed
593     if (self.op.drained is False or self.op.offline is False or
594         (self.op.master_capable and not node.master_capable)):
595       if _DecideSelfPromotion(self):
596         self.op.master_candidate = True
597         self.LogInfo("Auto-promoting node to master candidate")
598
599     # If we're no longer master capable, we'll demote ourselves from MC
600     if self.op.master_capable is False and node.master_candidate:
601       self.LogInfo("Demoting from master candidate")
602       self.op.master_candidate = False
603
604     # Compute new role
605     assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
606     if self.op.master_candidate:
607       new_role = self._ROLE_CANDIDATE
608     elif self.op.drained:
609       new_role = self._ROLE_DRAINED
610     elif self.op.offline:
611       new_role = self._ROLE_OFFLINE
612     elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
613       # False is still in new flags, which means we're un-setting (the
614       # only) True flag
615       new_role = self._ROLE_REGULAR
616     else: # no new flags, nothing, keep old role
617       new_role = old_role
618
619     self.new_role = new_role
620
621     if old_role == self._ROLE_OFFLINE and new_role != old_role:
622       # Trying to transition out of offline status
623       result = self.rpc.call_version([node.uuid])[node.uuid]
624       if result.fail_msg:
625         raise errors.OpPrereqError("Node %s is being de-offlined but fails"
626                                    " to report its version: %s" %
627                                    (node.name, result.fail_msg),
628                                    errors.ECODE_STATE)
629       else:
630         self.LogWarning("Transitioning node from offline to online state"
631                         " without using re-add. Please make sure the node"
632                         " is healthy!")
633
634     # When changing the secondary ip, verify if this is a single-homed to
635     # multi-homed transition or vice versa, and apply the relevant
636     # restrictions.
637     if self.op.secondary_ip:
638       # Ok even without locking, because this can't be changed by any LU
639       master = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
640       master_singlehomed = master.secondary_ip == master.primary_ip
641       if master_singlehomed and self.op.secondary_ip != node.primary_ip:
642         if self.op.force and node.uuid == master.uuid:
643           self.LogWarning("Transitioning from single-homed to multi-homed"
644                           " cluster; all nodes will require a secondary IP"
645                           " address")
646         else:
647           raise errors.OpPrereqError("Changing the secondary ip on a"
648                                      " single-homed cluster requires the"
649                                      " --force option to be passed, and the"
650                                      " target node to be the master",
651                                      errors.ECODE_INVAL)
652       elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
653         if self.op.force and node.uuid == master.uuid:
654           self.LogWarning("Transitioning from multi-homed to single-homed"
655                           " cluster; secondary IP addresses will have to be"
656                           " removed")
657         else:
658           raise errors.OpPrereqError("Cannot set the secondary IP to be the"
659                                      " same as the primary IP on a multi-homed"
660                                      " cluster, unless the --force option is"
661                                      " passed, and the target node is the"
662                                      " master", errors.ECODE_INVAL)
663
664       assert not (set([inst.name for inst in affected_instances.values()]) -
665                   self.owned_locks(locking.LEVEL_INSTANCE))
666
667       if node.offline:
668         if affected_instances:
669           msg = ("Cannot change secondary IP address: offline node has"
670                  " instances (%s) configured to use it" %
671                  utils.CommaJoin(
672                    [inst.name for inst in affected_instances.values()]))
673           raise errors.OpPrereqError(msg, errors.ECODE_STATE)
674       else:
675         # On online nodes, check that no instances are running, and that
676         # the node has the new ip and we can reach it.
677         for instance in affected_instances.values():
678           CheckInstanceState(self, instance, INSTANCE_DOWN,
679                              msg="cannot change secondary ip")
680
681         _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
682         if master.uuid != node.uuid:
683           # check reachability from master secondary ip to new secondary ip
684           if not netutils.TcpPing(self.op.secondary_ip,
685                                   constants.DEFAULT_NODED_PORT,
686                                   source=master.secondary_ip):
687             raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
688                                        " based ping to node daemon port",
689                                        errors.ECODE_ENVIRON)
690
691     if self.op.ndparams:
692       new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
693       utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
694       CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
695                            "node", "cluster or group")
696       self.new_ndparams = new_ndparams
697
698     if self.op.hv_state:
699       self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
700                                                 node.hv_state_static)
701
702     if self.op.disk_state:
703       self.new_disk_state = \
704         MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)
705
706   def Exec(self, feedback_fn):
707     """Modifies a node.
708
709     """
710     node = self.cfg.GetNodeInfo(self.op.node_uuid)
711     result = []
712
713     if self.op.ndparams:
714       node.ndparams = self.new_ndparams
715
716     if self.op.powered is not None:
717       node.powered = self.op.powered
718
719     if self.op.hv_state:
720       node.hv_state_static = self.new_hv_state
721
722     if self.op.disk_state:
723       node.disk_state_static = self.new_disk_state
724
725     for attr in ["master_capable", "vm_capable"]:
726       val = getattr(self.op, attr)
727       if val is not None:
728         setattr(node, attr, val)
729         result.append((attr, str(val)))
730
731     if self.new_role != self.old_role:
732       # Tell the node to demote itself, if no longer MC and not offline
733       if self.old_role == self._ROLE_CANDIDATE and \
734           self.new_role != self._ROLE_OFFLINE:
735         msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
736         if msg:
737           self.LogWarning("Node failed to demote itself: %s", msg)
738
739       new_flags = self._R2F[self.new_role]
740       for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
741         if of != nf:
742           result.append((desc, str(nf)))
743       (node.master_candidate, node.drained, node.offline) = new_flags
744
745       # we locked all nodes, we adjust the CP before updating this node
746       if self.lock_all:
747         AdjustCandidatePool(self, [node.uuid])
748
749     if self.op.secondary_ip:
750       node.secondary_ip = self.op.secondary_ip
751       result.append(("secondary_ip", self.op.secondary_ip))
752
753     # this will trigger configuration file update, if needed
754     self.cfg.Update(node, feedback_fn)
755
756     # this will trigger job queue propagation or cleanup if the mc
757     # flag changed
758     if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
759       self.context.ReaddNode(node)
760
761     return result
762
763
764 class LUNodePowercycle(NoHooksLU):
765   """Powercycles a node.
766
767   """
768   REQ_BGL = False
769
770   def CheckArguments(self):
771     (self.op.node_uuid, self.op.node_name) = \
772       ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
773
774     if self.op.node_uuid == self.cfg.GetMasterNode() and not self.op.force:
775       raise errors.OpPrereqError("The node is the master and the force"
776                                  " parameter was not set",
777                                  errors.ECODE_INVAL)
778
779   def ExpandNames(self):
780     """Locking for PowercycleNode.
781
782     This is a last-resort option and shouldn't block on other
783     jobs. Therefore, we grab no locks.
784
785     """
786     self.needed_locks = {}
787
788   def Exec(self, feedback_fn):
789     """Reboots a node.
790
791     """
792     default_hypervisor = self.cfg.GetHypervisorType()
793     hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
794     result = self.rpc.call_node_powercycle(self.op.node_uuid,
795                                            default_hypervisor,
796                                            hvparams)
797     result.Raise("Failed to schedule the reboot")
798     return result.payload
799
800
801 def _GetNodeInstancesInner(cfg, fn):
802   return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
803
804
805 def _GetNodePrimaryInstances(cfg, node_uuid):
806   """Returns primary instances on a node.
807
808   """
809   return _GetNodeInstancesInner(cfg,
810                                 lambda inst: node_uuid == inst.primary_node)
811
812
813 def _GetNodeSecondaryInstances(cfg, node_uuid):
814   """Returns secondary instances on a node.
815
816   """
817   return _GetNodeInstancesInner(cfg,
818                                 lambda inst: node_uuid in inst.secondary_nodes)
819
820
821 def _GetNodeInstances(cfg, node_uuid):
822   """Returns a list of all primary and secondary instances on a node.
823
824   """
825
826   return _GetNodeInstancesInner(cfg, lambda inst: node_uuid in inst.all_nodes)
827
828
829 class LUNodeEvacuate(NoHooksLU):
830   """Evacuates instances off a list of nodes.
831
832   """
833   REQ_BGL = False
834
835   _MODE2IALLOCATOR = {
836     constants.NODE_EVAC_PRI: constants.IALLOCATOR_NEVAC_PRI,
837     constants.NODE_EVAC_SEC: constants.IALLOCATOR_NEVAC_SEC,
838     constants.NODE_EVAC_ALL: constants.IALLOCATOR_NEVAC_ALL,
839     }
840   assert frozenset(_MODE2IALLOCATOR.keys()) == constants.NODE_EVAC_MODES
841   assert (frozenset(_MODE2IALLOCATOR.values()) ==
842           constants.IALLOCATOR_NEVAC_MODES)
843
844   def CheckArguments(self):
845     CheckIAllocatorOrNode(self, "iallocator", "remote_node")
846
847   def ExpandNames(self):
848     (self.op.node_uuid, self.op.node_name) = \
849       ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
850
851     if self.op.remote_node is not None:
852       (self.op.remote_node_uuid, self.op.remote_node) = \
853         ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
854                               self.op.remote_node)
855       assert self.op.remote_node
856
857       if self.op.node_uuid == self.op.remote_node_uuid:
858         raise errors.OpPrereqError("Can not use evacuated node as a new"
859                                    " secondary node", errors.ECODE_INVAL)
860
861       if self.op.mode != constants.NODE_EVAC_SEC:
862         raise errors.OpPrereqError("Without the use of an iallocator only"
863                                    " secondary instances can be evacuated",
864                                    errors.ECODE_INVAL)
865
866     # Declare locks
867     self.share_locks = ShareAll()
868     self.needed_locks = {
869       locking.LEVEL_INSTANCE: [],
870       locking.LEVEL_NODEGROUP: [],
871       locking.LEVEL_NODE: [],
872       }
873
874     # Determine nodes (via group) optimistically, needs verification once locks
875     # have been acquired
876     self.lock_nodes = self._DetermineNodes()
877
878   def _DetermineNodes(self):
879     """Gets the list of node UUIDs to operate on.
880
881     """
882     if self.op.remote_node is None:
883       # Iallocator will choose any node(s) in the same group
884       group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
885     else:
886       group_nodes = frozenset([self.op.remote_node_uuid])
887
888     # Determine nodes to be locked
889     return set([self.op.node_uuid]) | group_nodes
890
891   def _DetermineInstances(self):
892     """Builds list of instances to operate on.
893
894     """
895     assert self.op.mode in constants.NODE_EVAC_MODES
896
897     if self.op.mode == constants.NODE_EVAC_PRI:
898       # Primary instances only
899       inst_fn = _GetNodePrimaryInstances
900       assert self.op.remote_node is None, \
901         "Evacuating primary instances requires iallocator"
902     elif self.op.mode == constants.NODE_EVAC_SEC:
903       # Secondary instances only
904       inst_fn = _GetNodeSecondaryInstances
905     else:
906       # All instances
907       assert self.op.mode == constants.NODE_EVAC_ALL
908       inst_fn = _GetNodeInstances
909       # TODO: In 2.6, change the iallocator interface to take an evacuation mode
910       # per instance
911       raise errors.OpPrereqError("Due to an issue with the iallocator"
912                                  " interface it is not possible to evacuate"
913                                  " all instances at once; specify explicitly"
914                                  " whether to evacuate primary or secondary"
915                                  " instances",
916                                  errors.ECODE_INVAL)
917
918     return inst_fn(self.cfg, self.op.node_uuid)
919
920   def DeclareLocks(self, level):
921     if level == locking.LEVEL_INSTANCE:
922       # Lock instances optimistically, needs verification once node and group
923       # locks have been acquired
924       self.needed_locks[locking.LEVEL_INSTANCE] = \
925         set(i.name for i in self._DetermineInstances())
926
927     elif level == locking.LEVEL_NODEGROUP:
928       # Lock node groups for all potential target nodes optimistically, needs
929       # verification once nodes have been acquired
930       self.needed_locks[locking.LEVEL_NODEGROUP] = \
931         self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)
932
933     elif level == locking.LEVEL_NODE:
934       self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes
935
936   def CheckPrereq(self):
937     # Verify locks
938     owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
939     owned_nodes = self.owned_locks(locking.LEVEL_NODE)
940     owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
941
942     need_nodes = self._DetermineNodes()
943
944     if not owned_nodes.issuperset(need_nodes):
945       raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
946                                  " locks were acquired, current nodes are"
947                                  " are '%s', used to be '%s'; retry the"
948                                  " operation" %
949                                  (self.op.node_name,
950                                   utils.CommaJoin(need_nodes),
951                                   utils.CommaJoin(owned_nodes)),
952                                  errors.ECODE_STATE)
953
954     wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
955     if owned_groups != wanted_groups:
956       raise errors.OpExecError("Node groups changed since locks were acquired,"
957                                " current groups are '%s', used to be '%s';"
958                                " retry the operation" %
959                                (utils.CommaJoin(wanted_groups),
960                                 utils.CommaJoin(owned_groups)))
961
962     # Determine affected instances
963     self.instances = self._DetermineInstances()
964     self.instance_names = [i.name for i in self.instances]
965
966     if set(self.instance_names) != owned_instance_names:
967       raise errors.OpExecError("Instances on node '%s' changed since locks"
968                                " were acquired, current instances are '%s',"
969                                " used to be '%s'; retry the operation" %
970                                (self.op.node_name,
971                                 utils.CommaJoin(self.instance_names),
972                                 utils.CommaJoin(owned_instance_names)))
973
974     if self.instance_names:
975       self.LogInfo("Evacuating instances from node '%s': %s",
976                    self.op.node_name,
977                    utils.CommaJoin(utils.NiceSort(self.instance_names)))
978     else:
979       self.LogInfo("No instances to evacuate from node '%s'",
980                    self.op.node_name)
981
982     if self.op.remote_node is not None:
983       for i in self.instances:
984         if i.primary_node == self.op.remote_node_uuid:
985           raise errors.OpPrereqError("Node %s is the primary node of"
986                                      " instance %s, cannot use it as"
987                                      " secondary" %
988                                      (self.op.remote_node, i.name),
989                                      errors.ECODE_INVAL)
990
991   def Exec(self, feedback_fn):
992     assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)
993
994     if not self.instance_names:
995       # No instances to evacuate
996       jobs = []
997
998     elif self.op.iallocator is not None:
999       # TODO: Implement relocation to other group
1000       evac_mode = self._MODE2IALLOCATOR[self.op.mode]
1001       req = iallocator.IAReqNodeEvac(evac_mode=evac_mode,
1002                                      instances=list(self.instance_names))
1003       ial = iallocator.IAllocator(self.cfg, self.rpc, req)
1004
1005       ial.Run(self.op.iallocator)
1006
1007       if not ial.success:
1008         raise errors.OpPrereqError("Can't compute node evacuation using"
1009                                    " iallocator '%s': %s" %
1010                                    (self.op.iallocator, ial.info),
1011                                    errors.ECODE_NORES)
1012
1013       jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)
1014
1015     elif self.op.remote_node is not None:
1016       assert self.op.mode == constants.NODE_EVAC_SEC
1017       jobs = [
1018         [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
1019                                         remote_node=self.op.remote_node,
1020                                         disks=[],
1021                                         mode=constants.REPLACE_DISK_CHG,
1022                                         early_release=self.op.early_release)]
1023         for instance_name in self.instance_names]
1024
1025     else:
1026       raise errors.ProgrammerError("No iallocator or remote node")
1027
1028     return ResultWithJobs(jobs)
1029
1030
1031 class LUNodeMigrate(LogicalUnit):
1032   """Migrate all instances from a node.
1033
1034   """
1035   HPATH = "node-migrate"
1036   HTYPE = constants.HTYPE_NODE
1037   REQ_BGL = False
1038
1039   def CheckArguments(self):
1040     pass
1041
1042   def ExpandNames(self):
1043     (self.op.node_uuid, self.op.node_name) = \
1044       ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1045
1046     self.share_locks = ShareAll()
1047     self.needed_locks = {
1048       locking.LEVEL_NODE: [self.op.node_uuid],
1049       }
1050
1051   def BuildHooksEnv(self):
1052     """Build hooks env.
1053
1054     This runs on the master, the primary and all the secondaries.
1055
1056     """
1057     return {
1058       "NODE_NAME": self.op.node_name,
1059       "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
1060       }
1061
1062   def BuildHooksNodes(self):
1063     """Build hooks nodes.
1064
1065     """
1066     nl = [self.cfg.GetMasterNode()]
1067     return (nl, nl)
1068
1069   def CheckPrereq(self):
1070     pass
1071
1072   def Exec(self, feedback_fn):
1073     # Prepare jobs for migration instances
1074     jobs = [
1075       [opcodes.OpInstanceMigrate(
1076         instance_name=inst.name,
1077         mode=self.op.mode,
1078         live=self.op.live,
1079         iallocator=self.op.iallocator,
1080         target_node=self.op.target_node,
1081         allow_runtime_changes=self.op.allow_runtime_changes,
1082         ignore_ipolicy=self.op.ignore_ipolicy)]
1083       for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]
1084
1085     # TODO: Run iallocator in this opcode and pass correct placement options to
1086     # OpInstanceMigrate. Since other jobs can modify the cluster between
1087     # running the iallocator and the actual migration, a good consistency model
1088     # will have to be found.
1089
1090     assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
1091             frozenset([self.op.node_uuid]))
1092
1093     return ResultWithJobs(jobs)
1094
1095
1096 def _GetStorageTypeArgs(cfg, storage_type):
1097   """Returns the arguments for a storage type.
1098
1099   """
1100   # Special case for file storage
1101   if storage_type == constants.ST_FILE:
1102     # storage.FileStorage wants a list of storage directories
1103     return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
1104
1105   return []
1106
1107
1108 class LUNodeModifyStorage(NoHooksLU):
1109   """Logical unit for modifying a storage volume on a node.
1110
1111   """
1112   REQ_BGL = False
1113
1114   def CheckArguments(self):
1115     (self.op.node_uuid, self.op.node_name) = \
1116       ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1117
1118     storage_type = self.op.storage_type
1119
1120     try:
1121       modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
1122     except KeyError:
1123       raise errors.OpPrereqError("Storage units of type '%s' can not be"
1124                                  " modified" % storage_type,
1125                                  errors.ECODE_INVAL)
1126
1127     diff = set(self.op.changes.keys()) - modifiable
1128     if diff:
1129       raise errors.OpPrereqError("The following fields can not be modified for"
1130                                  " storage units of type '%s': %r" %
1131                                  (storage_type, list(diff)),
1132                                  errors.ECODE_INVAL)
1133
1134   def CheckPrereq(self):
1135     """Check prerequisites.
1136
1137     """
1138     CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1139
1140   def ExpandNames(self):
1141     self.needed_locks = {
1142       locking.LEVEL_NODE: self.op.node_uuid,
1143       }
1144
1145   def Exec(self, feedback_fn):
1146     """Computes the list of nodes and their attributes.
1147
1148     """
1149     st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1150     result = self.rpc.call_storage_modify(self.op.node_uuid,
1151                                           self.op.storage_type, st_args,
1152                                           self.op.name, self.op.changes)
1153     result.Raise("Failed to modify storage unit '%s' on %s" %
1154                  (self.op.name, self.op.node_name))
1155
1156
1157 class NodeQuery(QueryBase):
1158   FIELDS = query.NODE_FIELDS
1159
1160   def ExpandNames(self, lu):
1161     lu.needed_locks = {}
1162     lu.share_locks = ShareAll()
1163
1164     if self.names:
1165       (self.wanted, _) = GetWantedNodes(lu, self.names)
1166     else:
1167       self.wanted = locking.ALL_SET
1168
1169     self.do_locking = (self.use_locking and
1170                        query.NQ_LIVE in self.requested_data)
1171
1172     if self.do_locking:
1173       # If any non-static field is requested we need to lock the nodes
1174       lu.needed_locks[locking.LEVEL_NODE] = self.wanted
1175       lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1176
1177   def DeclareLocks(self, lu, level):
1178     pass
1179
1180   def _GetQueryData(self, lu):
1181     """Computes the list of nodes and their attributes.
1182
1183     """
1184     all_info = lu.cfg.GetAllNodesInfo()
1185
1186     node_uuids = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)
1187
1188     # Gather data as requested
1189     if query.NQ_LIVE in self.requested_data:
1190       # filter out non-vm_capable nodes
1191       toquery_node_uuids = [node.uuid for node in all_info.values()
1192                             if node.vm_capable and node.uuid in node_uuids]
1193       lvm_enabled = utils.storage.IsLvmEnabled(
1194           lu.cfg.GetClusterInfo().enabled_disk_templates)
1195       # FIXME: this per default asks for storage space information for all
1196       # enabled disk templates. Fix this by making it possible to specify
1197       # space report fields for specific disk templates.
1198       raw_storage_units = utils.storage.GetStorageUnitsOfCluster(
1199           lu.cfg, include_spindles=lvm_enabled)
1200       storage_units = rpc.PrepareStorageUnitsForNodes(
1201           lu.cfg, raw_storage_units, toquery_node_uuids)
1202       default_hypervisor = lu.cfg.GetHypervisorType()
1203       hvparams = lu.cfg.GetClusterInfo().hvparams[default_hypervisor]
1204       hvspecs = [(default_hypervisor, hvparams)]
1205       node_data = lu.rpc.call_node_info(toquery_node_uuids, storage_units,
1206                                         hvspecs)
1207       live_data = dict(
1208           (uuid, rpc.MakeLegacyNodeInfo(nresult.payload,
1209                                         require_spindles=lvm_enabled))
1210           for (uuid, nresult) in node_data.items()
1211           if not nresult.fail_msg and nresult.payload)
1212     else:
1213       live_data = None
1214
1215     if query.NQ_INST in self.requested_data:
1216       node_to_primary = dict([(uuid, set()) for uuid in node_uuids])
1217       node_to_secondary = dict([(uuid, set()) for uuid in node_uuids])
1218
1219       inst_data = lu.cfg.GetAllInstancesInfo()
1220       inst_uuid_to_inst_name = {}
1221
1222       for inst in inst_data.values():
1223         inst_uuid_to_inst_name[inst.uuid] = inst.name
1224         if inst.primary_node in node_to_primary:
1225           node_to_primary[inst.primary_node].add(inst.uuid)
1226         for secnode in inst.secondary_nodes:
1227           if secnode in node_to_secondary:
1228             node_to_secondary[secnode].add(inst.uuid)
1229     else:
1230       node_to_primary = None
1231       node_to_secondary = None
1232       inst_uuid_to_inst_name = None
1233
1234     if query.NQ_OOB in self.requested_data:
1235       oob_support = dict((uuid, bool(SupportsOob(lu.cfg, node)))
1236                          for uuid, node in all_info.iteritems())
1237     else:
1238       oob_support = None
1239
1240     if query.NQ_GROUP in self.requested_data:
1241       groups = lu.cfg.GetAllNodeGroupsInfo()
1242     else:
1243       groups = {}
1244
1245     return query.NodeQueryData([all_info[uuid] for uuid in node_uuids],
1246                                live_data, lu.cfg.GetMasterNode(),
1247                                node_to_primary, node_to_secondary,
1248                                inst_uuid_to_inst_name, groups, oob_support,
1249                                lu.cfg.GetClusterInfo())
1250
1251
1252 class LUNodeQuery(NoHooksLU):
1253   """Logical unit for querying nodes.
1254
1255   """
1256   # pylint: disable=W0142
1257   REQ_BGL = False
1258
1259   def CheckArguments(self):
1260     self.nq = NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
1261                          self.op.output_fields, self.op.use_locking)
1262
1263   def ExpandNames(self):
1264     self.nq.ExpandNames(self)
1265
1266   def DeclareLocks(self, level):
1267     self.nq.DeclareLocks(self, level)
1268
1269   def Exec(self, feedback_fn):
1270     return self.nq.OldStyleQuery(self)
1271
1272
1273 def _CheckOutputFields(static, dynamic, selected):
1274   """Checks whether all selected fields are valid.
1275
1276   @type static: L{utils.FieldSet}
1277   @param static: static fields set
1278   @type dynamic: L{utils.FieldSet}
1279   @param dynamic: dynamic fields set
1280
1281   """
1282   f = utils.FieldSet()
1283   f.Extend(static)
1284   f.Extend(dynamic)
1285
1286   delta = f.NonMatching(selected)
1287   if delta:
1288     raise errors.OpPrereqError("Unknown output fields selected: %s"
1289                                % ",".join(delta), errors.ECODE_INVAL)
1290
1291
1292 class LUNodeQueryvols(NoHooksLU):
1293   """Logical unit for getting volumes on node(s).
1294
1295   """
1296   REQ_BGL = False
1297   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1298   _FIELDS_STATIC = utils.FieldSet("node")
1299
1300   def CheckArguments(self):
1301     _CheckOutputFields(static=self._FIELDS_STATIC,
1302                        dynamic=self._FIELDS_DYNAMIC,
1303                        selected=self.op.output_fields)
1304
1305   def ExpandNames(self):
1306     self.share_locks = ShareAll()
1307
1308     if self.op.nodes:
1309       self.needed_locks = {
1310         locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
1311         }
1312     else:
1313       self.needed_locks = {
1314         locking.LEVEL_NODE: locking.ALL_SET,
1315         locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1316         }
1317
1318   def Exec(self, feedback_fn):
1319     """Computes the list of nodes and their attributes.
1320
1321     """
1322     node_uuids = self.owned_locks(locking.LEVEL_NODE)
1323     volumes = self.rpc.call_node_volumes(node_uuids)
1324
1325     ilist = self.cfg.GetAllInstancesInfo()
1326     vol2inst = MapInstanceLvsToNodes(ilist.values())
1327
1328     output = []
1329     for node_uuid in node_uuids:
1330       nresult = volumes[node_uuid]
1331       if nresult.offline:
1332         continue
1333       msg = nresult.fail_msg
1334       if msg:
1335         self.LogWarning("Can't compute volume data on node %s: %s",
1336                         self.cfg.GetNodeName(node_uuid), msg)
1337         continue
1338
1339       node_vols = sorted(nresult.payload,
1340                          key=operator.itemgetter("dev"))
1341
1342       for vol in node_vols:
1343         node_output = []
1344         for field in self.op.output_fields:
1345           if field == "node":
1346             val = self.cfg.GetNodeName(node_uuid)
1347           elif field == "phys":
1348             val = vol["dev"]
1349           elif field == "vg":
1350             val = vol["vg"]
1351           elif field == "name":
1352             val = vol["name"]
1353           elif field == "size":
1354             val = int(float(vol["size"]))
1355           elif field == "instance":
1356             inst = vol2inst.get((node_uuid, vol["vg"] + "/" + vol["name"]),
1357                                 None)
1358             if inst is not None:
1359               val = inst.name
1360             else:
1361               val = "-"
1362           else:
1363             raise errors.ParameterError(field)
1364           node_output.append(str(val))
1365
1366         output.append(node_output)
1367
1368     return output
1369
1370
1371 class LUNodeQueryStorage(NoHooksLU):
1372   """Logical unit for getting information on storage units on node(s).
1373
1374   """
1375   _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
1376   REQ_BGL = False
1377
1378   def CheckArguments(self):
1379     _CheckOutputFields(static=self._FIELDS_STATIC,
1380                        dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
1381                        selected=self.op.output_fields)
1382
1383   def ExpandNames(self):
1384     self.share_locks = ShareAll()
1385
1386     if self.op.nodes:
1387       self.needed_locks = {
1388         locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
1389         }
1390     else:
1391       self.needed_locks = {
1392         locking.LEVEL_NODE: locking.ALL_SET,
1393         locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1394         }
1395
1396   def CheckPrereq(self):
1397     """Check prerequisites.
1398
1399     """
1400     CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1401
1402   def Exec(self, feedback_fn):
1403     """Computes the list of nodes and their attributes.
1404
1405     """
1406     self.node_uuids = self.owned_locks(locking.LEVEL_NODE)
1407
1408     # Always get name to sort by
1409     if constants.SF_NAME in self.op.output_fields:
1410       fields = self.op.output_fields[:]
1411     else:
1412       fields = [constants.SF_NAME] + self.op.output_fields
1413
1414     # Never ask for node or type as it's only known to the LU
1415     for extra in [constants.SF_NODE, constants.SF_TYPE]:
1416       while extra in fields:
1417         fields.remove(extra)
1418
1419     field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
1420     name_idx = field_idx[constants.SF_NAME]
1421
1422     st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1423     data = self.rpc.call_storage_list(self.node_uuids,
1424                                       self.op.storage_type, st_args,
1425                                       self.op.name, fields)
1426
1427     result = []
1428
1429     for node_uuid in utils.NiceSort(self.node_uuids):
1430       node_name = self.cfg.GetNodeName(node_uuid)
1431       nresult = data[node_uuid]
1432       if nresult.offline:
1433         continue
1434
1435       msg = nresult.fail_msg
1436       if msg:
1437         self.LogWarning("Can't get storage data from node %s: %s",
1438                         node_name, msg)
1439         continue
1440
1441       rows = dict([(row[name_idx], row) for row in nresult.payload])
1442
1443       for name in utils.NiceSort(rows.keys()):
1444         row = rows[name]
1445
1446         out = []
1447
1448         for field in self.op.output_fields:
1449           if field == constants.SF_NODE:
1450             val = node_name
1451           elif field == constants.SF_TYPE:
1452             val = self.op.storage_type
1453           elif field in field_idx:
1454             val = row[field_idx[field]]
1455           else:
1456             raise errors.ParameterError(field)
1457
1458           out.append(val)
1459
1460         result.append(out)
1461
1462     return result
1463
1464
1465 class LUNodeRemove(LogicalUnit):
1466   """Logical unit for removing a node.
1467
1468   """
1469   HPATH = "node-remove"
1470   HTYPE = constants.HTYPE_NODE
1471
1472   def BuildHooksEnv(self):
1473     """Build hooks env.
1474
1475     """
1476     return {
1477       "OP_TARGET": self.op.node_name,
1478       "NODE_NAME": self.op.node_name,
1479       }
1480
1481   def BuildHooksNodes(self):
1482     """Build hooks nodes.
1483
1484     This doesn't run on the target node in the pre phase as a failed
1485     node would then be impossible to remove.
1486
1487     """
1488     all_nodes = self.cfg.GetNodeList()
1489     try:
1490       all_nodes.remove(self.op.node_uuid)
1491     except ValueError:
1492       pass
1493     return (all_nodes, all_nodes)
1494
1495   def CheckPrereq(self):
1496     """Check prerequisites.
1497
1498     This checks:
1499      - the node exists in the configuration
1500      - it does not have primary or secondary instances
1501      - it's not the master
1502
1503     Any errors are signaled by raising errors.OpPrereqError.
1504
1505     """
1506     (self.op.node_uuid, self.op.node_name) = \
1507       ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1508     node = self.cfg.GetNodeInfo(self.op.node_uuid)
1509     assert node is not None
1510
1511     masternode = self.cfg.GetMasterNode()
1512     if node.uuid == masternode:
1513       raise errors.OpPrereqError("Node is the master node, failover to another"
1514                                  " node is required", errors.ECODE_INVAL)
1515
1516     for _, instance in self.cfg.GetAllInstancesInfo().items():
1517       if node.uuid in instance.all_nodes:
1518         raise errors.OpPrereqError("Instance %s is still running on the node,"
1519                                    " please remove first" % instance.name,
1520                                    errors.ECODE_INVAL)
1521     self.op.node_name = node.name
1522     self.node = node
1523
1524   def Exec(self, feedback_fn):
1525     """Removes the node from the cluster.
1526
1527     """
1528     logging.info("Stopping the node daemon and removing configs from node %s",
1529                  self.node.name)
1530
1531     modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
1532
1533     assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
1534       "Not owning BGL"
1535
1536     # Promote nodes to master candidate as needed
1537     AdjustCandidatePool(self, exceptions=[self.node.uuid])
1538     self.context.RemoveNode(self.node)
1539
1540     # Run post hooks on the node before it's removed
1541     RunPostHook(self, self.node.name)
1542
1543     # we have to call this by name rather than by UUID, as the node is no longer
1544     # in the config
1545     result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup)
1546     msg = result.fail_msg
1547     if msg:
1548       self.LogWarning("Errors encountered on the remote node while leaving"
1549                       " the cluster: %s", msg)
1550
1551     # Remove node from our /etc/hosts
1552     if self.cfg.GetClusterInfo().modify_etc_hosts:
1553       master_node_uuid = self.cfg.GetMasterNode()
1554       result = self.rpc.call_etc_hosts_modify(master_node_uuid,
1555                                               constants.ETC_HOSTS_REMOVE,
1556                                               self.node.name, None)
1557       result.Raise("Can't update hosts file with new host data")
1558       RedistributeAncillaryFiles(self)
1559
1560
1561 class LURepairNodeStorage(NoHooksLU):
1562   """Repairs the volume group on a node.
1563
1564   """
1565   REQ_BGL = False
1566
1567   def CheckArguments(self):
1568     (self.op.node_uuid, self.op.node_name) = \
1569       ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1570
1571     storage_type = self.op.storage_type
1572
1573     if (constants.SO_FIX_CONSISTENCY not in
1574         constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
1575       raise errors.OpPrereqError("Storage units of type '%s' can not be"
1576                                  " repaired" % storage_type,
1577                                  errors.ECODE_INVAL)
1578
1579   def ExpandNames(self):
1580     self.needed_locks = {
1581       locking.LEVEL_NODE: [self.op.node_uuid],
1582       }
1583
1584   def _CheckFaultyDisks(self, instance, node_uuid):
1585     """Ensure faulty disks abort the opcode or at least warn."""
1586     try:
1587       if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
1588                                  node_uuid, True):
1589         raise errors.OpPrereqError("Instance '%s' has faulty disks on"
1590                                    " node '%s'" %
1591                                    (instance.name,
1592                                     self.cfg.GetNodeName(node_uuid)),
1593                                    errors.ECODE_STATE)
1594     except errors.OpPrereqError, err:
1595       if self.op.ignore_consistency:
1596         self.LogWarning(str(err.args[0]))
1597       else:
1598         raise
1599
1600   def CheckPrereq(self):
1601     """Check prerequisites.
1602
1603     """
1604     CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1605
1606     # Check whether any instance on this node has faulty disks
1607     for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
1608       if not inst.disks_active:
1609         continue
1610       check_nodes = set(inst.all_nodes)
1611       check_nodes.discard(self.op.node_uuid)
1612       for inst_node_uuid in check_nodes:
1613         self._CheckFaultyDisks(inst, inst_node_uuid)
1614
1615   def Exec(self, feedback_fn):
1616     feedback_fn("Repairing storage unit '%s' on %s ..." %
1617                 (self.op.name, self.op.node_name))
1618
1619     st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1620     result = self.rpc.call_storage_execute(self.op.node_uuid,
1621                                            self.op.storage_type, st_args,
1622                                            self.op.name,
1623                                            constants.SO_FIX_CONSISTENCY)
1624     result.Raise("Failed to repair storage unit '%s' on %s" %
1625                  (self.op.name, self.op.node_name))