Index nodes by their UUID
[ganeti-local] / lib / cmdlib / node.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with nodes."""
23
24 import logging
25 import operator
26
27 from ganeti import constants
28 from ganeti import errors
29 from ganeti import locking
30 from ganeti import netutils
31 from ganeti import objects
32 from ganeti import opcodes
33 from ganeti import qlang
34 from ganeti import query
35 from ganeti import rpc
36 from ganeti import utils
37 from ganeti.masterd import iallocator
38
39 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, QueryBase, \
40   ResultWithJobs
41 from ganeti.cmdlib.common import CheckParamsNotGlobal, \
42   MergeAndVerifyHvState, MergeAndVerifyDiskState, \
43   IsExclusiveStorageEnabledNode, CheckNodePVs, \
44   RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
45   CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
46   AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
47   GetWantedNodes, MapInstanceDisksToNodes, RunPostHook, \
48   FindFaultyInstanceDisks
49
50
51 def _DecideSelfPromotion(lu, exceptions=None):
52   """Decide whether I should promote myself as a master candidate.
53
54   """
55   cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
56   mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
57   # the new node will increase mc_max with one, so:
58   mc_should = min(mc_should + 1, cp_size)
59   return mc_now < mc_should
60
61
62 def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
63   """Ensure that a node has the given secondary ip.
64
65   @type lu: L{LogicalUnit}
66   @param lu: the LU on behalf of which we make the check
67   @type node: L{objects.Node}
68   @param node: the node to check
69   @type secondary_ip: string
70   @param secondary_ip: the ip to check
71   @type prereq: boolean
72   @param prereq: whether to throw a prerequisite or an execute error
73   @raise errors.OpPrereqError: if the node doesn't have the ip,
74   and prereq=True
75   @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
76
77   """
78   # this can be called with a new node, which has no UUID yet, so perform the
79   # RPC call using its name
80   result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
81   result.Raise("Failure checking secondary ip on node %s" % node.name,
82                prereq=prereq, ecode=errors.ECODE_ENVIRON)
83   if not result.payload:
84     msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
85            " please fix and re-run this command" % secondary_ip)
86     if prereq:
87       raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
88     else:
89       raise errors.OpExecError(msg)
90
91
92 class LUNodeAdd(LogicalUnit):
93   """Logical unit for adding node to the cluster.
94
95   """
96   HPATH = "node-add"
97   HTYPE = constants.HTYPE_NODE
98   _NFLAGS = ["master_capable", "vm_capable"]
99
100   def CheckArguments(self):
101     self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
102     # validate/normalize the node name
103     self.hostname = netutils.GetHostname(name=self.op.node_name,
104                                          family=self.primary_ip_family)
105     self.op.node_name = self.hostname.name
106
107     if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName():
108       raise errors.OpPrereqError("Cannot readd the master node",
109                                  errors.ECODE_STATE)
110
111     if self.op.readd and self.op.group:
112       raise errors.OpPrereqError("Cannot pass a node group when a node is"
113                                  " being readded", errors.ECODE_INVAL)
114
115   def BuildHooksEnv(self):
116     """Build hooks env.
117
118     This will run on all nodes before, and on all nodes + the new node after.
119
120     """
121     return {
122       "OP_TARGET": self.op.node_name,
123       "NODE_NAME": self.op.node_name,
124       "NODE_PIP": self.op.primary_ip,
125       "NODE_SIP": self.op.secondary_ip,
126       "MASTER_CAPABLE": str(self.op.master_capable),
127       "VM_CAPABLE": str(self.op.vm_capable),
128       }
129
130   def BuildHooksNodes(self):
131     """Build hooks nodes.
132
133     """
134     hook_nodes = self.cfg.GetNodeList()
135     new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
136     if new_node_info is not None:
137       # Exclude added node
138       hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))
139
140     # add the new node as post hook node by name; it does not have an UUID yet
141     return (hook_nodes, hook_nodes, [self.op.node_name, ])
142
143   def CheckPrereq(self):
144     """Check prerequisites.
145
146     This checks:
147      - the new node is not already in the config
148      - it is resolvable
149      - its parameters (single/dual homed) matches the cluster
150
151     Any errors are signaled by raising errors.OpPrereqError.
152
153     """
154     cfg = self.cfg
155     hostname = self.hostname
156     node_name = hostname.name
157     primary_ip = self.op.primary_ip = hostname.ip
158     if self.op.secondary_ip is None:
159       if self.primary_ip_family == netutils.IP6Address.family:
160         raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
161                                    " IPv4 address must be given as secondary",
162                                    errors.ECODE_INVAL)
163       self.op.secondary_ip = primary_ip
164
165     secondary_ip = self.op.secondary_ip
166     if not netutils.IP4Address.IsValid(secondary_ip):
167       raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
168                                  " address" % secondary_ip, errors.ECODE_INVAL)
169
170     existing_node_info = cfg.GetNodeInfoByName(node_name)
171     if not self.op.readd and existing_node_info is not None:
172       raise errors.OpPrereqError("Node %s is already in the configuration" %
173                                  node_name, errors.ECODE_EXISTS)
174     elif self.op.readd and existing_node_info is None:
175       raise errors.OpPrereqError("Node %s is not in the configuration" %
176                                  node_name, errors.ECODE_NOENT)
177
178     self.changed_primary_ip = False
179
180     for existing_node in cfg.GetAllNodesInfo().values():
181       if self.op.readd and node_name == existing_node.name:
182         if existing_node.secondary_ip != secondary_ip:
183           raise errors.OpPrereqError("Readded node doesn't have the same IP"
184                                      " address configuration as before",
185                                      errors.ECODE_INVAL)
186         if existing_node.primary_ip != primary_ip:
187           self.changed_primary_ip = True
188
189         continue
190
191       if (existing_node.primary_ip == primary_ip or
192           existing_node.secondary_ip == primary_ip or
193           existing_node.primary_ip == secondary_ip or
194           existing_node.secondary_ip == secondary_ip):
195         raise errors.OpPrereqError("New node ip address(es) conflict with"
196                                    " existing node %s" % existing_node.name,
197                                    errors.ECODE_NOTUNIQUE)
198
199     # After this 'if' block, None is no longer a valid value for the
200     # _capable op attributes
201     if self.op.readd:
202       assert existing_node_info is not None, \
203         "Can't retrieve locked node %s" % node_name
204       for attr in self._NFLAGS:
205         if getattr(self.op, attr) is None:
206           setattr(self.op, attr, getattr(existing_node_info, attr))
207     else:
208       for attr in self._NFLAGS:
209         if getattr(self.op, attr) is None:
210           setattr(self.op, attr, True)
211
212     if self.op.readd and not self.op.vm_capable:
213       pri, sec = cfg.GetNodeInstances(existing_node_info.uuid)
214       if pri or sec:
215         raise errors.OpPrereqError("Node %s being re-added with vm_capable"
216                                    " flag set to false, but it already holds"
217                                    " instances" % node_name,
218                                    errors.ECODE_STATE)
219
220     # check that the type of the node (single versus dual homed) is the
221     # same as for the master
222     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
223     master_singlehomed = myself.secondary_ip == myself.primary_ip
224     newbie_singlehomed = secondary_ip == primary_ip
225     if master_singlehomed != newbie_singlehomed:
226       if master_singlehomed:
227         raise errors.OpPrereqError("The master has no secondary ip but the"
228                                    " new node has one",
229                                    errors.ECODE_INVAL)
230       else:
231         raise errors.OpPrereqError("The master has a secondary ip but the"
232                                    " new node doesn't have one",
233                                    errors.ECODE_INVAL)
234
235     # checks reachability
236     if not netutils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
237       raise errors.OpPrereqError("Node not reachable by ping",
238                                  errors.ECODE_ENVIRON)
239
240     if not newbie_singlehomed:
241       # check reachability from my secondary ip to newbie's secondary ip
242       if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
243                               source=myself.secondary_ip):
244         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
245                                    " based ping to node daemon port",
246                                    errors.ECODE_ENVIRON)
247
248     if self.op.readd:
249       exceptions = [existing_node_info.uuid]
250     else:
251       exceptions = []
252
253     if self.op.master_capable:
254       self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
255     else:
256       self.master_candidate = False
257
258     if self.op.readd:
259       self.new_node = existing_node_info
260     else:
261       node_group = cfg.LookupNodeGroup(self.op.group)
262       self.new_node = objects.Node(name=node_name,
263                                    primary_ip=primary_ip,
264                                    secondary_ip=secondary_ip,
265                                    master_candidate=self.master_candidate,
266                                    offline=False, drained=False,
267                                    group=node_group, ndparams={})
268
269     if self.op.ndparams:
270       utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
271       CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
272                            "node", "cluster or group")
273
274     if self.op.hv_state:
275       self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)
276
277     if self.op.disk_state:
278       self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)
279
280     # TODO: If we need to have multiple DnsOnlyRunner we probably should make
281     #       it a property on the base class.
282     rpcrunner = rpc.DnsOnlyRunner()
283     result = rpcrunner.call_version([node_name])[node_name]
284     result.Raise("Can't get version information from node %s" % node_name)
285     if constants.PROTOCOL_VERSION == result.payload:
286       logging.info("Communication to node %s fine, sw version %s match",
287                    node_name, result.payload)
288     else:
289       raise errors.OpPrereqError("Version mismatch master version %s,"
290                                  " node version %s" %
291                                  (constants.PROTOCOL_VERSION, result.payload),
292                                  errors.ECODE_ENVIRON)
293
294     vg_name = cfg.GetVGName()
295     if vg_name is not None:
296       vparams = {constants.NV_PVLIST: [vg_name]}
297       excl_stor = IsExclusiveStorageEnabledNode(cfg, self.new_node)
298       cname = self.cfg.GetClusterName()
299       result = rpcrunner.call_node_verify_light(
300           [node_name], vparams, cname, cfg.GetClusterInfo().hvparams)[node_name]
301       (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
302       if errmsgs:
303         raise errors.OpPrereqError("Checks on node PVs failed: %s" %
304                                    "; ".join(errmsgs), errors.ECODE_ENVIRON)
305
306   def Exec(self, feedback_fn):
307     """Adds the new node to the cluster.
308
309     """
310     new_node = self.new_node
311     node_name = new_node.name
312
313     assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
314       "Not owning BGL"
315
316     # We adding a new node so we assume it's powered
317     new_node.powered = True
318
319     # for re-adds, reset the offline/drained/master-candidate flags;
320     # we need to reset here, otherwise offline would prevent RPC calls
321     # later in the procedure; this also means that if the re-add
322     # fails, we are left with a non-offlined, broken node
323     if self.op.readd:
324       new_node.drained = new_node.offline = False # pylint: disable=W0201
325       self.LogInfo("Readding a node, the offline/drained flags were reset")
326       # if we demote the node, we do cleanup later in the procedure
327       new_node.master_candidate = self.master_candidate
328       if self.changed_primary_ip:
329         new_node.primary_ip = self.op.primary_ip
330
331     # copy the master/vm_capable flags
332     for attr in self._NFLAGS:
333       setattr(new_node, attr, getattr(self.op, attr))
334
335     # notify the user about any possible mc promotion
336     if new_node.master_candidate:
337       self.LogInfo("Node will be a master candidate")
338
339     if self.op.ndparams:
340       new_node.ndparams = self.op.ndparams
341     else:
342       new_node.ndparams = {}
343
344     if self.op.hv_state:
345       new_node.hv_state_static = self.new_hv_state
346
347     if self.op.disk_state:
348       new_node.disk_state_static = self.new_disk_state
349
350     # Add node to our /etc/hosts, and add key to known_hosts
351     if self.cfg.GetClusterInfo().modify_etc_hosts:
352       master_node = self.cfg.GetMasterNode()
353       result = self.rpc.call_etc_hosts_modify(
354                  master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
355                  self.hostname.ip)
356       result.Raise("Can't update hosts file with new host data")
357
358     if new_node.secondary_ip != new_node.primary_ip:
359       _CheckNodeHasSecondaryIP(self, new_node, new_node.secondary_ip, False)
360
361     node_verifier_uuids = [self.cfg.GetMasterNode()]
362     node_verify_param = {
363       constants.NV_NODELIST: ([node_name], {}),
364       # TODO: do a node-net-test as well?
365     }
366
367     result = self.rpc.call_node_verify(
368                node_verifier_uuids, node_verify_param,
369                self.cfg.GetClusterName(),
370                self.cfg.GetClusterInfo().hvparams)
371     for verifier in node_verifier_uuids:
372       result[verifier].Raise("Cannot communicate with node %s" % verifier)
373       nl_payload = result[verifier].payload[constants.NV_NODELIST]
374       if nl_payload:
375         for failed in nl_payload:
376           feedback_fn("ssh/hostname verification failed"
377                       " (checking from %s): %s" %
378                       (verifier, nl_payload[failed]))
379         raise errors.OpExecError("ssh/hostname verification failed")
380
381     if self.op.readd:
382       self.context.ReaddNode(new_node)
383       RedistributeAncillaryFiles(self)
384       # make sure we redistribute the config
385       self.cfg.Update(new_node, feedback_fn)
386       # and make sure the new node will not have old files around
387       if not new_node.master_candidate:
388         result = self.rpc.call_node_demote_from_mc(new_node.uuid)
389         result.Warn("Node failed to demote itself from master candidate status",
390                     self.LogWarning)
391     else:
392       self.context.AddNode(new_node, self.proc.GetECId())
393       RedistributeAncillaryFiles(self)
394
395
396 class LUNodeSetParams(LogicalUnit):
397   """Modifies the parameters of a node.
398
399   @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
400       to the node role (as _ROLE_*)
401   @cvar _R2F: a dictionary from node role to tuples of flags
402   @cvar _FLAGS: a list of attribute names corresponding to the flags
403
404   """
405   HPATH = "node-modify"
406   HTYPE = constants.HTYPE_NODE
407   REQ_BGL = False
408   (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
409   _F2R = {
410     (True, False, False): _ROLE_CANDIDATE,
411     (False, True, False): _ROLE_DRAINED,
412     (False, False, True): _ROLE_OFFLINE,
413     (False, False, False): _ROLE_REGULAR,
414     }
415   _R2F = dict((v, k) for k, v in _F2R.items())
416   _FLAGS = ["master_candidate", "drained", "offline"]
417
418   def CheckArguments(self):
419     (self.op.node_uuid, self.op.node_name) = \
420       ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
421     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
422                 self.op.master_capable, self.op.vm_capable,
423                 self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
424                 self.op.disk_state]
425     if all_mods.count(None) == len(all_mods):
426       raise errors.OpPrereqError("Please pass at least one modification",
427                                  errors.ECODE_INVAL)
428     if all_mods.count(True) > 1:
429       raise errors.OpPrereqError("Can't set the node into more than one"
430                                  " state at the same time",
431                                  errors.ECODE_INVAL)
432
433     # Boolean value that tells us whether we might be demoting from MC
434     self.might_demote = (self.op.master_candidate is False or
435                          self.op.offline is True or
436                          self.op.drained is True or
437                          self.op.master_capable is False)
438
439     if self.op.secondary_ip:
440       if not netutils.IP4Address.IsValid(self.op.secondary_ip):
441         raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
442                                    " address" % self.op.secondary_ip,
443                                    errors.ECODE_INVAL)
444
445     self.lock_all = self.op.auto_promote and self.might_demote
446     self.lock_instances = self.op.secondary_ip is not None
447
448   def _InstanceFilter(self, instance):
449     """Filter for getting affected instances.
450
451     """
452     return (instance.disk_template in constants.DTS_INT_MIRROR and
453             self.op.node_uuid in instance.all_nodes)
454
455   def ExpandNames(self):
456     if self.lock_all:
457       self.needed_locks = {
458         locking.LEVEL_NODE: locking.ALL_SET,
459
460         # Block allocations when all nodes are locked
461         locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
462         }
463     else:
464       self.needed_locks = {
465         locking.LEVEL_NODE: self.op.node_uuid,
466         }
467
468     # Since modifying a node can have severe effects on currently running
469     # operations the resource lock is at least acquired in shared mode
470     self.needed_locks[locking.LEVEL_NODE_RES] = \
471       self.needed_locks[locking.LEVEL_NODE]
472
473     # Get all locks except nodes in shared mode; they are not used for anything
474     # but read-only access
475     self.share_locks = ShareAll()
476     self.share_locks[locking.LEVEL_NODE] = 0
477     self.share_locks[locking.LEVEL_NODE_RES] = 0
478     self.share_locks[locking.LEVEL_NODE_ALLOC] = 0
479
480     if self.lock_instances:
481       self.needed_locks[locking.LEVEL_INSTANCE] = \
482         frozenset(self.cfg.GetInstancesInfoByFilter(self._InstanceFilter))
483
484   def BuildHooksEnv(self):
485     """Build hooks env.
486
487     This runs on the master node.
488
489     """
490     return {
491       "OP_TARGET": self.op.node_name,
492       "MASTER_CANDIDATE": str(self.op.master_candidate),
493       "OFFLINE": str(self.op.offline),
494       "DRAINED": str(self.op.drained),
495       "MASTER_CAPABLE": str(self.op.master_capable),
496       "VM_CAPABLE": str(self.op.vm_capable),
497       }
498
499   def BuildHooksNodes(self):
500     """Build hooks nodes.
501
502     """
503     nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
504     return (nl, nl)
505
506   def CheckPrereq(self):
507     """Check prerequisites.
508
509     This only checks the instance list against the existing names.
510
511     """
512     node = self.cfg.GetNodeInfo(self.op.node_uuid)
513     if self.lock_instances:
514       affected_instances = \
515         self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)
516
517       # Verify instance locks
518       owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
519       wanted_instances = frozenset(affected_instances.keys())
520       if wanted_instances - owned_instances:
521         raise errors.OpPrereqError("Instances affected by changing node %s's"
522                                    " secondary IP address have changed since"
523                                    " locks were acquired, wanted '%s', have"
524                                    " '%s'; retry the operation" %
525                                    (node.name,
526                                     utils.CommaJoin(wanted_instances),
527                                     utils.CommaJoin(owned_instances)),
528                                    errors.ECODE_STATE)
529     else:
530       affected_instances = None
531
532     if (self.op.master_candidate is not None or
533         self.op.drained is not None or
534         self.op.offline is not None):
535       # we can't change the master's node flags
536       if node.uuid == self.cfg.GetMasterNode():
537         raise errors.OpPrereqError("The master role can be changed"
538                                    " only via master-failover",
539                                    errors.ECODE_INVAL)
540
541     if self.op.master_candidate and not node.master_capable:
542       raise errors.OpPrereqError("Node %s is not master capable, cannot make"
543                                  " it a master candidate" % node.name,
544                                  errors.ECODE_STATE)
545
546     if self.op.vm_capable is False:
547       (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
548       if ipri or isec:
549         raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
550                                    " the vm_capable flag" % node.name,
551                                    errors.ECODE_STATE)
552
553     if node.master_candidate and self.might_demote and not self.lock_all:
554       assert not self.op.auto_promote, "auto_promote set but lock_all not"
555       # check if after removing the current node, we're missing master
556       # candidates
557       (mc_remaining, mc_should, _) = \
558           self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
559       if mc_remaining < mc_should:
560         raise errors.OpPrereqError("Not enough master candidates, please"
561                                    " pass auto promote option to allow"
562                                    " promotion (--auto-promote or RAPI"
563                                    " auto_promote=True)", errors.ECODE_STATE)
564
565     self.old_flags = old_flags = (node.master_candidate,
566                                   node.drained, node.offline)
567     assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
568     self.old_role = old_role = self._F2R[old_flags]
569
570     # Check for ineffective changes
571     for attr in self._FLAGS:
572       if getattr(self.op, attr) is False and getattr(node, attr) is False:
573         self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
574         setattr(self.op, attr, None)
575
576     # Past this point, any flag change to False means a transition
577     # away from the respective state, as only real changes are kept
578
579     # TODO: We might query the real power state if it supports OOB
580     if SupportsOob(self.cfg, node):
581       if self.op.offline is False and not (node.powered or
582                                            self.op.powered is True):
583         raise errors.OpPrereqError(("Node %s needs to be turned on before its"
584                                     " offline status can be reset") %
585                                    self.op.node_name, errors.ECODE_STATE)
586     elif self.op.powered is not None:
587       raise errors.OpPrereqError(("Unable to change powered state for node %s"
588                                   " as it does not support out-of-band"
589                                   " handling") % self.op.node_name,
590                                  errors.ECODE_STATE)
591
592     # If we're being deofflined/drained, we'll MC ourself if needed
593     if (self.op.drained is False or self.op.offline is False or
594         (self.op.master_capable and not node.master_capable)):
595       if _DecideSelfPromotion(self):
596         self.op.master_candidate = True
597         self.LogInfo("Auto-promoting node to master candidate")
598
599     # If we're no longer master capable, we'll demote ourselves from MC
600     if self.op.master_capable is False and node.master_candidate:
601       self.LogInfo("Demoting from master candidate")
602       self.op.master_candidate = False
603
604     # Compute new role
605     assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
606     if self.op.master_candidate:
607       new_role = self._ROLE_CANDIDATE
608     elif self.op.drained:
609       new_role = self._ROLE_DRAINED
610     elif self.op.offline:
611       new_role = self._ROLE_OFFLINE
612     elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
613       # False is still in new flags, which means we're un-setting (the
614       # only) True flag
615       new_role = self._ROLE_REGULAR
616     else: # no new flags, nothing, keep old role
617       new_role = old_role
618
619     self.new_role = new_role
620
621     if old_role == self._ROLE_OFFLINE and new_role != old_role:
622       # Trying to transition out of offline status
623       result = self.rpc.call_version([node.uuid])[node.uuid]
624       if result.fail_msg:
625         raise errors.OpPrereqError("Node %s is being de-offlined but fails"
626                                    " to report its version: %s" %
627                                    (node.name, result.fail_msg),
628                                    errors.ECODE_STATE)
629       else:
630         self.LogWarning("Transitioning node from offline to online state"
631                         " without using re-add. Please make sure the node"
632                         " is healthy!")
633
634     # When changing the secondary ip, verify if this is a single-homed to
635     # multi-homed transition or vice versa, and apply the relevant
636     # restrictions.
637     if self.op.secondary_ip:
638       # Ok even without locking, because this can't be changed by any LU
639       master = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
640       master_singlehomed = master.secondary_ip == master.primary_ip
641       if master_singlehomed and self.op.secondary_ip != node.primary_ip:
642         if self.op.force and node.uuid == master.uuid:
643           self.LogWarning("Transitioning from single-homed to multi-homed"
644                           " cluster; all nodes will require a secondary IP"
645                           " address")
646         else:
647           raise errors.OpPrereqError("Changing the secondary ip on a"
648                                      " single-homed cluster requires the"
649                                      " --force option to be passed, and the"
650                                      " target node to be the master",
651                                      errors.ECODE_INVAL)
652       elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
653         if self.op.force and node.uuid == master.uuid:
654           self.LogWarning("Transitioning from multi-homed to single-homed"
655                           " cluster; secondary IP addresses will have to be"
656                           " removed")
657         else:
658           raise errors.OpPrereqError("Cannot set the secondary IP to be the"
659                                      " same as the primary IP on a multi-homed"
660                                      " cluster, unless the --force option is"
661                                      " passed, and the target node is the"
662                                      " master", errors.ECODE_INVAL)
663
664       assert not (frozenset(affected_instances) -
665                   self.owned_locks(locking.LEVEL_INSTANCE))
666
667       if node.offline:
668         if affected_instances:
669           msg = ("Cannot change secondary IP address: offline node has"
670                  " instances (%s) configured to use it" %
671                  utils.CommaJoin(affected_instances.keys()))
672           raise errors.OpPrereqError(msg, errors.ECODE_STATE)
673       else:
674         # On online nodes, check that no instances are running, and that
675         # the node has the new ip and we can reach it.
676         for instance in affected_instances.values():
677           CheckInstanceState(self, instance, INSTANCE_DOWN,
678                              msg="cannot change secondary ip")
679
680         _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
681         if master.uuid != node.uuid:
682           # check reachability from master secondary ip to new secondary ip
683           if not netutils.TcpPing(self.op.secondary_ip,
684                                   constants.DEFAULT_NODED_PORT,
685                                   source=master.secondary_ip):
686             raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
687                                        " based ping to node daemon port",
688                                        errors.ECODE_ENVIRON)
689
690     if self.op.ndparams:
691       new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
692       utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
693       CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
694                            "node", "cluster or group")
695       self.new_ndparams = new_ndparams
696
697     if self.op.hv_state:
698       self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
699                                                 node.hv_state_static)
700
701     if self.op.disk_state:
702       self.new_disk_state = \
703         MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)
704
705   def Exec(self, feedback_fn):
706     """Modifies a node.
707
708     """
709     node = self.cfg.GetNodeInfo(self.op.node_uuid)
710     old_role = self.old_role
711     new_role = self.new_role
712
713     result = []
714
715     if self.op.ndparams:
716       node.ndparams = self.new_ndparams
717
718     if self.op.powered is not None:
719       node.powered = self.op.powered
720
721     if self.op.hv_state:
722       node.hv_state_static = self.new_hv_state
723
724     if self.op.disk_state:
725       node.disk_state_static = self.new_disk_state
726
727     for attr in ["master_capable", "vm_capable"]:
728       val = getattr(self.op, attr)
729       if val is not None:
730         setattr(node, attr, val)
731         result.append((attr, str(val)))
732
733     if new_role != old_role:
734       # Tell the node to demote itself, if no longer MC and not offline
735       if old_role == self._ROLE_CANDIDATE and new_role != self._ROLE_OFFLINE:
736         msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
737         if msg:
738           self.LogWarning("Node failed to demote itself: %s", msg)
739
740       new_flags = self._R2F[new_role]
741       for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
742         if of != nf:
743           result.append((desc, str(nf)))
744       (node.master_candidate, node.drained, node.offline) = new_flags
745
746       # we locked all nodes, we adjust the CP before updating this node
747       if self.lock_all:
748         AdjustCandidatePool(self, [node.uuid])
749
750     if self.op.secondary_ip:
751       node.secondary_ip = self.op.secondary_ip
752       result.append(("secondary_ip", self.op.secondary_ip))
753
754     # this will trigger configuration file update, if needed
755     self.cfg.Update(node, feedback_fn)
756
757     # this will trigger job queue propagation or cleanup if the mc
758     # flag changed
759     if [old_role, new_role].count(self._ROLE_CANDIDATE) == 1:
760       self.context.ReaddNode(node)
761
762     return result
763
764
765 class LUNodePowercycle(NoHooksLU):
766   """Powercycles a node.
767
768   """
769   REQ_BGL = False
770
771   def CheckArguments(self):
772     (self.op.node_uuid, self.op.node_name) = \
773       ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
774
775     if self.op.node_uuid == self.cfg.GetMasterNode() and not self.op.force:
776       raise errors.OpPrereqError("The node is the master and the force"
777                                  " parameter was not set",
778                                  errors.ECODE_INVAL)
779
780   def ExpandNames(self):
781     """Locking for PowercycleNode.
782
783     This is a last-resort option and shouldn't block on other
784     jobs. Therefore, we grab no locks.
785
786     """
787     self.needed_locks = {}
788
789   def Exec(self, feedback_fn):
790     """Reboots a node.
791
792     """
793     default_hypervisor = self.cfg.GetHypervisorType()
794     hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
795     result = self.rpc.call_node_powercycle(self.op.node_uuid,
796                                            default_hypervisor,
797                                            hvparams)
798     result.Raise("Failed to schedule the reboot")
799     return result.payload
800
801
802 def _GetNodeInstancesInner(cfg, fn):
803   return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
804
805
806 def _GetNodePrimaryInstances(cfg, node_uuid):
807   """Returns primary instances on a node.
808
809   """
810   return _GetNodeInstancesInner(cfg,
811                                 lambda inst: node_uuid == inst.primary_node)
812
813
814 def _GetNodeSecondaryInstances(cfg, node_uuid):
815   """Returns secondary instances on a node.
816
817   """
818   return _GetNodeInstancesInner(cfg,
819                                 lambda inst: node_uuid in inst.secondary_nodes)
820
821
822 def _GetNodeInstances(cfg, node_uuid):
823   """Returns a list of all primary and secondary instances on a node.
824
825   """
826
827   return _GetNodeInstancesInner(cfg, lambda inst: node_uuid in inst.all_nodes)
828
829
830 class LUNodeEvacuate(NoHooksLU):
831   """Evacuates instances off a list of nodes.
832
833   """
834   REQ_BGL = False
835
836   _MODE2IALLOCATOR = {
837     constants.NODE_EVAC_PRI: constants.IALLOCATOR_NEVAC_PRI,
838     constants.NODE_EVAC_SEC: constants.IALLOCATOR_NEVAC_SEC,
839     constants.NODE_EVAC_ALL: constants.IALLOCATOR_NEVAC_ALL,
840     }
841   assert frozenset(_MODE2IALLOCATOR.keys()) == constants.NODE_EVAC_MODES
842   assert (frozenset(_MODE2IALLOCATOR.values()) ==
843           constants.IALLOCATOR_NEVAC_MODES)
844
845   def CheckArguments(self):
846     CheckIAllocatorOrNode(self, "iallocator", "remote_node")
847
848   def ExpandNames(self):
849     (self.op.node_uuid, self.op.node_name) = \
850       ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
851
852     if self.op.remote_node is not None:
853       (self.op.remote_node_uuid, self.op.remote_node) = \
854         ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
855                               self.op.remote_node)
856       assert self.op.remote_node
857
858       if self.op.node_uuid == self.op.remote_node_uuid:
859         raise errors.OpPrereqError("Can not use evacuated node as a new"
860                                    " secondary node", errors.ECODE_INVAL)
861
862       if self.op.mode != constants.NODE_EVAC_SEC:
863         raise errors.OpPrereqError("Without the use of an iallocator only"
864                                    " secondary instances can be evacuated",
865                                    errors.ECODE_INVAL)
866
867     # Declare locks
868     self.share_locks = ShareAll()
869     self.needed_locks = {
870       locking.LEVEL_INSTANCE: [],
871       locking.LEVEL_NODEGROUP: [],
872       locking.LEVEL_NODE: [],
873       }
874
875     # Determine nodes (via group) optimistically, needs verification once locks
876     # have been acquired
877     self.lock_nodes = self._DetermineNodes()
878
879   def _DetermineNodes(self):
880     """Gets the list of node UUIDs to operate on.
881
882     """
883     if self.op.remote_node is None:
884       # Iallocator will choose any node(s) in the same group
885       group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
886     else:
887       group_nodes = frozenset([self.op.remote_node_uuid])
888
889     # Determine nodes to be locked
890     return set([self.op.node_uuid]) | group_nodes
891
892   def _DetermineInstances(self):
893     """Builds list of instances to operate on.
894
895     """
896     assert self.op.mode in constants.NODE_EVAC_MODES
897
898     if self.op.mode == constants.NODE_EVAC_PRI:
899       # Primary instances only
900       inst_fn = _GetNodePrimaryInstances
901       assert self.op.remote_node is None, \
902         "Evacuating primary instances requires iallocator"
903     elif self.op.mode == constants.NODE_EVAC_SEC:
904       # Secondary instances only
905       inst_fn = _GetNodeSecondaryInstances
906     else:
907       # All instances
908       assert self.op.mode == constants.NODE_EVAC_ALL
909       inst_fn = _GetNodeInstances
910       # TODO: In 2.6, change the iallocator interface to take an evacuation mode
911       # per instance
912       raise errors.OpPrereqError("Due to an issue with the iallocator"
913                                  " interface it is not possible to evacuate"
914                                  " all instances at once; specify explicitly"
915                                  " whether to evacuate primary or secondary"
916                                  " instances",
917                                  errors.ECODE_INVAL)
918
919     return inst_fn(self.cfg, self.op.node_uuid)
920
921   def DeclareLocks(self, level):
922     if level == locking.LEVEL_INSTANCE:
923       # Lock instances optimistically, needs verification once node and group
924       # locks have been acquired
925       self.needed_locks[locking.LEVEL_INSTANCE] = \
926         set(i.name for i in self._DetermineInstances())
927
928     elif level == locking.LEVEL_NODEGROUP:
929       # Lock node groups for all potential target nodes optimistically, needs
930       # verification once nodes have been acquired
931       self.needed_locks[locking.LEVEL_NODEGROUP] = \
932         self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)
933
934     elif level == locking.LEVEL_NODE:
935       self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes
936
937   def CheckPrereq(self):
938     # Verify locks
939     owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
940     owned_nodes = self.owned_locks(locking.LEVEL_NODE)
941     owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
942
943     need_nodes = self._DetermineNodes()
944
945     if not owned_nodes.issuperset(need_nodes):
946       raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
947                                  " locks were acquired, current nodes are"
948                                  " are '%s', used to be '%s'; retry the"
949                                  " operation" %
950                                  (self.op.node_name,
951                                   utils.CommaJoin(need_nodes),
952                                   utils.CommaJoin(owned_nodes)),
953                                  errors.ECODE_STATE)
954
955     wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
956     if owned_groups != wanted_groups:
957       raise errors.OpExecError("Node groups changed since locks were acquired,"
958                                " current groups are '%s', used to be '%s';"
959                                " retry the operation" %
960                                (utils.CommaJoin(wanted_groups),
961                                 utils.CommaJoin(owned_groups)))
962
963     # Determine affected instances
964     self.instances = self._DetermineInstances()
965     self.instance_names = [i.name for i in self.instances]
966
967     if set(self.instance_names) != owned_instances:
968       raise errors.OpExecError("Instances on node '%s' changed since locks"
969                                " were acquired, current instances are '%s',"
970                                " used to be '%s'; retry the operation" %
971                                (self.op.node_name,
972                                 utils.CommaJoin(self.instance_names),
973                                 utils.CommaJoin(owned_instances)))
974
975     if self.instance_names:
976       self.LogInfo("Evacuating instances from node '%s': %s",
977                    self.op.node_name,
978                    utils.CommaJoin(utils.NiceSort(self.instance_names)))
979     else:
980       self.LogInfo("No instances to evacuate from node '%s'",
981                    self.op.node_name)
982
983     if self.op.remote_node is not None:
984       for i in self.instances:
985         if i.primary_node == self.op.remote_node_uuid:
986           raise errors.OpPrereqError("Node %s is the primary node of"
987                                      " instance %s, cannot use it as"
988                                      " secondary" %
989                                      (self.op.remote_node, i.name),
990                                      errors.ECODE_INVAL)
991
992   def Exec(self, feedback_fn):
993     assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)
994
995     if not self.instance_names:
996       # No instances to evacuate
997       jobs = []
998
999     elif self.op.iallocator is not None:
1000       # TODO: Implement relocation to other group
1001       evac_mode = self._MODE2IALLOCATOR[self.op.mode]
1002       req = iallocator.IAReqNodeEvac(evac_mode=evac_mode,
1003                                      instances=list(self.instance_names))
1004       ial = iallocator.IAllocator(self.cfg, self.rpc, req)
1005
1006       ial.Run(self.op.iallocator)
1007
1008       if not ial.success:
1009         raise errors.OpPrereqError("Can't compute node evacuation using"
1010                                    " iallocator '%s': %s" %
1011                                    (self.op.iallocator, ial.info),
1012                                    errors.ECODE_NORES)
1013
1014       jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)
1015
1016     elif self.op.remote_node is not None:
1017       assert self.op.mode == constants.NODE_EVAC_SEC
1018       jobs = [
1019         [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
1020                                         remote_node=self.op.remote_node,
1021                                         disks=[],
1022                                         mode=constants.REPLACE_DISK_CHG,
1023                                         early_release=self.op.early_release)]
1024         for instance_name in self.instance_names]
1025
1026     else:
1027       raise errors.ProgrammerError("No iallocator or remote node")
1028
1029     return ResultWithJobs(jobs)
1030
1031
1032 class LUNodeMigrate(LogicalUnit):
1033   """Migrate all instances from a node.
1034
1035   """
1036   HPATH = "node-migrate"
1037   HTYPE = constants.HTYPE_NODE
1038   REQ_BGL = False
1039
1040   def CheckArguments(self):
1041     pass
1042
1043   def ExpandNames(self):
1044     (self.op.node_uuid, self.op.node_name) = \
1045       ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1046
1047     self.share_locks = ShareAll()
1048     self.needed_locks = {
1049       locking.LEVEL_NODE: [self.op.node_uuid],
1050       }
1051
1052   def BuildHooksEnv(self):
1053     """Build hooks env.
1054
1055     This runs on the master, the primary and all the secondaries.
1056
1057     """
1058     return {
1059       "NODE_NAME": self.op.node_name,
1060       "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
1061       }
1062
1063   def BuildHooksNodes(self):
1064     """Build hooks nodes.
1065
1066     """
1067     nl = [self.cfg.GetMasterNode()]
1068     return (nl, nl)
1069
1070   def CheckPrereq(self):
1071     pass
1072
1073   def Exec(self, feedback_fn):
1074     # Prepare jobs for migration instances
1075     allow_runtime_changes = self.op.allow_runtime_changes
1076     jobs = [
1077       [opcodes.OpInstanceMigrate(instance_name=inst.name,
1078                                  mode=self.op.mode,
1079                                  live=self.op.live,
1080                                  iallocator=self.op.iallocator,
1081                                  target_node=self.op.target_node,
1082                                  allow_runtime_changes=allow_runtime_changes,
1083                                  ignore_ipolicy=self.op.ignore_ipolicy)]
1084       for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]
1085
1086     # TODO: Run iallocator in this opcode and pass correct placement options to
1087     # OpInstanceMigrate. Since other jobs can modify the cluster between
1088     # running the iallocator and the actual migration, a good consistency model
1089     # will have to be found.
1090
1091     assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
1092             frozenset([self.op.node_name]))
1093
1094     return ResultWithJobs(jobs)
1095
1096
1097 def _GetStorageTypeArgs(cfg, storage_type):
1098   """Returns the arguments for a storage type.
1099
1100   """
1101   # Special case for file storage
1102   if storage_type == constants.ST_FILE:
1103     # storage.FileStorage wants a list of storage directories
1104     return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
1105
1106   return []
1107
1108
1109 class LUNodeModifyStorage(NoHooksLU):
1110   """Logical unit for modifying a storage volume on a node.
1111
1112   """
1113   REQ_BGL = False
1114
1115   def CheckArguments(self):
1116     (self.op.node_uuid, self.op.node_name) = \
1117       ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1118
1119     storage_type = self.op.storage_type
1120
1121     try:
1122       modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
1123     except KeyError:
1124       raise errors.OpPrereqError("Storage units of type '%s' can not be"
1125                                  " modified" % storage_type,
1126                                  errors.ECODE_INVAL)
1127
1128     diff = set(self.op.changes.keys()) - modifiable
1129     if diff:
1130       raise errors.OpPrereqError("The following fields can not be modified for"
1131                                  " storage units of type '%s': %r" %
1132                                  (storage_type, list(diff)),
1133                                  errors.ECODE_INVAL)
1134
1135   def ExpandNames(self):
1136     self.needed_locks = {
1137       locking.LEVEL_NODE: self.op.node_uuid,
1138       }
1139
1140   def Exec(self, feedback_fn):
1141     """Computes the list of nodes and their attributes.
1142
1143     """
1144     st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1145     result = self.rpc.call_storage_modify(self.op.node_uuid,
1146                                           self.op.storage_type, st_args,
1147                                           self.op.name, self.op.changes)
1148     result.Raise("Failed to modify storage unit '%s' on %s" %
1149                  (self.op.name, self.op.node_name))
1150
1151
1152 class NodeQuery(QueryBase):
1153   FIELDS = query.NODE_FIELDS
1154
1155   def ExpandNames(self, lu):
1156     lu.needed_locks = {}
1157     lu.share_locks = ShareAll()
1158
1159     if self.names:
1160       (self.wanted, _) = GetWantedNodes(lu, self.names)
1161     else:
1162       self.wanted = locking.ALL_SET
1163
1164     self.do_locking = (self.use_locking and
1165                        query.NQ_LIVE in self.requested_data)
1166
1167     if self.do_locking:
1168       # If any non-static field is requested we need to lock the nodes
1169       lu.needed_locks[locking.LEVEL_NODE] = self.wanted
1170       lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1171
1172   def DeclareLocks(self, lu, level):
1173     pass
1174
1175   def _GetQueryData(self, lu):
1176     """Computes the list of nodes and their attributes.
1177
1178     """
1179     all_info = lu.cfg.GetAllNodesInfo()
1180
1181     node_uuids = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)
1182
1183     # Gather data as requested
1184     if query.NQ_LIVE in self.requested_data:
1185       # filter out non-vm_capable nodes
1186       toquery_node_uuids = [node.uuid for node in all_info.values()
1187                             if node.vm_capable and node.uuid in node_uuids]
1188
1189       es_flags = rpc.GetExclusiveStorageForNodes(lu.cfg, toquery_node_uuids)
1190       # FIXME: This currently maps everything to lvm, this should be more
1191       # flexible
1192       vg_req = rpc.BuildVgInfoQuery(lu.cfg)
1193       default_hypervisor = lu.cfg.GetHypervisorType()
1194       hvparams = lu.cfg.GetClusterInfo().hvparams[default_hypervisor]
1195       hvspecs = [(default_hypervisor, hvparams)]
1196       node_data = lu.rpc.call_node_info(toquery_node_uuids, vg_req,
1197                                         hvspecs, es_flags)
1198       live_data = dict((uuid, rpc.MakeLegacyNodeInfo(nresult.payload))
1199                        for (uuid, nresult) in node_data.items()
1200                        if not nresult.fail_msg and nresult.payload)
1201     else:
1202       live_data = None
1203
1204     if query.NQ_INST in self.requested_data:
1205       node_to_primary = dict([(uuid, set()) for uuid in node_uuids])
1206       node_to_secondary = dict([(uuid, set()) for uuid in node_uuids])
1207
1208       inst_data = lu.cfg.GetAllInstancesInfo()
1209
1210       for inst in inst_data.values():
1211         if inst.primary_node in node_to_primary:
1212           node_to_primary[inst.primary_node].add(inst.name)
1213         for secnode in inst.secondary_nodes:
1214           if secnode in node_to_secondary:
1215             node_to_secondary[secnode].add(inst.name)
1216     else:
1217       node_to_primary = None
1218       node_to_secondary = None
1219
1220     if query.NQ_OOB in self.requested_data:
1221       oob_support = dict((uuid, bool(SupportsOob(lu.cfg, node)))
1222                          for uuid, node in all_info.iteritems())
1223     else:
1224       oob_support = None
1225
1226     if query.NQ_GROUP in self.requested_data:
1227       groups = lu.cfg.GetAllNodeGroupsInfo()
1228     else:
1229       groups = {}
1230
1231     return query.NodeQueryData([all_info[uuid] for uuid in node_uuids],
1232                                live_data, lu.cfg.GetMasterNode(),
1233                                node_to_primary, node_to_secondary, groups,
1234                                oob_support, lu.cfg.GetClusterInfo())
1235
1236
1237 class LUNodeQuery(NoHooksLU):
1238   """Logical unit for querying nodes.
1239
1240   """
1241   # pylint: disable=W0142
1242   REQ_BGL = False
1243
1244   def CheckArguments(self):
1245     self.nq = NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
1246                          self.op.output_fields, self.op.use_locking)
1247
1248   def ExpandNames(self):
1249     self.nq.ExpandNames(self)
1250
1251   def DeclareLocks(self, level):
1252     self.nq.DeclareLocks(self, level)
1253
1254   def Exec(self, feedback_fn):
1255     return self.nq.OldStyleQuery(self)
1256
1257
1258 def _CheckOutputFields(static, dynamic, selected):
1259   """Checks whether all selected fields are valid.
1260
1261   @type static: L{utils.FieldSet}
1262   @param static: static fields set
1263   @type dynamic: L{utils.FieldSet}
1264   @param dynamic: dynamic fields set
1265
1266   """
1267   f = utils.FieldSet()
1268   f.Extend(static)
1269   f.Extend(dynamic)
1270
1271   delta = f.NonMatching(selected)
1272   if delta:
1273     raise errors.OpPrereqError("Unknown output fields selected: %s"
1274                                % ",".join(delta), errors.ECODE_INVAL)
1275
1276
1277 class LUNodeQueryvols(NoHooksLU):
1278   """Logical unit for getting volumes on node(s).
1279
1280   """
1281   REQ_BGL = False
1282   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1283   _FIELDS_STATIC = utils.FieldSet("node")
1284
1285   def CheckArguments(self):
1286     _CheckOutputFields(static=self._FIELDS_STATIC,
1287                        dynamic=self._FIELDS_DYNAMIC,
1288                        selected=self.op.output_fields)
1289
1290   def ExpandNames(self):
1291     self.share_locks = ShareAll()
1292
1293     if self.op.nodes:
1294       self.needed_locks = {
1295         locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
1296         }
1297     else:
1298       self.needed_locks = {
1299         locking.LEVEL_NODE: locking.ALL_SET,
1300         locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1301         }
1302
1303   def Exec(self, feedback_fn):
1304     """Computes the list of nodes and their attributes.
1305
1306     """
1307     node_uuids = self.owned_locks(locking.LEVEL_NODE)
1308     volumes = self.rpc.call_node_volumes(node_uuids)
1309
1310     ilist = self.cfg.GetAllInstancesInfo()
1311     vol2inst = MapInstanceDisksToNodes(ilist.values())
1312
1313     output = []
1314     for node_uuid in node_uuids:
1315       nresult = volumes[node_uuid]
1316       if nresult.offline:
1317         continue
1318       msg = nresult.fail_msg
1319       if msg:
1320         self.LogWarning("Can't compute volume data on node %s: %s",
1321                         self.cfg.GetNodeName(node_uuid), msg)
1322         continue
1323
1324       node_vols = sorted(nresult.payload,
1325                          key=operator.itemgetter("dev"))
1326
1327       for vol in node_vols:
1328         node_output = []
1329         for field in self.op.output_fields:
1330           if field == "node":
1331             val = self.cfg.GetNodeName(node_uuid)
1332           elif field == "phys":
1333             val = vol["dev"]
1334           elif field == "vg":
1335             val = vol["vg"]
1336           elif field == "name":
1337             val = vol["name"]
1338           elif field == "size":
1339             val = int(float(vol["size"]))
1340           elif field == "instance":
1341             val = vol2inst.get((node_uuid, vol["vg"] + "/" + vol["name"]), "-")
1342           else:
1343             raise errors.ParameterError(field)
1344           node_output.append(str(val))
1345
1346         output.append(node_output)
1347
1348     return output
1349
1350
1351 class LUNodeQueryStorage(NoHooksLU):
1352   """Logical unit for getting information on storage units on node(s).
1353
1354   """
1355   _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
1356   REQ_BGL = False
1357
1358   def CheckArguments(self):
1359     _CheckOutputFields(static=self._FIELDS_STATIC,
1360                        dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
1361                        selected=self.op.output_fields)
1362
1363   def ExpandNames(self):
1364     self.share_locks = ShareAll()
1365
1366     if self.op.nodes:
1367       self.needed_locks = {
1368         locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
1369         }
1370     else:
1371       self.needed_locks = {
1372         locking.LEVEL_NODE: locking.ALL_SET,
1373         locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1374         }
1375
1376   def Exec(self, feedback_fn):
1377     """Computes the list of nodes and their attributes.
1378
1379     """
1380     self.node_uuids = self.owned_locks(locking.LEVEL_NODE)
1381
1382     # Always get name to sort by
1383     if constants.SF_NAME in self.op.output_fields:
1384       fields = self.op.output_fields[:]
1385     else:
1386       fields = [constants.SF_NAME] + self.op.output_fields
1387
1388     # Never ask for node or type as it's only known to the LU
1389     for extra in [constants.SF_NODE, constants.SF_TYPE]:
1390       while extra in fields:
1391         fields.remove(extra)
1392
1393     field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
1394     name_idx = field_idx[constants.SF_NAME]
1395
1396     st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1397     data = self.rpc.call_storage_list(self.node_uuids,
1398                                       self.op.storage_type, st_args,
1399                                       self.op.name, fields)
1400
1401     result = []
1402
1403     for node_uuid in utils.NiceSort(self.node_uuids):
1404       node_name = self.cfg.GetNodeName(node_uuid)
1405       nresult = data[node_uuid]
1406       if nresult.offline:
1407         continue
1408
1409       msg = nresult.fail_msg
1410       if msg:
1411         self.LogWarning("Can't get storage data from node %s: %s",
1412                         node_name, msg)
1413         continue
1414
1415       rows = dict([(row[name_idx], row) for row in nresult.payload])
1416
1417       for name in utils.NiceSort(rows.keys()):
1418         row = rows[name]
1419
1420         out = []
1421
1422         for field in self.op.output_fields:
1423           if field == constants.SF_NODE:
1424             val = node_name
1425           elif field == constants.SF_TYPE:
1426             val = self.op.storage_type
1427           elif field in field_idx:
1428             val = row[field_idx[field]]
1429           else:
1430             raise errors.ParameterError(field)
1431
1432           out.append(val)
1433
1434         result.append(out)
1435
1436     return result
1437
1438
1439 class LUNodeRemove(LogicalUnit):
1440   """Logical unit for removing a node.
1441
1442   """
1443   HPATH = "node-remove"
1444   HTYPE = constants.HTYPE_NODE
1445
1446   def BuildHooksEnv(self):
1447     """Build hooks env.
1448
1449     """
1450     return {
1451       "OP_TARGET": self.op.node_name,
1452       "NODE_NAME": self.op.node_name,
1453       }
1454
1455   def BuildHooksNodes(self):
1456     """Build hooks nodes.
1457
1458     This doesn't run on the target node in the pre phase as a failed
1459     node would then be impossible to remove.
1460
1461     """
1462     all_nodes = self.cfg.GetNodeList()
1463     try:
1464       all_nodes.remove(self.op.node_uuid)
1465     except ValueError:
1466       pass
1467     return (all_nodes, all_nodes)
1468
1469   def CheckPrereq(self):
1470     """Check prerequisites.
1471
1472     This checks:
1473      - the node exists in the configuration
1474      - it does not have primary or secondary instances
1475      - it's not the master
1476
1477     Any errors are signaled by raising errors.OpPrereqError.
1478
1479     """
1480     (self.op.node_uuid, self.op.node_name) = \
1481       ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1482     node = self.cfg.GetNodeInfo(self.op.node_uuid)
1483     assert node is not None
1484
1485     masternode = self.cfg.GetMasterNode()
1486     if node.uuid == masternode:
1487       raise errors.OpPrereqError("Node is the master node, failover to another"
1488                                  " node is required", errors.ECODE_INVAL)
1489
1490     for instance_name, instance in self.cfg.GetAllInstancesInfo().items():
1491       if node.uuid in instance.all_nodes:
1492         raise errors.OpPrereqError("Instance %s is still running on the node,"
1493                                    " please remove first" % instance_name,
1494                                    errors.ECODE_INVAL)
1495     self.op.node_name = node.name
1496     self.node = node
1497
1498   def Exec(self, feedback_fn):
1499     """Removes the node from the cluster.
1500
1501     """
1502     node = self.node
1503     logging.info("Stopping the node daemon and removing configs from node %s",
1504                  node.name)
1505
1506     modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
1507
1508     assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
1509       "Not owning BGL"
1510
1511     # Promote nodes to master candidate as needed
1512     AdjustCandidatePool(self, exceptions=[node.uuid])
1513     self.context.RemoveNode(node)
1514
1515     # Run post hooks on the node before it's removed
1516     RunPostHook(self, node.name)
1517
1518     # we have to call this by name rather than by UUID, as the node is no longer
1519     # in the config
1520     result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
1521     msg = result.fail_msg
1522     if msg:
1523       self.LogWarning("Errors encountered on the remote node while leaving"
1524                       " the cluster: %s", msg)
1525
1526     # Remove node from our /etc/hosts
1527     if self.cfg.GetClusterInfo().modify_etc_hosts:
1528       master_node_uuid = self.cfg.GetMasterNode()
1529       result = self.rpc.call_etc_hosts_modify(master_node_uuid,
1530                                               constants.ETC_HOSTS_REMOVE,
1531                                               node.name, None)
1532       result.Raise("Can't update hosts file with new host data")
1533       RedistributeAncillaryFiles(self)
1534
1535
1536 class LURepairNodeStorage(NoHooksLU):
1537   """Repairs the volume group on a node.
1538
1539   """
1540   REQ_BGL = False
1541
1542   def CheckArguments(self):
1543     (self.op.node_uuid, self.op.node_name) = \
1544       ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1545
1546     storage_type = self.op.storage_type
1547
1548     if (constants.SO_FIX_CONSISTENCY not in
1549         constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
1550       raise errors.OpPrereqError("Storage units of type '%s' can not be"
1551                                  " repaired" % storage_type,
1552                                  errors.ECODE_INVAL)
1553
1554   def ExpandNames(self):
1555     self.needed_locks = {
1556       locking.LEVEL_NODE: [self.op.node_uuid],
1557       }
1558
1559   def _CheckFaultyDisks(self, instance, node_uuid):
1560     """Ensure faulty disks abort the opcode or at least warn."""
1561     try:
1562       if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
1563                                  node_uuid, True):
1564         raise errors.OpPrereqError("Instance '%s' has faulty disks on"
1565                                    " node '%s'" %
1566                                    (instance.name,
1567                                     self.cfg.GetNodeName(node_uuid)),
1568                                    errors.ECODE_STATE)
1569     except errors.OpPrereqError, err:
1570       if self.op.ignore_consistency:
1571         self.LogWarning(str(err.args[0]))
1572       else:
1573         raise
1574
1575   def CheckPrereq(self):
1576     """Check prerequisites.
1577
1578     """
1579     # Check whether any instance on this node has faulty disks
1580     for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
1581       if not inst.disks_active:
1582         continue
1583       check_nodes = set(inst.all_nodes)
1584       check_nodes.discard(self.op.node_uuid)
1585       for inst_node_uuid in check_nodes:
1586         self._CheckFaultyDisks(inst, inst_node_uuid)
1587
1588   def Exec(self, feedback_fn):
1589     feedback_fn("Repairing storage unit '%s' on %s ..." %
1590                 (self.op.name, self.op.node_name))
1591
1592     st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1593     result = self.rpc.call_storage_execute(self.op.node_uuid,
1594                                            self.op.storage_type, st_args,
1595                                            self.op.name,
1596                                            constants.SO_FIX_CONSISTENCY)
1597     result.Raise("Failed to repair storage unit '%s' on %s" %
1598                  (self.op.name, self.op.node_name))