Make names more descriptive
[ganeti-local] / lib / cmdlib / node.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with nodes."""
23
24 import logging
25 import operator
26
27 from ganeti import constants
28 from ganeti import errors
29 from ganeti import locking
30 from ganeti import netutils
31 from ganeti import objects
32 from ganeti import opcodes
33 from ganeti import qlang
34 from ganeti import query
35 from ganeti import rpc
36 from ganeti import utils
37 from ganeti.masterd import iallocator
38
39 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, QueryBase, \
40   ResultWithJobs
41 from ganeti.cmdlib.common import CheckParamsNotGlobal, \
42   MergeAndVerifyHvState, MergeAndVerifyDiskState, \
43   IsExclusiveStorageEnabledNode, CheckNodePVs, \
44   RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
45   CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
46   AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
47   GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \
48   FindFaultyInstanceDisks
49
50
51 def _DecideSelfPromotion(lu, exceptions=None):
52   """Decide whether I should promote myself as a master candidate.
53
54   """
55   cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
56   mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
57   # the new node will increase mc_max with one, so:
58   mc_should = min(mc_should + 1, cp_size)
59   return mc_now < mc_should
60
61
62 def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
63   """Ensure that a node has the given secondary ip.
64
65   @type lu: L{LogicalUnit}
66   @param lu: the LU on behalf of which we make the check
67   @type node: L{objects.Node}
68   @param node: the node to check
69   @type secondary_ip: string
70   @param secondary_ip: the ip to check
71   @type prereq: boolean
72   @param prereq: whether to throw a prerequisite or an execute error
73   @raise errors.OpPrereqError: if the node doesn't have the ip,
74   and prereq=True
75   @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
76
77   """
78   # this can be called with a new node, which has no UUID yet, so perform the
79   # RPC call using its name
80   result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
81   result.Raise("Failure checking secondary ip on node %s" % node.name,
82                prereq=prereq, ecode=errors.ECODE_ENVIRON)
83   if not result.payload:
84     msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
85            " please fix and re-run this command" % secondary_ip)
86     if prereq:
87       raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
88     else:
89       raise errors.OpExecError(msg)
90
91
92 class LUNodeAdd(LogicalUnit):
93   """Logical unit for adding node to the cluster.
94
95   """
96   HPATH = "node-add"
97   HTYPE = constants.HTYPE_NODE
98   _NFLAGS = ["master_capable", "vm_capable"]
99
100   def CheckArguments(self):
101     self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
102     # validate/normalize the node name
103     self.hostname = netutils.GetHostname(name=self.op.node_name,
104                                          family=self.primary_ip_family)
105     self.op.node_name = self.hostname.name
106
107     if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName():
108       raise errors.OpPrereqError("Cannot readd the master node",
109                                  errors.ECODE_STATE)
110
111     if self.op.readd and self.op.group:
112       raise errors.OpPrereqError("Cannot pass a node group when a node is"
113                                  " being readded", errors.ECODE_INVAL)
114
115   def BuildHooksEnv(self):
116     """Build hooks env.
117
118     This will run on all nodes before, and on all nodes + the new node after.
119
120     """
121     return {
122       "OP_TARGET": self.op.node_name,
123       "NODE_NAME": self.op.node_name,
124       "NODE_PIP": self.op.primary_ip,
125       "NODE_SIP": self.op.secondary_ip,
126       "MASTER_CAPABLE": str(self.op.master_capable),
127       "VM_CAPABLE": str(self.op.vm_capable),
128       }
129
130   def BuildHooksNodes(self):
131     """Build hooks nodes.
132
133     """
134     hook_nodes = self.cfg.GetNodeList()
135     new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
136     if new_node_info is not None:
137       # Exclude added node
138       hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))
139
140     # add the new node as post hook node by name; it does not have an UUID yet
141     return (hook_nodes, hook_nodes, [self.op.node_name, ])
142
143   def CheckPrereq(self):
144     """Check prerequisites.
145
146     This checks:
147      - the new node is not already in the config
148      - it is resolvable
149      - its parameters (single/dual homed) matches the cluster
150
151     Any errors are signaled by raising errors.OpPrereqError.
152
153     """
154     node_name = self.hostname.name
155     self.op.primary_ip = self.hostname.ip
156     if self.op.secondary_ip is None:
157       if self.primary_ip_family == netutils.IP6Address.family:
158         raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
159                                    " IPv4 address must be given as secondary",
160                                    errors.ECODE_INVAL)
161       self.op.secondary_ip = self.op.primary_ip
162
163     secondary_ip = self.op.secondary_ip
164     if not netutils.IP4Address.IsValid(secondary_ip):
165       raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
166                                  " address" % secondary_ip, errors.ECODE_INVAL)
167
168     existing_node_info = self.cfg.GetNodeInfoByName(node_name)
169     if not self.op.readd and existing_node_info is not None:
170       raise errors.OpPrereqError("Node %s is already in the configuration" %
171                                  node_name, errors.ECODE_EXISTS)
172     elif self.op.readd and existing_node_info is None:
173       raise errors.OpPrereqError("Node %s is not in the configuration" %
174                                  node_name, errors.ECODE_NOENT)
175
176     self.changed_primary_ip = False
177
178     for existing_node in self.cfg.GetAllNodesInfo().values():
179       if self.op.readd and node_name == existing_node.name:
180         if existing_node.secondary_ip != secondary_ip:
181           raise errors.OpPrereqError("Readded node doesn't have the same IP"
182                                      " address configuration as before",
183                                      errors.ECODE_INVAL)
184         if existing_node.primary_ip != self.op.primary_ip:
185           self.changed_primary_ip = True
186
187         continue
188
189       if (existing_node.primary_ip == self.op.primary_ip or
190           existing_node.secondary_ip == self.op.primary_ip or
191           existing_node.primary_ip == secondary_ip or
192           existing_node.secondary_ip == secondary_ip):
193         raise errors.OpPrereqError("New node ip address(es) conflict with"
194                                    " existing node %s" % existing_node.name,
195                                    errors.ECODE_NOTUNIQUE)
196
197     # After this 'if' block, None is no longer a valid value for the
198     # _capable op attributes
199     if self.op.readd:
200       assert existing_node_info is not None, \
201         "Can't retrieve locked node %s" % node_name
202       for attr in self._NFLAGS:
203         if getattr(self.op, attr) is None:
204           setattr(self.op, attr, getattr(existing_node_info, attr))
205     else:
206       for attr in self._NFLAGS:
207         if getattr(self.op, attr) is None:
208           setattr(self.op, attr, True)
209
210     if self.op.readd and not self.op.vm_capable:
211       pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
212       if pri or sec:
213         raise errors.OpPrereqError("Node %s being re-added with vm_capable"
214                                    " flag set to false, but it already holds"
215                                    " instances" % node_name,
216                                    errors.ECODE_STATE)
217
218     # check that the type of the node (single versus dual homed) is the
219     # same as for the master
220     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
221     master_singlehomed = myself.secondary_ip == myself.primary_ip
222     newbie_singlehomed = secondary_ip == self.op.primary_ip
223     if master_singlehomed != newbie_singlehomed:
224       if master_singlehomed:
225         raise errors.OpPrereqError("The master has no secondary ip but the"
226                                    " new node has one",
227                                    errors.ECODE_INVAL)
228       else:
229         raise errors.OpPrereqError("The master has a secondary ip but the"
230                                    " new node doesn't have one",
231                                    errors.ECODE_INVAL)
232
233     # checks reachability
234     if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
235       raise errors.OpPrereqError("Node not reachable by ping",
236                                  errors.ECODE_ENVIRON)
237
238     if not newbie_singlehomed:
239       # check reachability from my secondary ip to newbie's secondary ip
240       if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
241                               source=myself.secondary_ip):
242         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
243                                    " based ping to node daemon port",
244                                    errors.ECODE_ENVIRON)
245
246     if self.op.readd:
247       exceptions = [existing_node_info.uuid]
248     else:
249       exceptions = []
250
251     if self.op.master_capable:
252       self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
253     else:
254       self.master_candidate = False
255
256     if self.op.readd:
257       self.new_node = existing_node_info
258     else:
259       node_group = self.cfg.LookupNodeGroup(self.op.group)
260       self.new_node = objects.Node(name=node_name,
261                                    primary_ip=self.op.primary_ip,
262                                    secondary_ip=secondary_ip,
263                                    master_candidate=self.master_candidate,
264                                    offline=False, drained=False,
265                                    group=node_group, ndparams={})
266
267     if self.op.ndparams:
268       utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
269       CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
270                            "node", "cluster or group")
271
272     if self.op.hv_state:
273       self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)
274
275     if self.op.disk_state:
276       self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)
277
278     # TODO: If we need to have multiple DnsOnlyRunner we probably should make
279     #       it a property on the base class.
280     rpcrunner = rpc.DnsOnlyRunner()
281     result = rpcrunner.call_version([node_name])[node_name]
282     result.Raise("Can't get version information from node %s" % node_name)
283     if constants.PROTOCOL_VERSION == result.payload:
284       logging.info("Communication to node %s fine, sw version %s match",
285                    node_name, result.payload)
286     else:
287       raise errors.OpPrereqError("Version mismatch master version %s,"
288                                  " node version %s" %
289                                  (constants.PROTOCOL_VERSION, result.payload),
290                                  errors.ECODE_ENVIRON)
291
292     vg_name = self.cfg.GetVGName()
293     if vg_name is not None:
294       vparams = {constants.NV_PVLIST: [vg_name]}
295       excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
296       cname = self.cfg.GetClusterName()
297       result = rpcrunner.call_node_verify_light(
298           [node_name], vparams, cname,
299           self.cfg.GetClusterInfo().hvparams)[node_name]
300       (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
301       if errmsgs:
302         raise errors.OpPrereqError("Checks on node PVs failed: %s" %
303                                    "; ".join(errmsgs), errors.ECODE_ENVIRON)
304
305   def Exec(self, feedback_fn):
306     """Adds the new node to the cluster.
307
308     """
309     assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
310       "Not owning BGL"
311
312     # We adding a new node so we assume it's powered
313     self.new_node.powered = True
314
315     # for re-adds, reset the offline/drained/master-candidate flags;
316     # we need to reset here, otherwise offline would prevent RPC calls
317     # later in the procedure; this also means that if the re-add
318     # fails, we are left with a non-offlined, broken node
319     if self.op.readd:
320       self.new_node.drained = False
321       self.LogInfo("Readding a node, the offline/drained flags were reset")
322       # if we demote the node, we do cleanup later in the procedure
323       self.new_node.master_candidate = self.master_candidate
324       if self.changed_primary_ip:
325         self.new_node.primary_ip = self.op.primary_ip
326
327     # copy the master/vm_capable flags
328     for attr in self._NFLAGS:
329       setattr(self.new_node, attr, getattr(self.op, attr))
330
331     # notify the user about any possible mc promotion
332     if self.new_node.master_candidate:
333       self.LogInfo("Node will be a master candidate")
334
335     if self.op.ndparams:
336       self.new_node.ndparams = self.op.ndparams
337     else:
338       self.new_node.ndparams = {}
339
340     if self.op.hv_state:
341       self.new_node.hv_state_static = self.new_hv_state
342
343     if self.op.disk_state:
344       self.new_node.disk_state_static = self.new_disk_state
345
346     # Add node to our /etc/hosts, and add key to known_hosts
347     if self.cfg.GetClusterInfo().modify_etc_hosts:
348       master_node = self.cfg.GetMasterNode()
349       result = self.rpc.call_etc_hosts_modify(
350                  master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
351                  self.hostname.ip)
352       result.Raise("Can't update hosts file with new host data")
353
354     if self.new_node.secondary_ip != self.new_node.primary_ip:
355       _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
356                                False)
357
358     node_verifier_uuids = [self.cfg.GetMasterNode()]
359     node_verify_param = {
360       constants.NV_NODELIST: ([self.new_node.name], {}),
361       # TODO: do a node-net-test as well?
362     }
363
364     result = self.rpc.call_node_verify(
365                node_verifier_uuids, node_verify_param,
366                self.cfg.GetClusterName(),
367                self.cfg.GetClusterInfo().hvparams)
368     for verifier in node_verifier_uuids:
369       result[verifier].Raise("Cannot communicate with node %s" % verifier)
370       nl_payload = result[verifier].payload[constants.NV_NODELIST]
371       if nl_payload:
372         for failed in nl_payload:
373           feedback_fn("ssh/hostname verification failed"
374                       " (checking from %s): %s" %
375                       (verifier, nl_payload[failed]))
376         raise errors.OpExecError("ssh/hostname verification failed")
377
378     if self.op.readd:
379       self.context.ReaddNode(self.new_node)
380       RedistributeAncillaryFiles(self)
381       # make sure we redistribute the config
382       self.cfg.Update(self.new_node, feedback_fn)
383       # and make sure the new node will not have old files around
384       if not self.new_node.master_candidate:
385         result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
386         result.Warn("Node failed to demote itself from master candidate status",
387                     self.LogWarning)
388     else:
389       self.context.AddNode(self.new_node, self.proc.GetECId())
390       RedistributeAncillaryFiles(self)
391
392
393 class LUNodeSetParams(LogicalUnit):
394   """Modifies the parameters of a node.
395
396   @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
397       to the node role (as _ROLE_*)
398   @cvar _R2F: a dictionary from node role to tuples of flags
399   @cvar _FLAGS: a list of attribute names corresponding to the flags
400
401   """
402   HPATH = "node-modify"
403   HTYPE = constants.HTYPE_NODE
404   REQ_BGL = False
405   (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
406   _F2R = {
407     (True, False, False): _ROLE_CANDIDATE,
408     (False, True, False): _ROLE_DRAINED,
409     (False, False, True): _ROLE_OFFLINE,
410     (False, False, False): _ROLE_REGULAR,
411     }
412   _R2F = dict((v, k) for k, v in _F2R.items())
413   _FLAGS = ["master_candidate", "drained", "offline"]
414
415   def CheckArguments(self):
416     (self.op.node_uuid, self.op.node_name) = \
417       ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
418     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
419                 self.op.master_capable, self.op.vm_capable,
420                 self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
421                 self.op.disk_state]
422     if all_mods.count(None) == len(all_mods):
423       raise errors.OpPrereqError("Please pass at least one modification",
424                                  errors.ECODE_INVAL)
425     if all_mods.count(True) > 1:
426       raise errors.OpPrereqError("Can't set the node into more than one"
427                                  " state at the same time",
428                                  errors.ECODE_INVAL)
429
430     # Boolean value that tells us whether we might be demoting from MC
431     self.might_demote = (self.op.master_candidate is False or
432                          self.op.offline is True or
433                          self.op.drained is True or
434                          self.op.master_capable is False)
435
436     if self.op.secondary_ip:
437       if not netutils.IP4Address.IsValid(self.op.secondary_ip):
438         raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
439                                    " address" % self.op.secondary_ip,
440                                    errors.ECODE_INVAL)
441
442     self.lock_all = self.op.auto_promote and self.might_demote
443     self.lock_instances = self.op.secondary_ip is not None
444
445   def _InstanceFilter(self, instance):
446     """Filter for getting affected instances.
447
448     """
449     return (instance.disk_template in constants.DTS_INT_MIRROR and
450             self.op.node_uuid in instance.all_nodes)
451
452   def ExpandNames(self):
453     if self.lock_all:
454       self.needed_locks = {
455         locking.LEVEL_NODE: locking.ALL_SET,
456
457         # Block allocations when all nodes are locked
458         locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
459         }
460     else:
461       self.needed_locks = {
462         locking.LEVEL_NODE: self.op.node_uuid,
463         }
464
465     # Since modifying a node can have severe effects on currently running
466     # operations the resource lock is at least acquired in shared mode
467     self.needed_locks[locking.LEVEL_NODE_RES] = \
468       self.needed_locks[locking.LEVEL_NODE]
469
470     # Get all locks except nodes in shared mode; they are not used for anything
471     # but read-only access
472     self.share_locks = ShareAll()
473     self.share_locks[locking.LEVEL_NODE] = 0
474     self.share_locks[locking.LEVEL_NODE_RES] = 0
475     self.share_locks[locking.LEVEL_NODE_ALLOC] = 0
476
477     if self.lock_instances:
478       self.needed_locks[locking.LEVEL_INSTANCE] = \
479         self.cfg.GetInstanceNames(
480           self.cfg.GetInstancesInfoByFilter(self._InstanceFilter).keys())
481
482   def BuildHooksEnv(self):
483     """Build hooks env.
484
485     This runs on the master node.
486
487     """
488     return {
489       "OP_TARGET": self.op.node_name,
490       "MASTER_CANDIDATE": str(self.op.master_candidate),
491       "OFFLINE": str(self.op.offline),
492       "DRAINED": str(self.op.drained),
493       "MASTER_CAPABLE": str(self.op.master_capable),
494       "VM_CAPABLE": str(self.op.vm_capable),
495       }
496
497   def BuildHooksNodes(self):
498     """Build hooks nodes.
499
500     """
501     nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
502     return (nl, nl)
503
504   def CheckPrereq(self):
505     """Check prerequisites.
506
507     This only checks the instance list against the existing names.
508
509     """
510     node = self.cfg.GetNodeInfo(self.op.node_uuid)
511     if self.lock_instances:
512       affected_instances = \
513         self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)
514
515       # Verify instance locks
516       owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
517       wanted_instance_names = frozenset([inst.name for inst in
518                                          affected_instances.values()])
519       if wanted_instance_names - owned_instance_names:
520         raise errors.OpPrereqError("Instances affected by changing node %s's"
521                                    " secondary IP address have changed since"
522                                    " locks were acquired, wanted '%s', have"
523                                    " '%s'; retry the operation" %
524                                    (node.name,
525                                     utils.CommaJoin(wanted_instance_names),
526                                     utils.CommaJoin(owned_instance_names)),
527                                    errors.ECODE_STATE)
528     else:
529       affected_instances = None
530
531     if (self.op.master_candidate is not None or
532         self.op.drained is not None or
533         self.op.offline is not None):
534       # we can't change the master's node flags
535       if node.uuid == self.cfg.GetMasterNode():
536         raise errors.OpPrereqError("The master role can be changed"
537                                    " only via master-failover",
538                                    errors.ECODE_INVAL)
539
540     if self.op.master_candidate and not node.master_capable:
541       raise errors.OpPrereqError("Node %s is not master capable, cannot make"
542                                  " it a master candidate" % node.name,
543                                  errors.ECODE_STATE)
544
545     if self.op.vm_capable is False:
546       (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
547       if ipri or isec:
548         raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
549                                    " the vm_capable flag" % node.name,
550                                    errors.ECODE_STATE)
551
552     if node.master_candidate and self.might_demote and not self.lock_all:
553       assert not self.op.auto_promote, "auto_promote set but lock_all not"
554       # check if after removing the current node, we're missing master
555       # candidates
556       (mc_remaining, mc_should, _) = \
557           self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
558       if mc_remaining < mc_should:
559         raise errors.OpPrereqError("Not enough master candidates, please"
560                                    " pass auto promote option to allow"
561                                    " promotion (--auto-promote or RAPI"
562                                    " auto_promote=True)", errors.ECODE_STATE)
563
564     self.old_flags = old_flags = (node.master_candidate,
565                                   node.drained, node.offline)
566     assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
567     self.old_role = old_role = self._F2R[old_flags]
568
569     # Check for ineffective changes
570     for attr in self._FLAGS:
571       if getattr(self.op, attr) is False and getattr(node, attr) is False:
572         self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
573         setattr(self.op, attr, None)
574
575     # Past this point, any flag change to False means a transition
576     # away from the respective state, as only real changes are kept
577
578     # TODO: We might query the real power state if it supports OOB
579     if SupportsOob(self.cfg, node):
580       if self.op.offline is False and not (node.powered or
581                                            self.op.powered is True):
582         raise errors.OpPrereqError(("Node %s needs to be turned on before its"
583                                     " offline status can be reset") %
584                                    self.op.node_name, errors.ECODE_STATE)
585     elif self.op.powered is not None:
586       raise errors.OpPrereqError(("Unable to change powered state for node %s"
587                                   " as it does not support out-of-band"
588                                   " handling") % self.op.node_name,
589                                  errors.ECODE_STATE)
590
591     # If we're being deofflined/drained, we'll MC ourself if needed
592     if (self.op.drained is False or self.op.offline is False or
593         (self.op.master_capable and not node.master_capable)):
594       if _DecideSelfPromotion(self):
595         self.op.master_candidate = True
596         self.LogInfo("Auto-promoting node to master candidate")
597
598     # If we're no longer master capable, we'll demote ourselves from MC
599     if self.op.master_capable is False and node.master_candidate:
600       self.LogInfo("Demoting from master candidate")
601       self.op.master_candidate = False
602
603     # Compute new role
604     assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
605     if self.op.master_candidate:
606       new_role = self._ROLE_CANDIDATE
607     elif self.op.drained:
608       new_role = self._ROLE_DRAINED
609     elif self.op.offline:
610       new_role = self._ROLE_OFFLINE
611     elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
612       # False is still in new flags, which means we're un-setting (the
613       # only) True flag
614       new_role = self._ROLE_REGULAR
615     else: # no new flags, nothing, keep old role
616       new_role = old_role
617
618     self.new_role = new_role
619
620     if old_role == self._ROLE_OFFLINE and new_role != old_role:
621       # Trying to transition out of offline status
622       result = self.rpc.call_version([node.uuid])[node.uuid]
623       if result.fail_msg:
624         raise errors.OpPrereqError("Node %s is being de-offlined but fails"
625                                    " to report its version: %s" %
626                                    (node.name, result.fail_msg),
627                                    errors.ECODE_STATE)
628       else:
629         self.LogWarning("Transitioning node from offline to online state"
630                         " without using re-add. Please make sure the node"
631                         " is healthy!")
632
633     # When changing the secondary ip, verify if this is a single-homed to
634     # multi-homed transition or vice versa, and apply the relevant
635     # restrictions.
636     if self.op.secondary_ip:
637       # Ok even without locking, because this can't be changed by any LU
638       master = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
639       master_singlehomed = master.secondary_ip == master.primary_ip
640       if master_singlehomed and self.op.secondary_ip != node.primary_ip:
641         if self.op.force and node.uuid == master.uuid:
642           self.LogWarning("Transitioning from single-homed to multi-homed"
643                           " cluster; all nodes will require a secondary IP"
644                           " address")
645         else:
646           raise errors.OpPrereqError("Changing the secondary ip on a"
647                                      " single-homed cluster requires the"
648                                      " --force option to be passed, and the"
649                                      " target node to be the master",
650                                      errors.ECODE_INVAL)
651       elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
652         if self.op.force and node.uuid == master.uuid:
653           self.LogWarning("Transitioning from multi-homed to single-homed"
654                           " cluster; secondary IP addresses will have to be"
655                           " removed")
656         else:
657           raise errors.OpPrereqError("Cannot set the secondary IP to be the"
658                                      " same as the primary IP on a multi-homed"
659                                      " cluster, unless the --force option is"
660                                      " passed, and the target node is the"
661                                      " master", errors.ECODE_INVAL)
662
663       assert not (set([inst.name for inst in affected_instances.values()]) -
664                   self.owned_locks(locking.LEVEL_INSTANCE))
665
666       if node.offline:
667         if affected_instances:
668           msg = ("Cannot change secondary IP address: offline node has"
669                  " instances (%s) configured to use it" %
670                  utils.CommaJoin(
671                    [inst.name for inst in affected_instances.values()]))
672           raise errors.OpPrereqError(msg, errors.ECODE_STATE)
673       else:
674         # On online nodes, check that no instances are running, and that
675         # the node has the new ip and we can reach it.
676         for instance in affected_instances.values():
677           CheckInstanceState(self, instance, INSTANCE_DOWN,
678                              msg="cannot change secondary ip")
679
680         _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
681         if master.uuid != node.uuid:
682           # check reachability from master secondary ip to new secondary ip
683           if not netutils.TcpPing(self.op.secondary_ip,
684                                   constants.DEFAULT_NODED_PORT,
685                                   source=master.secondary_ip):
686             raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
687                                        " based ping to node daemon port",
688                                        errors.ECODE_ENVIRON)
689
690     if self.op.ndparams:
691       new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
692       utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
693       CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
694                            "node", "cluster or group")
695       self.new_ndparams = new_ndparams
696
697     if self.op.hv_state:
698       self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
699                                                 node.hv_state_static)
700
701     if self.op.disk_state:
702       self.new_disk_state = \
703         MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)
704
705   def Exec(self, feedback_fn):
706     """Modifies a node.
707
708     """
709     node = self.cfg.GetNodeInfo(self.op.node_uuid)
710     result = []
711
712     if self.op.ndparams:
713       node.ndparams = self.new_ndparams
714
715     if self.op.powered is not None:
716       node.powered = self.op.powered
717
718     if self.op.hv_state:
719       node.hv_state_static = self.new_hv_state
720
721     if self.op.disk_state:
722       node.disk_state_static = self.new_disk_state
723
724     for attr in ["master_capable", "vm_capable"]:
725       val = getattr(self.op, attr)
726       if val is not None:
727         setattr(node, attr, val)
728         result.append((attr, str(val)))
729
730     if self.new_role != self.old_role:
731       # Tell the node to demote itself, if no longer MC and not offline
732       if self.old_role == self._ROLE_CANDIDATE and \
733           self.new_role != self._ROLE_OFFLINE:
734         msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
735         if msg:
736           self.LogWarning("Node failed to demote itself: %s", msg)
737
738       new_flags = self._R2F[self.new_role]
739       for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
740         if of != nf:
741           result.append((desc, str(nf)))
742       (node.master_candidate, node.drained, node.offline) = new_flags
743
744       # we locked all nodes, we adjust the CP before updating this node
745       if self.lock_all:
746         AdjustCandidatePool(self, [node.uuid])
747
748     if self.op.secondary_ip:
749       node.secondary_ip = self.op.secondary_ip
750       result.append(("secondary_ip", self.op.secondary_ip))
751
752     # this will trigger configuration file update, if needed
753     self.cfg.Update(node, feedback_fn)
754
755     # this will trigger job queue propagation or cleanup if the mc
756     # flag changed
757     if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
758       self.context.ReaddNode(node)
759
760     return result
761
762
763 class LUNodePowercycle(NoHooksLU):
764   """Powercycles a node.
765
766   """
767   REQ_BGL = False
768
769   def CheckArguments(self):
770     (self.op.node_uuid, self.op.node_name) = \
771       ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
772
773     if self.op.node_uuid == self.cfg.GetMasterNode() and not self.op.force:
774       raise errors.OpPrereqError("The node is the master and the force"
775                                  " parameter was not set",
776                                  errors.ECODE_INVAL)
777
778   def ExpandNames(self):
779     """Locking for PowercycleNode.
780
781     This is a last-resort option and shouldn't block on other
782     jobs. Therefore, we grab no locks.
783
784     """
785     self.needed_locks = {}
786
787   def Exec(self, feedback_fn):
788     """Reboots a node.
789
790     """
791     default_hypervisor = self.cfg.GetHypervisorType()
792     hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
793     result = self.rpc.call_node_powercycle(self.op.node_uuid,
794                                            default_hypervisor,
795                                            hvparams)
796     result.Raise("Failed to schedule the reboot")
797     return result.payload
798
799
800 def _GetNodeInstancesInner(cfg, fn):
801   return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
802
803
804 def _GetNodePrimaryInstances(cfg, node_uuid):
805   """Returns primary instances on a node.
806
807   """
808   return _GetNodeInstancesInner(cfg,
809                                 lambda inst: node_uuid == inst.primary_node)
810
811
812 def _GetNodeSecondaryInstances(cfg, node_uuid):
813   """Returns secondary instances on a node.
814
815   """
816   return _GetNodeInstancesInner(cfg,
817                                 lambda inst: node_uuid in inst.secondary_nodes)
818
819
820 def _GetNodeInstances(cfg, node_uuid):
821   """Returns a list of all primary and secondary instances on a node.
822
823   """
824
825   return _GetNodeInstancesInner(cfg, lambda inst: node_uuid in inst.all_nodes)
826
827
828 class LUNodeEvacuate(NoHooksLU):
829   """Evacuates instances off a list of nodes.
830
831   """
832   REQ_BGL = False
833
834   _MODE2IALLOCATOR = {
835     constants.NODE_EVAC_PRI: constants.IALLOCATOR_NEVAC_PRI,
836     constants.NODE_EVAC_SEC: constants.IALLOCATOR_NEVAC_SEC,
837     constants.NODE_EVAC_ALL: constants.IALLOCATOR_NEVAC_ALL,
838     }
839   assert frozenset(_MODE2IALLOCATOR.keys()) == constants.NODE_EVAC_MODES
840   assert (frozenset(_MODE2IALLOCATOR.values()) ==
841           constants.IALLOCATOR_NEVAC_MODES)
842
843   def CheckArguments(self):
844     CheckIAllocatorOrNode(self, "iallocator", "remote_node")
845
846   def ExpandNames(self):
847     (self.op.node_uuid, self.op.node_name) = \
848       ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
849
850     if self.op.remote_node is not None:
851       (self.op.remote_node_uuid, self.op.remote_node) = \
852         ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
853                               self.op.remote_node)
854       assert self.op.remote_node
855
856       if self.op.node_uuid == self.op.remote_node_uuid:
857         raise errors.OpPrereqError("Can not use evacuated node as a new"
858                                    " secondary node", errors.ECODE_INVAL)
859
860       if self.op.mode != constants.NODE_EVAC_SEC:
861         raise errors.OpPrereqError("Without the use of an iallocator only"
862                                    " secondary instances can be evacuated",
863                                    errors.ECODE_INVAL)
864
865     # Declare locks
866     self.share_locks = ShareAll()
867     self.needed_locks = {
868       locking.LEVEL_INSTANCE: [],
869       locking.LEVEL_NODEGROUP: [],
870       locking.LEVEL_NODE: [],
871       }
872
873     # Determine nodes (via group) optimistically, needs verification once locks
874     # have been acquired
875     self.lock_nodes = self._DetermineNodes()
876
877   def _DetermineNodes(self):
878     """Gets the list of node UUIDs to operate on.
879
880     """
881     if self.op.remote_node is None:
882       # Iallocator will choose any node(s) in the same group
883       group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
884     else:
885       group_nodes = frozenset([self.op.remote_node_uuid])
886
887     # Determine nodes to be locked
888     return set([self.op.node_uuid]) | group_nodes
889
890   def _DetermineInstances(self):
891     """Builds list of instances to operate on.
892
893     """
894     assert self.op.mode in constants.NODE_EVAC_MODES
895
896     if self.op.mode == constants.NODE_EVAC_PRI:
897       # Primary instances only
898       inst_fn = _GetNodePrimaryInstances
899       assert self.op.remote_node is None, \
900         "Evacuating primary instances requires iallocator"
901     elif self.op.mode == constants.NODE_EVAC_SEC:
902       # Secondary instances only
903       inst_fn = _GetNodeSecondaryInstances
904     else:
905       # All instances
906       assert self.op.mode == constants.NODE_EVAC_ALL
907       inst_fn = _GetNodeInstances
908       # TODO: In 2.6, change the iallocator interface to take an evacuation mode
909       # per instance
910       raise errors.OpPrereqError("Due to an issue with the iallocator"
911                                  " interface it is not possible to evacuate"
912                                  " all instances at once; specify explicitly"
913                                  " whether to evacuate primary or secondary"
914                                  " instances",
915                                  errors.ECODE_INVAL)
916
917     return inst_fn(self.cfg, self.op.node_uuid)
918
919   def DeclareLocks(self, level):
920     if level == locking.LEVEL_INSTANCE:
921       # Lock instances optimistically, needs verification once node and group
922       # locks have been acquired
923       self.needed_locks[locking.LEVEL_INSTANCE] = \
924         set(i.name for i in self._DetermineInstances())
925
926     elif level == locking.LEVEL_NODEGROUP:
927       # Lock node groups for all potential target nodes optimistically, needs
928       # verification once nodes have been acquired
929       self.needed_locks[locking.LEVEL_NODEGROUP] = \
930         self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)
931
932     elif level == locking.LEVEL_NODE:
933       self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes
934
935   def CheckPrereq(self):
936     # Verify locks
937     owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
938     owned_nodes = self.owned_locks(locking.LEVEL_NODE)
939     owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
940
941     need_nodes = self._DetermineNodes()
942
943     if not owned_nodes.issuperset(need_nodes):
944       raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
945                                  " locks were acquired, current nodes are"
946                                  " are '%s', used to be '%s'; retry the"
947                                  " operation" %
948                                  (self.op.node_name,
949                                   utils.CommaJoin(need_nodes),
950                                   utils.CommaJoin(owned_nodes)),
951                                  errors.ECODE_STATE)
952
953     wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
954     if owned_groups != wanted_groups:
955       raise errors.OpExecError("Node groups changed since locks were acquired,"
956                                " current groups are '%s', used to be '%s';"
957                                " retry the operation" %
958                                (utils.CommaJoin(wanted_groups),
959                                 utils.CommaJoin(owned_groups)))
960
961     # Determine affected instances
962     self.instances = self._DetermineInstances()
963     self.instance_names = [i.name for i in self.instances]
964
965     if set(self.instance_names) != owned_instance_names:
966       raise errors.OpExecError("Instances on node '%s' changed since locks"
967                                " were acquired, current instances are '%s',"
968                                " used to be '%s'; retry the operation" %
969                                (self.op.node_name,
970                                 utils.CommaJoin(self.instance_names),
971                                 utils.CommaJoin(owned_instance_names)))
972
973     if self.instance_names:
974       self.LogInfo("Evacuating instances from node '%s': %s",
975                    self.op.node_name,
976                    utils.CommaJoin(utils.NiceSort(self.instance_names)))
977     else:
978       self.LogInfo("No instances to evacuate from node '%s'",
979                    self.op.node_name)
980
981     if self.op.remote_node is not None:
982       for i in self.instances:
983         if i.primary_node == self.op.remote_node_uuid:
984           raise errors.OpPrereqError("Node %s is the primary node of"
985                                      " instance %s, cannot use it as"
986                                      " secondary" %
987                                      (self.op.remote_node, i.name),
988                                      errors.ECODE_INVAL)
989
990   def Exec(self, feedback_fn):
991     assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)
992
993     if not self.instance_names:
994       # No instances to evacuate
995       jobs = []
996
997     elif self.op.iallocator is not None:
998       # TODO: Implement relocation to other group
999       evac_mode = self._MODE2IALLOCATOR[self.op.mode]
1000       req = iallocator.IAReqNodeEvac(evac_mode=evac_mode,
1001                                      instances=list(self.instance_names))
1002       ial = iallocator.IAllocator(self.cfg, self.rpc, req)
1003
1004       ial.Run(self.op.iallocator)
1005
1006       if not ial.success:
1007         raise errors.OpPrereqError("Can't compute node evacuation using"
1008                                    " iallocator '%s': %s" %
1009                                    (self.op.iallocator, ial.info),
1010                                    errors.ECODE_NORES)
1011
1012       jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)
1013
1014     elif self.op.remote_node is not None:
1015       assert self.op.mode == constants.NODE_EVAC_SEC
1016       jobs = [
1017         [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
1018                                         remote_node=self.op.remote_node,
1019                                         disks=[],
1020                                         mode=constants.REPLACE_DISK_CHG,
1021                                         early_release=self.op.early_release)]
1022         for instance_name in self.instance_names]
1023
1024     else:
1025       raise errors.ProgrammerError("No iallocator or remote node")
1026
1027     return ResultWithJobs(jobs)
1028
1029
1030 class LUNodeMigrate(LogicalUnit):
1031   """Migrate all instances from a node.
1032
1033   """
1034   HPATH = "node-migrate"
1035   HTYPE = constants.HTYPE_NODE
1036   REQ_BGL = False
1037
1038   def CheckArguments(self):
1039     pass
1040
1041   def ExpandNames(self):
1042     (self.op.node_uuid, self.op.node_name) = \
1043       ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1044
1045     self.share_locks = ShareAll()
1046     self.needed_locks = {
1047       locking.LEVEL_NODE: [self.op.node_uuid],
1048       }
1049
1050   def BuildHooksEnv(self):
1051     """Build hooks env.
1052
1053     This runs on the master, the primary and all the secondaries.
1054
1055     """
1056     return {
1057       "NODE_NAME": self.op.node_name,
1058       "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
1059       }
1060
1061   def BuildHooksNodes(self):
1062     """Build hooks nodes.
1063
1064     """
1065     nl = [self.cfg.GetMasterNode()]
1066     return (nl, nl)
1067
1068   def CheckPrereq(self):
1069     pass
1070
1071   def Exec(self, feedback_fn):
1072     # Prepare jobs for migration instances
1073     jobs = [
1074       [opcodes.OpInstanceMigrate(
1075         instance_name=inst.name,
1076         mode=self.op.mode,
1077         live=self.op.live,
1078         iallocator=self.op.iallocator,
1079         target_node=self.op.target_node,
1080         allow_runtime_changes=self.op.allow_runtime_changes,
1081         ignore_ipolicy=self.op.ignore_ipolicy)]
1082       for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]
1083
1084     # TODO: Run iallocator in this opcode and pass correct placement options to
1085     # OpInstanceMigrate. Since other jobs can modify the cluster between
1086     # running the iallocator and the actual migration, a good consistency model
1087     # will have to be found.
1088
1089     assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
1090             frozenset([self.op.node_uuid]))
1091
1092     return ResultWithJobs(jobs)
1093
1094
1095 def _GetStorageTypeArgs(cfg, storage_type):
1096   """Returns the arguments for a storage type.
1097
1098   """
1099   # Special case for file storage
1100   if storage_type == constants.ST_FILE:
1101     # storage.FileStorage wants a list of storage directories
1102     return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
1103
1104   return []
1105
1106
1107 class LUNodeModifyStorage(NoHooksLU):
1108   """Logical unit for modifying a storage volume on a node.
1109
1110   """
1111   REQ_BGL = False
1112
1113   def CheckArguments(self):
1114     (self.op.node_uuid, self.op.node_name) = \
1115       ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1116
1117     storage_type = self.op.storage_type
1118
1119     try:
1120       modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
1121     except KeyError:
1122       raise errors.OpPrereqError("Storage units of type '%s' can not be"
1123                                  " modified" % storage_type,
1124                                  errors.ECODE_INVAL)
1125
1126     diff = set(self.op.changes.keys()) - modifiable
1127     if diff:
1128       raise errors.OpPrereqError("The following fields can not be modified for"
1129                                  " storage units of type '%s': %r" %
1130                                  (storage_type, list(diff)),
1131                                  errors.ECODE_INVAL)
1132
1133   def ExpandNames(self):
1134     self.needed_locks = {
1135       locking.LEVEL_NODE: self.op.node_uuid,
1136       }
1137
1138   def Exec(self, feedback_fn):
1139     """Computes the list of nodes and their attributes.
1140
1141     """
1142     st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1143     result = self.rpc.call_storage_modify(self.op.node_uuid,
1144                                           self.op.storage_type, st_args,
1145                                           self.op.name, self.op.changes)
1146     result.Raise("Failed to modify storage unit '%s' on %s" %
1147                  (self.op.name, self.op.node_name))
1148
1149
1150 class NodeQuery(QueryBase):
1151   FIELDS = query.NODE_FIELDS
1152
1153   def ExpandNames(self, lu):
1154     lu.needed_locks = {}
1155     lu.share_locks = ShareAll()
1156
1157     if self.names:
1158       (self.wanted, _) = GetWantedNodes(lu, self.names)
1159     else:
1160       self.wanted = locking.ALL_SET
1161
1162     self.do_locking = (self.use_locking and
1163                        query.NQ_LIVE in self.requested_data)
1164
1165     if self.do_locking:
1166       # If any non-static field is requested we need to lock the nodes
1167       lu.needed_locks[locking.LEVEL_NODE] = self.wanted
1168       lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1169
1170   def DeclareLocks(self, lu, level):
1171     pass
1172
1173   def _GetQueryData(self, lu):
1174     """Computes the list of nodes and their attributes.
1175
1176     """
1177     all_info = lu.cfg.GetAllNodesInfo()
1178
1179     node_uuids = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)
1180
1181     # Gather data as requested
1182     if query.NQ_LIVE in self.requested_data:
1183       # filter out non-vm_capable nodes
1184       toquery_node_uuids = [node.uuid for node in all_info.values()
1185                             if node.vm_capable and node.uuid in node_uuids]
1186       # FIXME: this per default asks for storage space information for all
1187       # enabled disk templates. Fix this by making it possible to specify
1188       # space report fields for specific disk templates.
1189       raw_storage_units = utils.storage.GetStorageUnitsOfCluster(
1190           lu.cfg, include_spindles=True)
1191       storage_units = rpc.PrepareStorageUnitsForNodes(
1192           lu.cfg, raw_storage_units, toquery_node_uuids)
1193       lvm_enabled = utils.storage.IsLvmEnabled(
1194           lu.cfg.GetClusterInfo().enabled_disk_templates)
1195       default_hypervisor = lu.cfg.GetHypervisorType()
1196       hvparams = lu.cfg.GetClusterInfo().hvparams[default_hypervisor]
1197       hvspecs = [(default_hypervisor, hvparams)]
1198       node_data = lu.rpc.call_node_info(toquery_node_uuids, storage_units,
1199                                         hvspecs)
1200       live_data = dict(
1201           (uuid, rpc.MakeLegacyNodeInfo(nresult.payload,
1202                                         require_vg_info=lvm_enabled))
1203           for (uuid, nresult) in node_data.items()
1204           if not nresult.fail_msg and nresult.payload)
1205     else:
1206       live_data = None
1207
1208     if query.NQ_INST in self.requested_data:
1209       node_to_primary = dict([(uuid, set()) for uuid in node_uuids])
1210       node_to_secondary = dict([(uuid, set()) for uuid in node_uuids])
1211
1212       inst_data = lu.cfg.GetAllInstancesInfo()
1213       inst_uuid_to_inst_name = {}
1214
1215       for inst in inst_data.values():
1216         inst_uuid_to_inst_name[inst.uuid] = inst.name
1217         if inst.primary_node in node_to_primary:
1218           node_to_primary[inst.primary_node].add(inst.uuid)
1219         for secnode in inst.secondary_nodes:
1220           if secnode in node_to_secondary:
1221             node_to_secondary[secnode].add(inst.uuid)
1222     else:
1223       node_to_primary = None
1224       node_to_secondary = None
1225       inst_uuid_to_inst_name = None
1226
1227     if query.NQ_OOB in self.requested_data:
1228       oob_support = dict((uuid, bool(SupportsOob(lu.cfg, node)))
1229                          for uuid, node in all_info.iteritems())
1230     else:
1231       oob_support = None
1232
1233     if query.NQ_GROUP in self.requested_data:
1234       groups = lu.cfg.GetAllNodeGroupsInfo()
1235     else:
1236       groups = {}
1237
1238     return query.NodeQueryData([all_info[uuid] for uuid in node_uuids],
1239                                live_data, lu.cfg.GetMasterNode(),
1240                                node_to_primary, node_to_secondary,
1241                                inst_uuid_to_inst_name, groups, oob_support,
1242                                lu.cfg.GetClusterInfo())
1243
1244
1245 class LUNodeQuery(NoHooksLU):
1246   """Logical unit for querying nodes.
1247
1248   """
1249   # pylint: disable=W0142
1250   REQ_BGL = False
1251
1252   def CheckArguments(self):
1253     self.nq = NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
1254                          self.op.output_fields, self.op.use_locking)
1255
1256   def ExpandNames(self):
1257     self.nq.ExpandNames(self)
1258
1259   def DeclareLocks(self, level):
1260     self.nq.DeclareLocks(self, level)
1261
1262   def Exec(self, feedback_fn):
1263     return self.nq.OldStyleQuery(self)
1264
1265
1266 def _CheckOutputFields(static, dynamic, selected):
1267   """Checks whether all selected fields are valid.
1268
1269   @type static: L{utils.FieldSet}
1270   @param static: static fields set
1271   @type dynamic: L{utils.FieldSet}
1272   @param dynamic: dynamic fields set
1273
1274   """
1275   f = utils.FieldSet()
1276   f.Extend(static)
1277   f.Extend(dynamic)
1278
1279   delta = f.NonMatching(selected)
1280   if delta:
1281     raise errors.OpPrereqError("Unknown output fields selected: %s"
1282                                % ",".join(delta), errors.ECODE_INVAL)
1283
1284
1285 class LUNodeQueryvols(NoHooksLU):
1286   """Logical unit for getting volumes on node(s).
1287
1288   """
1289   REQ_BGL = False
1290   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1291   _FIELDS_STATIC = utils.FieldSet("node")
1292
1293   def CheckArguments(self):
1294     _CheckOutputFields(static=self._FIELDS_STATIC,
1295                        dynamic=self._FIELDS_DYNAMIC,
1296                        selected=self.op.output_fields)
1297
1298   def ExpandNames(self):
1299     self.share_locks = ShareAll()
1300
1301     if self.op.nodes:
1302       self.needed_locks = {
1303         locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
1304         }
1305     else:
1306       self.needed_locks = {
1307         locking.LEVEL_NODE: locking.ALL_SET,
1308         locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1309         }
1310
1311   def Exec(self, feedback_fn):
1312     """Computes the list of nodes and their attributes.
1313
1314     """
1315     node_uuids = self.owned_locks(locking.LEVEL_NODE)
1316     volumes = self.rpc.call_node_volumes(node_uuids)
1317
1318     ilist = self.cfg.GetAllInstancesInfo()
1319     vol2inst = MapInstanceLvsToNodes(ilist.values())
1320
1321     output = []
1322     for node_uuid in node_uuids:
1323       nresult = volumes[node_uuid]
1324       if nresult.offline:
1325         continue
1326       msg = nresult.fail_msg
1327       if msg:
1328         self.LogWarning("Can't compute volume data on node %s: %s",
1329                         self.cfg.GetNodeName(node_uuid), msg)
1330         continue
1331
1332       node_vols = sorted(nresult.payload,
1333                          key=operator.itemgetter("dev"))
1334
1335       for vol in node_vols:
1336         node_output = []
1337         for field in self.op.output_fields:
1338           if field == "node":
1339             val = self.cfg.GetNodeName(node_uuid)
1340           elif field == "phys":
1341             val = vol["dev"]
1342           elif field == "vg":
1343             val = vol["vg"]
1344           elif field == "name":
1345             val = vol["name"]
1346           elif field == "size":
1347             val = int(float(vol["size"]))
1348           elif field == "instance":
1349             inst = vol2inst.get((node_uuid, vol["vg"] + "/" + vol["name"]),
1350                                 None)
1351             if inst is not None:
1352               val = inst.name
1353             else:
1354               val = "-"
1355           else:
1356             raise errors.ParameterError(field)
1357           node_output.append(str(val))
1358
1359         output.append(node_output)
1360
1361     return output
1362
1363
1364 class LUNodeQueryStorage(NoHooksLU):
1365   """Logical unit for getting information on storage units on node(s).
1366
1367   """
1368   _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
1369   REQ_BGL = False
1370
1371   def CheckArguments(self):
1372     _CheckOutputFields(static=self._FIELDS_STATIC,
1373                        dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
1374                        selected=self.op.output_fields)
1375
1376   def ExpandNames(self):
1377     self.share_locks = ShareAll()
1378
1379     if self.op.nodes:
1380       self.needed_locks = {
1381         locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
1382         }
1383     else:
1384       self.needed_locks = {
1385         locking.LEVEL_NODE: locking.ALL_SET,
1386         locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1387         }
1388
1389   def Exec(self, feedback_fn):
1390     """Computes the list of nodes and their attributes.
1391
1392     """
1393     self.node_uuids = self.owned_locks(locking.LEVEL_NODE)
1394
1395     # Always get name to sort by
1396     if constants.SF_NAME in self.op.output_fields:
1397       fields = self.op.output_fields[:]
1398     else:
1399       fields = [constants.SF_NAME] + self.op.output_fields
1400
1401     # Never ask for node or type as it's only known to the LU
1402     for extra in [constants.SF_NODE, constants.SF_TYPE]:
1403       while extra in fields:
1404         fields.remove(extra)
1405
1406     field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
1407     name_idx = field_idx[constants.SF_NAME]
1408
1409     st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1410     data = self.rpc.call_storage_list(self.node_uuids,
1411                                       self.op.storage_type, st_args,
1412                                       self.op.name, fields)
1413
1414     result = []
1415
1416     for node_uuid in utils.NiceSort(self.node_uuids):
1417       node_name = self.cfg.GetNodeName(node_uuid)
1418       nresult = data[node_uuid]
1419       if nresult.offline:
1420         continue
1421
1422       msg = nresult.fail_msg
1423       if msg:
1424         self.LogWarning("Can't get storage data from node %s: %s",
1425                         node_name, msg)
1426         continue
1427
1428       rows = dict([(row[name_idx], row) for row in nresult.payload])
1429
1430       for name in utils.NiceSort(rows.keys()):
1431         row = rows[name]
1432
1433         out = []
1434
1435         for field in self.op.output_fields:
1436           if field == constants.SF_NODE:
1437             val = node_name
1438           elif field == constants.SF_TYPE:
1439             val = self.op.storage_type
1440           elif field in field_idx:
1441             val = row[field_idx[field]]
1442           else:
1443             raise errors.ParameterError(field)
1444
1445           out.append(val)
1446
1447         result.append(out)
1448
1449     return result
1450
1451
1452 class LUNodeRemove(LogicalUnit):
1453   """Logical unit for removing a node.
1454
1455   """
1456   HPATH = "node-remove"
1457   HTYPE = constants.HTYPE_NODE
1458
1459   def BuildHooksEnv(self):
1460     """Build hooks env.
1461
1462     """
1463     return {
1464       "OP_TARGET": self.op.node_name,
1465       "NODE_NAME": self.op.node_name,
1466       }
1467
1468   def BuildHooksNodes(self):
1469     """Build hooks nodes.
1470
1471     This doesn't run on the target node in the pre phase as a failed
1472     node would then be impossible to remove.
1473
1474     """
1475     all_nodes = self.cfg.GetNodeList()
1476     try:
1477       all_nodes.remove(self.op.node_uuid)
1478     except ValueError:
1479       pass
1480     return (all_nodes, all_nodes)
1481
1482   def CheckPrereq(self):
1483     """Check prerequisites.
1484
1485     This checks:
1486      - the node exists in the configuration
1487      - it does not have primary or secondary instances
1488      - it's not the master
1489
1490     Any errors are signaled by raising errors.OpPrereqError.
1491
1492     """
1493     (self.op.node_uuid, self.op.node_name) = \
1494       ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1495     node = self.cfg.GetNodeInfo(self.op.node_uuid)
1496     assert node is not None
1497
1498     masternode = self.cfg.GetMasterNode()
1499     if node.uuid == masternode:
1500       raise errors.OpPrereqError("Node is the master node, failover to another"
1501                                  " node is required", errors.ECODE_INVAL)
1502
1503     for _, instance in self.cfg.GetAllInstancesInfo().items():
1504       if node.uuid in instance.all_nodes:
1505         raise errors.OpPrereqError("Instance %s is still running on the node,"
1506                                    " please remove first" % instance.name,
1507                                    errors.ECODE_INVAL)
1508     self.op.node_name = node.name
1509     self.node = node
1510
1511   def Exec(self, feedback_fn):
1512     """Removes the node from the cluster.
1513
1514     """
1515     logging.info("Stopping the node daemon and removing configs from node %s",
1516                  self.node.name)
1517
1518     modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
1519
1520     assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
1521       "Not owning BGL"
1522
1523     # Promote nodes to master candidate as needed
1524     AdjustCandidatePool(self, exceptions=[self.node.uuid])
1525     self.context.RemoveNode(self.node)
1526
1527     # Run post hooks on the node before it's removed
1528     RunPostHook(self, self.node.name)
1529
1530     # we have to call this by name rather than by UUID, as the node is no longer
1531     # in the config
1532     result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup)
1533     msg = result.fail_msg
1534     if msg:
1535       self.LogWarning("Errors encountered on the remote node while leaving"
1536                       " the cluster: %s", msg)
1537
1538     # Remove node from our /etc/hosts
1539     if self.cfg.GetClusterInfo().modify_etc_hosts:
1540       master_node_uuid = self.cfg.GetMasterNode()
1541       result = self.rpc.call_etc_hosts_modify(master_node_uuid,
1542                                               constants.ETC_HOSTS_REMOVE,
1543                                               self.node.name, None)
1544       result.Raise("Can't update hosts file with new host data")
1545       RedistributeAncillaryFiles(self)
1546
1547
1548 class LURepairNodeStorage(NoHooksLU):
1549   """Repairs the volume group on a node.
1550
1551   """
1552   REQ_BGL = False
1553
1554   def CheckArguments(self):
1555     (self.op.node_uuid, self.op.node_name) = \
1556       ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1557
1558     storage_type = self.op.storage_type
1559
1560     if (constants.SO_FIX_CONSISTENCY not in
1561         constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
1562       raise errors.OpPrereqError("Storage units of type '%s' can not be"
1563                                  " repaired" % storage_type,
1564                                  errors.ECODE_INVAL)
1565
1566   def ExpandNames(self):
1567     self.needed_locks = {
1568       locking.LEVEL_NODE: [self.op.node_uuid],
1569       }
1570
1571   def _CheckFaultyDisks(self, instance, node_uuid):
1572     """Ensure faulty disks abort the opcode or at least warn."""
1573     try:
1574       if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
1575                                  node_uuid, True):
1576         raise errors.OpPrereqError("Instance '%s' has faulty disks on"
1577                                    " node '%s'" %
1578                                    (instance.name,
1579                                     self.cfg.GetNodeName(node_uuid)),
1580                                    errors.ECODE_STATE)
1581     except errors.OpPrereqError, err:
1582       if self.op.ignore_consistency:
1583         self.LogWarning(str(err.args[0]))
1584       else:
1585         raise
1586
1587   def CheckPrereq(self):
1588     """Check prerequisites.
1589
1590     """
1591     # Check whether any instance on this node has faulty disks
1592     for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
1593       if not inst.disks_active:
1594         continue
1595       check_nodes = set(inst.all_nodes)
1596       check_nodes.discard(self.op.node_uuid)
1597       for inst_node_uuid in check_nodes:
1598         self._CheckFaultyDisks(inst, inst_node_uuid)
1599
1600   def Exec(self, feedback_fn):
1601     feedback_fn("Repairing storage unit '%s' on %s ..." %
1602                 (self.op.name, self.op.node_name))
1603
1604     st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1605     result = self.rpc.call_storage_execute(self.op.node_uuid,
1606                                            self.op.storage_type, st_args,
1607                                            self.op.name,
1608                                            constants.SO_FIX_CONSISTENCY)
1609     result.Raise("Failed to repair storage unit '%s' on %s" %
1610                  (self.op.name, self.op.node_name))