Verify: node info and instance list
[ganeti-local] / lib / cmdlib / node.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with nodes."""
23
24 import logging
25 import operator
26
27 from ganeti import constants
28 from ganeti import errors
29 from ganeti import locking
30 from ganeti import netutils
31 from ganeti import objects
32 from ganeti import opcodes
33 from ganeti import qlang
34 from ganeti import query
35 from ganeti import rpc
36 from ganeti import utils
37 from ganeti.masterd import iallocator
38
39 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, QueryBase, \
40   ResultWithJobs
41 from ganeti.cmdlib.common import CheckParamsNotGlobal, \
42   MergeAndVerifyHvState, MergeAndVerifyDiskState, \
43   IsExclusiveStorageEnabledNode, CheckNodePVs, \
44   RedistributeAncillaryFiles, ExpandNodeName, ShareAll, SupportsOob, \
45   CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
46   AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
47   GetWantedNodes, MapInstanceDisksToNodes, RunPostHook, \
48   FindFaultyInstanceDisks
49
50
51 def _DecideSelfPromotion(lu, exceptions=None):
52   """Decide whether I should promote myself as a master candidate.
53
54   """
55   cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
56   mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
57   # the new node will increase mc_max with one, so:
58   mc_should = min(mc_should + 1, cp_size)
59   return mc_now < mc_should
60
61
62 def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
63   """Ensure that a node has the given secondary ip.
64
65   @type lu: L{LogicalUnit}
66   @param lu: the LU on behalf of which we make the check
67   @type node: string
68   @param node: the node to check
69   @type secondary_ip: string
70   @param secondary_ip: the ip to check
71   @type prereq: boolean
72   @param prereq: whether to throw a prerequisite or an execute error
73   @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
74   @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
75
76   """
77   result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
78   result.Raise("Failure checking secondary ip on node %s" % node,
79                prereq=prereq, ecode=errors.ECODE_ENVIRON)
80   if not result.payload:
81     msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
82            " please fix and re-run this command" % secondary_ip)
83     if prereq:
84       raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
85     else:
86       raise errors.OpExecError(msg)
87
88
89 class LUNodeAdd(LogicalUnit):
90   """Logical unit for adding node to the cluster.
91
92   """
93   HPATH = "node-add"
94   HTYPE = constants.HTYPE_NODE
95   _NFLAGS = ["master_capable", "vm_capable"]
96
97   def CheckArguments(self):
98     self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
99     # validate/normalize the node name
100     self.hostname = netutils.GetHostname(name=self.op.node_name,
101                                          family=self.primary_ip_family)
102     self.op.node_name = self.hostname.name
103
104     if self.op.readd and self.op.node_name == self.cfg.GetMasterNode():
105       raise errors.OpPrereqError("Cannot readd the master node",
106                                  errors.ECODE_STATE)
107
108     if self.op.readd and self.op.group:
109       raise errors.OpPrereqError("Cannot pass a node group when a node is"
110                                  " being readded", errors.ECODE_INVAL)
111
112   def BuildHooksEnv(self):
113     """Build hooks env.
114
115     This will run on all nodes before, and on all nodes + the new node after.
116
117     """
118     return {
119       "OP_TARGET": self.op.node_name,
120       "NODE_NAME": self.op.node_name,
121       "NODE_PIP": self.op.primary_ip,
122       "NODE_SIP": self.op.secondary_ip,
123       "MASTER_CAPABLE": str(self.op.master_capable),
124       "VM_CAPABLE": str(self.op.vm_capable),
125       }
126
127   def BuildHooksNodes(self):
128     """Build hooks nodes.
129
130     """
131     # Exclude added node
132     pre_nodes = list(set(self.cfg.GetNodeList()) - set([self.op.node_name]))
133     post_nodes = pre_nodes + [self.op.node_name, ]
134
135     return (pre_nodes, post_nodes)
136
137   def CheckPrereq(self):
138     """Check prerequisites.
139
140     This checks:
141      - the new node is not already in the config
142      - it is resolvable
143      - its parameters (single/dual homed) matches the cluster
144
145     Any errors are signaled by raising errors.OpPrereqError.
146
147     """
148     cfg = self.cfg
149     hostname = self.hostname
150     node = hostname.name
151     primary_ip = self.op.primary_ip = hostname.ip
152     if self.op.secondary_ip is None:
153       if self.primary_ip_family == netutils.IP6Address.family:
154         raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
155                                    " IPv4 address must be given as secondary",
156                                    errors.ECODE_INVAL)
157       self.op.secondary_ip = primary_ip
158
159     secondary_ip = self.op.secondary_ip
160     if not netutils.IP4Address.IsValid(secondary_ip):
161       raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
162                                  " address" % secondary_ip, errors.ECODE_INVAL)
163
164     node_list = cfg.GetNodeList()
165     if not self.op.readd and node in node_list:
166       raise errors.OpPrereqError("Node %s is already in the configuration" %
167                                  node, errors.ECODE_EXISTS)
168     elif self.op.readd and node not in node_list:
169       raise errors.OpPrereqError("Node %s is not in the configuration" % node,
170                                  errors.ECODE_NOENT)
171
172     self.changed_primary_ip = False
173
174     for existing_node_name, existing_node in cfg.GetMultiNodeInfo(node_list):
175       if self.op.readd and node == existing_node_name:
176         if existing_node.secondary_ip != secondary_ip:
177           raise errors.OpPrereqError("Readded node doesn't have the same IP"
178                                      " address configuration as before",
179                                      errors.ECODE_INVAL)
180         if existing_node.primary_ip != primary_ip:
181           self.changed_primary_ip = True
182
183         continue
184
185       if (existing_node.primary_ip == primary_ip or
186           existing_node.secondary_ip == primary_ip or
187           existing_node.primary_ip == secondary_ip or
188           existing_node.secondary_ip == secondary_ip):
189         raise errors.OpPrereqError("New node ip address(es) conflict with"
190                                    " existing node %s" % existing_node.name,
191                                    errors.ECODE_NOTUNIQUE)
192
193     # After this 'if' block, None is no longer a valid value for the
194     # _capable op attributes
195     if self.op.readd:
196       old_node = self.cfg.GetNodeInfo(node)
197       assert old_node is not None, "Can't retrieve locked node %s" % node
198       for attr in self._NFLAGS:
199         if getattr(self.op, attr) is None:
200           setattr(self.op, attr, getattr(old_node, attr))
201     else:
202       for attr in self._NFLAGS:
203         if getattr(self.op, attr) is None:
204           setattr(self.op, attr, True)
205
206     if self.op.readd and not self.op.vm_capable:
207       pri, sec = cfg.GetNodeInstances(node)
208       if pri or sec:
209         raise errors.OpPrereqError("Node %s being re-added with vm_capable"
210                                    " flag set to false, but it already holds"
211                                    " instances" % node,
212                                    errors.ECODE_STATE)
213
214     # check that the type of the node (single versus dual homed) is the
215     # same as for the master
216     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
217     master_singlehomed = myself.secondary_ip == myself.primary_ip
218     newbie_singlehomed = secondary_ip == primary_ip
219     if master_singlehomed != newbie_singlehomed:
220       if master_singlehomed:
221         raise errors.OpPrereqError("The master has no secondary ip but the"
222                                    " new node has one",
223                                    errors.ECODE_INVAL)
224       else:
225         raise errors.OpPrereqError("The master has a secondary ip but the"
226                                    " new node doesn't have one",
227                                    errors.ECODE_INVAL)
228
229     # checks reachability
230     if not netutils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
231       raise errors.OpPrereqError("Node not reachable by ping",
232                                  errors.ECODE_ENVIRON)
233
234     if not newbie_singlehomed:
235       # check reachability from my secondary ip to newbie's secondary ip
236       if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
237                               source=myself.secondary_ip):
238         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
239                                    " based ping to node daemon port",
240                                    errors.ECODE_ENVIRON)
241
242     if self.op.readd:
243       exceptions = [node]
244     else:
245       exceptions = []
246
247     if self.op.master_capable:
248       self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
249     else:
250       self.master_candidate = False
251
252     if self.op.readd:
253       self.new_node = old_node
254     else:
255       node_group = cfg.LookupNodeGroup(self.op.group)
256       self.new_node = objects.Node(name=node,
257                                    primary_ip=primary_ip,
258                                    secondary_ip=secondary_ip,
259                                    master_candidate=self.master_candidate,
260                                    offline=False, drained=False,
261                                    group=node_group, ndparams={})
262
263     if self.op.ndparams:
264       utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
265       CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
266                            "node", "cluster or group")
267
268     if self.op.hv_state:
269       self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)
270
271     if self.op.disk_state:
272       self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)
273
274     # TODO: If we need to have multiple DnsOnlyRunner we probably should make
275     #       it a property on the base class.
276     rpcrunner = rpc.DnsOnlyRunner()
277     result = rpcrunner.call_version([node])[node]
278     result.Raise("Can't get version information from node %s" % node)
279     if constants.PROTOCOL_VERSION == result.payload:
280       logging.info("Communication to node %s fine, sw version %s match",
281                    node, result.payload)
282     else:
283       raise errors.OpPrereqError("Version mismatch master version %s,"
284                                  " node version %s" %
285                                  (constants.PROTOCOL_VERSION, result.payload),
286                                  errors.ECODE_ENVIRON)
287
288     vg_name = cfg.GetVGName()
289     if vg_name is not None:
290       vparams = {constants.NV_PVLIST: [vg_name]}
291       excl_stor = IsExclusiveStorageEnabledNode(cfg, self.new_node)
292       cname = self.cfg.GetClusterName()
293       result = rpcrunner.call_node_verify_light(
294           [node], vparams, cname, cfg.GetClusterInfo().hvparams)[node]
295       (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
296       if errmsgs:
297         raise errors.OpPrereqError("Checks on node PVs failed: %s" %
298                                    "; ".join(errmsgs), errors.ECODE_ENVIRON)
299
300   def Exec(self, feedback_fn):
301     """Adds the new node to the cluster.
302
303     """
304     new_node = self.new_node
305     node = new_node.name
306
307     assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
308       "Not owning BGL"
309
310     # We adding a new node so we assume it's powered
311     new_node.powered = True
312
313     # for re-adds, reset the offline/drained/master-candidate flags;
314     # we need to reset here, otherwise offline would prevent RPC calls
315     # later in the procedure; this also means that if the re-add
316     # fails, we are left with a non-offlined, broken node
317     if self.op.readd:
318       new_node.drained = new_node.offline = False # pylint: disable=W0201
319       self.LogInfo("Readding a node, the offline/drained flags were reset")
320       # if we demote the node, we do cleanup later in the procedure
321       new_node.master_candidate = self.master_candidate
322       if self.changed_primary_ip:
323         new_node.primary_ip = self.op.primary_ip
324
325     # copy the master/vm_capable flags
326     for attr in self._NFLAGS:
327       setattr(new_node, attr, getattr(self.op, attr))
328
329     # notify the user about any possible mc promotion
330     if new_node.master_candidate:
331       self.LogInfo("Node will be a master candidate")
332
333     if self.op.ndparams:
334       new_node.ndparams = self.op.ndparams
335     else:
336       new_node.ndparams = {}
337
338     if self.op.hv_state:
339       new_node.hv_state_static = self.new_hv_state
340
341     if self.op.disk_state:
342       new_node.disk_state_static = self.new_disk_state
343
344     # Add node to our /etc/hosts, and add key to known_hosts
345     if self.cfg.GetClusterInfo().modify_etc_hosts:
346       master_node = self.cfg.GetMasterNode()
347       result = self.rpc.call_etc_hosts_modify(master_node,
348                                               constants.ETC_HOSTS_ADD,
349                                               self.hostname.name,
350                                               self.hostname.ip)
351       result.Raise("Can't update hosts file with new host data")
352
353     if new_node.secondary_ip != new_node.primary_ip:
354       _CheckNodeHasSecondaryIP(self, new_node.name, new_node.secondary_ip,
355                                False)
356
357     node_verify_list = [self.cfg.GetMasterNode()]
358     node_verify_param = {
359       constants.NV_NODELIST: ([node], {}),
360       # TODO: do a node-net-test as well?
361     }
362
363     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
364                                        self.cfg.GetClusterName(),
365                                        self.cfg.GetClusterInfo().hvparams)
366     for verifier in node_verify_list:
367       result[verifier].Raise("Cannot communicate with node %s" % verifier)
368       nl_payload = result[verifier].payload[constants.NV_NODELIST]
369       if nl_payload:
370         for failed in nl_payload:
371           feedback_fn("ssh/hostname verification failed"
372                       " (checking from %s): %s" %
373                       (verifier, nl_payload[failed]))
374         raise errors.OpExecError("ssh/hostname verification failed")
375
376     if self.op.readd:
377       RedistributeAncillaryFiles(self)
378       self.context.ReaddNode(new_node)
379       # make sure we redistribute the config
380       self.cfg.Update(new_node, feedback_fn)
381       # and make sure the new node will not have old files around
382       if not new_node.master_candidate:
383         result = self.rpc.call_node_demote_from_mc(new_node.name)
384         result.Warn("Node failed to demote itself from master candidate status",
385                     self.LogWarning)
386     else:
387       RedistributeAncillaryFiles(self, additional_nodes=[node],
388                                  additional_vm=self.op.vm_capable)
389       self.context.AddNode(new_node, self.proc.GetECId())
390
391
392 class LUNodeSetParams(LogicalUnit):
393   """Modifies the parameters of a node.
394
395   @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
396       to the node role (as _ROLE_*)
397   @cvar _R2F: a dictionary from node role to tuples of flags
398   @cvar _FLAGS: a list of attribute names corresponding to the flags
399
400   """
401   HPATH = "node-modify"
402   HTYPE = constants.HTYPE_NODE
403   REQ_BGL = False
404   (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
405   _F2R = {
406     (True, False, False): _ROLE_CANDIDATE,
407     (False, True, False): _ROLE_DRAINED,
408     (False, False, True): _ROLE_OFFLINE,
409     (False, False, False): _ROLE_REGULAR,
410     }
411   _R2F = dict((v, k) for k, v in _F2R.items())
412   _FLAGS = ["master_candidate", "drained", "offline"]
413
414   def CheckArguments(self):
415     self.op.node_name = ExpandNodeName(self.cfg, self.op.node_name)
416     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
417                 self.op.master_capable, self.op.vm_capable,
418                 self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
419                 self.op.disk_state]
420     if all_mods.count(None) == len(all_mods):
421       raise errors.OpPrereqError("Please pass at least one modification",
422                                  errors.ECODE_INVAL)
423     if all_mods.count(True) > 1:
424       raise errors.OpPrereqError("Can't set the node into more than one"
425                                  " state at the same time",
426                                  errors.ECODE_INVAL)
427
428     # Boolean value that tells us whether we might be demoting from MC
429     self.might_demote = (self.op.master_candidate is False or
430                          self.op.offline is True or
431                          self.op.drained is True or
432                          self.op.master_capable is False)
433
434     if self.op.secondary_ip:
435       if not netutils.IP4Address.IsValid(self.op.secondary_ip):
436         raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
437                                    " address" % self.op.secondary_ip,
438                                    errors.ECODE_INVAL)
439
440     self.lock_all = self.op.auto_promote and self.might_demote
441     self.lock_instances = self.op.secondary_ip is not None
442
443   def _InstanceFilter(self, instance):
444     """Filter for getting affected instances.
445
446     """
447     return (instance.disk_template in constants.DTS_INT_MIRROR and
448             self.op.node_name in instance.all_nodes)
449
450   def ExpandNames(self):
451     if self.lock_all:
452       self.needed_locks = {
453         locking.LEVEL_NODE: locking.ALL_SET,
454
455         # Block allocations when all nodes are locked
456         locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
457         }
458     else:
459       self.needed_locks = {
460         locking.LEVEL_NODE: self.op.node_name,
461         }
462
463     # Since modifying a node can have severe effects on currently running
464     # operations the resource lock is at least acquired in shared mode
465     self.needed_locks[locking.LEVEL_NODE_RES] = \
466       self.needed_locks[locking.LEVEL_NODE]
467
468     # Get all locks except nodes in shared mode; they are not used for anything
469     # but read-only access
470     self.share_locks = ShareAll()
471     self.share_locks[locking.LEVEL_NODE] = 0
472     self.share_locks[locking.LEVEL_NODE_RES] = 0
473     self.share_locks[locking.LEVEL_NODE_ALLOC] = 0
474
475     if self.lock_instances:
476       self.needed_locks[locking.LEVEL_INSTANCE] = \
477         frozenset(self.cfg.GetInstancesInfoByFilter(self._InstanceFilter))
478
479   def BuildHooksEnv(self):
480     """Build hooks env.
481
482     This runs on the master node.
483
484     """
485     return {
486       "OP_TARGET": self.op.node_name,
487       "MASTER_CANDIDATE": str(self.op.master_candidate),
488       "OFFLINE": str(self.op.offline),
489       "DRAINED": str(self.op.drained),
490       "MASTER_CAPABLE": str(self.op.master_capable),
491       "VM_CAPABLE": str(self.op.vm_capable),
492       }
493
494   def BuildHooksNodes(self):
495     """Build hooks nodes.
496
497     """
498     nl = [self.cfg.GetMasterNode(), self.op.node_name]
499     return (nl, nl)
500
501   def CheckPrereq(self):
502     """Check prerequisites.
503
504     This only checks the instance list against the existing names.
505
506     """
507     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
508
509     if self.lock_instances:
510       affected_instances = \
511         self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)
512
513       # Verify instance locks
514       owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
515       wanted_instances = frozenset(affected_instances.keys())
516       if wanted_instances - owned_instances:
517         raise errors.OpPrereqError("Instances affected by changing node %s's"
518                                    " secondary IP address have changed since"
519                                    " locks were acquired, wanted '%s', have"
520                                    " '%s'; retry the operation" %
521                                    (self.op.node_name,
522                                     utils.CommaJoin(wanted_instances),
523                                     utils.CommaJoin(owned_instances)),
524                                    errors.ECODE_STATE)
525     else:
526       affected_instances = None
527
528     if (self.op.master_candidate is not None or
529         self.op.drained is not None or
530         self.op.offline is not None):
531       # we can't change the master's node flags
532       if self.op.node_name == self.cfg.GetMasterNode():
533         raise errors.OpPrereqError("The master role can be changed"
534                                    " only via master-failover",
535                                    errors.ECODE_INVAL)
536
537     if self.op.master_candidate and not node.master_capable:
538       raise errors.OpPrereqError("Node %s is not master capable, cannot make"
539                                  " it a master candidate" % node.name,
540                                  errors.ECODE_STATE)
541
542     if self.op.vm_capable is False:
543       (ipri, isec) = self.cfg.GetNodeInstances(self.op.node_name)
544       if ipri or isec:
545         raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
546                                    " the vm_capable flag" % node.name,
547                                    errors.ECODE_STATE)
548
549     if node.master_candidate and self.might_demote and not self.lock_all:
550       assert not self.op.auto_promote, "auto_promote set but lock_all not"
551       # check if after removing the current node, we're missing master
552       # candidates
553       (mc_remaining, mc_should, _) = \
554           self.cfg.GetMasterCandidateStats(exceptions=[node.name])
555       if mc_remaining < mc_should:
556         raise errors.OpPrereqError("Not enough master candidates, please"
557                                    " pass auto promote option to allow"
558                                    " promotion (--auto-promote or RAPI"
559                                    " auto_promote=True)", errors.ECODE_STATE)
560
561     self.old_flags = old_flags = (node.master_candidate,
562                                   node.drained, node.offline)
563     assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
564     self.old_role = old_role = self._F2R[old_flags]
565
566     # Check for ineffective changes
567     for attr in self._FLAGS:
568       if (getattr(self.op, attr) is False and getattr(node, attr) is False):
569         self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
570         setattr(self.op, attr, None)
571
572     # Past this point, any flag change to False means a transition
573     # away from the respective state, as only real changes are kept
574
575     # TODO: We might query the real power state if it supports OOB
576     if SupportsOob(self.cfg, node):
577       if self.op.offline is False and not (node.powered or
578                                            self.op.powered is True):
579         raise errors.OpPrereqError(("Node %s needs to be turned on before its"
580                                     " offline status can be reset") %
581                                    self.op.node_name, errors.ECODE_STATE)
582     elif self.op.powered is not None:
583       raise errors.OpPrereqError(("Unable to change powered state for node %s"
584                                   " as it does not support out-of-band"
585                                   " handling") % self.op.node_name,
586                                  errors.ECODE_STATE)
587
588     # If we're being deofflined/drained, we'll MC ourself if needed
589     if (self.op.drained is False or self.op.offline is False or
590         (self.op.master_capable and not node.master_capable)):
591       if _DecideSelfPromotion(self):
592         self.op.master_candidate = True
593         self.LogInfo("Auto-promoting node to master candidate")
594
595     # If we're no longer master capable, we'll demote ourselves from MC
596     if self.op.master_capable is False and node.master_candidate:
597       self.LogInfo("Demoting from master candidate")
598       self.op.master_candidate = False
599
600     # Compute new role
601     assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
602     if self.op.master_candidate:
603       new_role = self._ROLE_CANDIDATE
604     elif self.op.drained:
605       new_role = self._ROLE_DRAINED
606     elif self.op.offline:
607       new_role = self._ROLE_OFFLINE
608     elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
609       # False is still in new flags, which means we're un-setting (the
610       # only) True flag
611       new_role = self._ROLE_REGULAR
612     else: # no new flags, nothing, keep old role
613       new_role = old_role
614
615     self.new_role = new_role
616
617     if old_role == self._ROLE_OFFLINE and new_role != old_role:
618       # Trying to transition out of offline status
619       result = self.rpc.call_version([node.name])[node.name]
620       if result.fail_msg:
621         raise errors.OpPrereqError("Node %s is being de-offlined but fails"
622                                    " to report its version: %s" %
623                                    (node.name, result.fail_msg),
624                                    errors.ECODE_STATE)
625       else:
626         self.LogWarning("Transitioning node from offline to online state"
627                         " without using re-add. Please make sure the node"
628                         " is healthy!")
629
630     # When changing the secondary ip, verify if this is a single-homed to
631     # multi-homed transition or vice versa, and apply the relevant
632     # restrictions.
633     if self.op.secondary_ip:
634       # Ok even without locking, because this can't be changed by any LU
635       master = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
636       master_singlehomed = master.secondary_ip == master.primary_ip
637       if master_singlehomed and self.op.secondary_ip != node.primary_ip:
638         if self.op.force and node.name == master.name:
639           self.LogWarning("Transitioning from single-homed to multi-homed"
640                           " cluster; all nodes will require a secondary IP"
641                           " address")
642         else:
643           raise errors.OpPrereqError("Changing the secondary ip on a"
644                                      " single-homed cluster requires the"
645                                      " --force option to be passed, and the"
646                                      " target node to be the master",
647                                      errors.ECODE_INVAL)
648       elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
649         if self.op.force and node.name == master.name:
650           self.LogWarning("Transitioning from multi-homed to single-homed"
651                           " cluster; secondary IP addresses will have to be"
652                           " removed")
653         else:
654           raise errors.OpPrereqError("Cannot set the secondary IP to be the"
655                                      " same as the primary IP on a multi-homed"
656                                      " cluster, unless the --force option is"
657                                      " passed, and the target node is the"
658                                      " master", errors.ECODE_INVAL)
659
660       assert not (frozenset(affected_instances) -
661                   self.owned_locks(locking.LEVEL_INSTANCE))
662
663       if node.offline:
664         if affected_instances:
665           msg = ("Cannot change secondary IP address: offline node has"
666                  " instances (%s) configured to use it" %
667                  utils.CommaJoin(affected_instances.keys()))
668           raise errors.OpPrereqError(msg, errors.ECODE_STATE)
669       else:
670         # On online nodes, check that no instances are running, and that
671         # the node has the new ip and we can reach it.
672         for instance in affected_instances.values():
673           CheckInstanceState(self, instance, INSTANCE_DOWN,
674                              msg="cannot change secondary ip")
675
676         _CheckNodeHasSecondaryIP(self, node.name, self.op.secondary_ip, True)
677         if master.name != node.name:
678           # check reachability from master secondary ip to new secondary ip
679           if not netutils.TcpPing(self.op.secondary_ip,
680                                   constants.DEFAULT_NODED_PORT,
681                                   source=master.secondary_ip):
682             raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
683                                        " based ping to node daemon port",
684                                        errors.ECODE_ENVIRON)
685
686     if self.op.ndparams:
687       new_ndparams = GetUpdatedParams(self.node.ndparams, self.op.ndparams)
688       utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
689       CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
690                            "node", "cluster or group")
691       self.new_ndparams = new_ndparams
692
693     if self.op.hv_state:
694       self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
695                                                 self.node.hv_state_static)
696
697     if self.op.disk_state:
698       self.new_disk_state = \
699         MergeAndVerifyDiskState(self.op.disk_state,
700                                 self.node.disk_state_static)
701
702   def Exec(self, feedback_fn):
703     """Modifies a node.
704
705     """
706     node = self.node
707     old_role = self.old_role
708     new_role = self.new_role
709
710     result = []
711
712     if self.op.ndparams:
713       node.ndparams = self.new_ndparams
714
715     if self.op.powered is not None:
716       node.powered = self.op.powered
717
718     if self.op.hv_state:
719       node.hv_state_static = self.new_hv_state
720
721     if self.op.disk_state:
722       node.disk_state_static = self.new_disk_state
723
724     for attr in ["master_capable", "vm_capable"]:
725       val = getattr(self.op, attr)
726       if val is not None:
727         setattr(node, attr, val)
728         result.append((attr, str(val)))
729
730     if new_role != old_role:
731       # Tell the node to demote itself, if no longer MC and not offline
732       if old_role == self._ROLE_CANDIDATE and new_role != self._ROLE_OFFLINE:
733         msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
734         if msg:
735           self.LogWarning("Node failed to demote itself: %s", msg)
736
737       new_flags = self._R2F[new_role]
738       for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
739         if of != nf:
740           result.append((desc, str(nf)))
741       (node.master_candidate, node.drained, node.offline) = new_flags
742
743       # we locked all nodes, we adjust the CP before updating this node
744       if self.lock_all:
745         AdjustCandidatePool(self, [node.name])
746
747     if self.op.secondary_ip:
748       node.secondary_ip = self.op.secondary_ip
749       result.append(("secondary_ip", self.op.secondary_ip))
750
751     # this will trigger configuration file update, if needed
752     self.cfg.Update(node, feedback_fn)
753
754     # this will trigger job queue propagation or cleanup if the mc
755     # flag changed
756     if [old_role, new_role].count(self._ROLE_CANDIDATE) == 1:
757       self.context.ReaddNode(node)
758
759     return result
760
761
762 class LUNodePowercycle(NoHooksLU):
763   """Powercycles a node.
764
765   """
766   REQ_BGL = False
767
768   def CheckArguments(self):
769     self.op.node_name = ExpandNodeName(self.cfg, self.op.node_name)
770     if self.op.node_name == self.cfg.GetMasterNode() and not self.op.force:
771       raise errors.OpPrereqError("The node is the master and the force"
772                                  " parameter was not set",
773                                  errors.ECODE_INVAL)
774
775   def ExpandNames(self):
776     """Locking for PowercycleNode.
777
778     This is a last-resort option and shouldn't block on other
779     jobs. Therefore, we grab no locks.
780
781     """
782     self.needed_locks = {}
783
784   def Exec(self, feedback_fn):
785     """Reboots a node.
786
787     """
788     result = self.rpc.call_node_powercycle(self.op.node_name,
789                                            self.cfg.GetHypervisorType())
790     result.Raise("Failed to schedule the reboot")
791     return result.payload
792
793
794 def _GetNodeInstancesInner(cfg, fn):
795   return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
796
797
798 def _GetNodePrimaryInstances(cfg, node_name):
799   """Returns primary instances on a node.
800
801   """
802   return _GetNodeInstancesInner(cfg,
803                                 lambda inst: node_name == inst.primary_node)
804
805
806 def _GetNodeSecondaryInstances(cfg, node_name):
807   """Returns secondary instances on a node.
808
809   """
810   return _GetNodeInstancesInner(cfg,
811                                 lambda inst: node_name in inst.secondary_nodes)
812
813
814 def _GetNodeInstances(cfg, node_name):
815   """Returns a list of all primary and secondary instances on a node.
816
817   """
818
819   return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
820
821
822 class LUNodeEvacuate(NoHooksLU):
823   """Evacuates instances off a list of nodes.
824
825   """
826   REQ_BGL = False
827
828   _MODE2IALLOCATOR = {
829     constants.NODE_EVAC_PRI: constants.IALLOCATOR_NEVAC_PRI,
830     constants.NODE_EVAC_SEC: constants.IALLOCATOR_NEVAC_SEC,
831     constants.NODE_EVAC_ALL: constants.IALLOCATOR_NEVAC_ALL,
832     }
833   assert frozenset(_MODE2IALLOCATOR.keys()) == constants.NODE_EVAC_MODES
834   assert (frozenset(_MODE2IALLOCATOR.values()) ==
835           constants.IALLOCATOR_NEVAC_MODES)
836
837   def CheckArguments(self):
838     CheckIAllocatorOrNode(self, "iallocator", "remote_node")
839
840   def ExpandNames(self):
841     self.op.node_name = ExpandNodeName(self.cfg, self.op.node_name)
842
843     if self.op.remote_node is not None:
844       self.op.remote_node = ExpandNodeName(self.cfg, self.op.remote_node)
845       assert self.op.remote_node
846
847       if self.op.remote_node == self.op.node_name:
848         raise errors.OpPrereqError("Can not use evacuated node as a new"
849                                    " secondary node", errors.ECODE_INVAL)
850
851       if self.op.mode != constants.NODE_EVAC_SEC:
852         raise errors.OpPrereqError("Without the use of an iallocator only"
853                                    " secondary instances can be evacuated",
854                                    errors.ECODE_INVAL)
855
856     # Declare locks
857     self.share_locks = ShareAll()
858     self.needed_locks = {
859       locking.LEVEL_INSTANCE: [],
860       locking.LEVEL_NODEGROUP: [],
861       locking.LEVEL_NODE: [],
862       }
863
864     # Determine nodes (via group) optimistically, needs verification once locks
865     # have been acquired
866     self.lock_nodes = self._DetermineNodes()
867
868   def _DetermineNodes(self):
869     """Gets the list of nodes to operate on.
870
871     """
872     if self.op.remote_node is None:
873       # Iallocator will choose any node(s) in the same group
874       group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_name])
875     else:
876       group_nodes = frozenset([self.op.remote_node])
877
878     # Determine nodes to be locked
879     return set([self.op.node_name]) | group_nodes
880
881   def _DetermineInstances(self):
882     """Builds list of instances to operate on.
883
884     """
885     assert self.op.mode in constants.NODE_EVAC_MODES
886
887     if self.op.mode == constants.NODE_EVAC_PRI:
888       # Primary instances only
889       inst_fn = _GetNodePrimaryInstances
890       assert self.op.remote_node is None, \
891         "Evacuating primary instances requires iallocator"
892     elif self.op.mode == constants.NODE_EVAC_SEC:
893       # Secondary instances only
894       inst_fn = _GetNodeSecondaryInstances
895     else:
896       # All instances
897       assert self.op.mode == constants.NODE_EVAC_ALL
898       inst_fn = _GetNodeInstances
899       # TODO: In 2.6, change the iallocator interface to take an evacuation mode
900       # per instance
901       raise errors.OpPrereqError("Due to an issue with the iallocator"
902                                  " interface it is not possible to evacuate"
903                                  " all instances at once; specify explicitly"
904                                  " whether to evacuate primary or secondary"
905                                  " instances",
906                                  errors.ECODE_INVAL)
907
908     return inst_fn(self.cfg, self.op.node_name)
909
910   def DeclareLocks(self, level):
911     if level == locking.LEVEL_INSTANCE:
912       # Lock instances optimistically, needs verification once node and group
913       # locks have been acquired
914       self.needed_locks[locking.LEVEL_INSTANCE] = \
915         set(i.name for i in self._DetermineInstances())
916
917     elif level == locking.LEVEL_NODEGROUP:
918       # Lock node groups for all potential target nodes optimistically, needs
919       # verification once nodes have been acquired
920       self.needed_locks[locking.LEVEL_NODEGROUP] = \
921         self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)
922
923     elif level == locking.LEVEL_NODE:
924       self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes
925
926   def CheckPrereq(self):
927     # Verify locks
928     owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
929     owned_nodes = self.owned_locks(locking.LEVEL_NODE)
930     owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
931
932     need_nodes = self._DetermineNodes()
933
934     if not owned_nodes.issuperset(need_nodes):
935       raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
936                                  " locks were acquired, current nodes are"
937                                  " are '%s', used to be '%s'; retry the"
938                                  " operation" %
939                                  (self.op.node_name,
940                                   utils.CommaJoin(need_nodes),
941                                   utils.CommaJoin(owned_nodes)),
942                                  errors.ECODE_STATE)
943
944     wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
945     if owned_groups != wanted_groups:
946       raise errors.OpExecError("Node groups changed since locks were acquired,"
947                                " current groups are '%s', used to be '%s';"
948                                " retry the operation" %
949                                (utils.CommaJoin(wanted_groups),
950                                 utils.CommaJoin(owned_groups)))
951
952     # Determine affected instances
953     self.instances = self._DetermineInstances()
954     self.instance_names = [i.name for i in self.instances]
955
956     if set(self.instance_names) != owned_instances:
957       raise errors.OpExecError("Instances on node '%s' changed since locks"
958                                " were acquired, current instances are '%s',"
959                                " used to be '%s'; retry the operation" %
960                                (self.op.node_name,
961                                 utils.CommaJoin(self.instance_names),
962                                 utils.CommaJoin(owned_instances)))
963
964     if self.instance_names:
965       self.LogInfo("Evacuating instances from node '%s': %s",
966                    self.op.node_name,
967                    utils.CommaJoin(utils.NiceSort(self.instance_names)))
968     else:
969       self.LogInfo("No instances to evacuate from node '%s'",
970                    self.op.node_name)
971
972     if self.op.remote_node is not None:
973       for i in self.instances:
974         if i.primary_node == self.op.remote_node:
975           raise errors.OpPrereqError("Node %s is the primary node of"
976                                      " instance %s, cannot use it as"
977                                      " secondary" %
978                                      (self.op.remote_node, i.name),
979                                      errors.ECODE_INVAL)
980
981   def Exec(self, feedback_fn):
982     assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)
983
984     if not self.instance_names:
985       # No instances to evacuate
986       jobs = []
987
988     elif self.op.iallocator is not None:
989       # TODO: Implement relocation to other group
990       evac_mode = self._MODE2IALLOCATOR[self.op.mode]
991       req = iallocator.IAReqNodeEvac(evac_mode=evac_mode,
992                                      instances=list(self.instance_names))
993       ial = iallocator.IAllocator(self.cfg, self.rpc, req)
994
995       ial.Run(self.op.iallocator)
996
997       if not ial.success:
998         raise errors.OpPrereqError("Can't compute node evacuation using"
999                                    " iallocator '%s': %s" %
1000                                    (self.op.iallocator, ial.info),
1001                                    errors.ECODE_NORES)
1002
1003       jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)
1004
1005     elif self.op.remote_node is not None:
1006       assert self.op.mode == constants.NODE_EVAC_SEC
1007       jobs = [
1008         [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
1009                                         remote_node=self.op.remote_node,
1010                                         disks=[],
1011                                         mode=constants.REPLACE_DISK_CHG,
1012                                         early_release=self.op.early_release)]
1013         for instance_name in self.instance_names]
1014
1015     else:
1016       raise errors.ProgrammerError("No iallocator or remote node")
1017
1018     return ResultWithJobs(jobs)
1019
1020
1021 class LUNodeMigrate(LogicalUnit):
1022   """Migrate all instances from a node.
1023
1024   """
1025   HPATH = "node-migrate"
1026   HTYPE = constants.HTYPE_NODE
1027   REQ_BGL = False
1028
1029   def CheckArguments(self):
1030     pass
1031
1032   def ExpandNames(self):
1033     self.op.node_name = ExpandNodeName(self.cfg, self.op.node_name)
1034
1035     self.share_locks = ShareAll()
1036     self.needed_locks = {
1037       locking.LEVEL_NODE: [self.op.node_name],
1038       }
1039
1040   def BuildHooksEnv(self):
1041     """Build hooks env.
1042
1043     This runs on the master, the primary and all the secondaries.
1044
1045     """
1046     return {
1047       "NODE_NAME": self.op.node_name,
1048       "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
1049       }
1050
1051   def BuildHooksNodes(self):
1052     """Build hooks nodes.
1053
1054     """
1055     nl = [self.cfg.GetMasterNode()]
1056     return (nl, nl)
1057
1058   def CheckPrereq(self):
1059     pass
1060
1061   def Exec(self, feedback_fn):
1062     # Prepare jobs for migration instances
1063     allow_runtime_changes = self.op.allow_runtime_changes
1064     jobs = [
1065       [opcodes.OpInstanceMigrate(instance_name=inst.name,
1066                                  mode=self.op.mode,
1067                                  live=self.op.live,
1068                                  iallocator=self.op.iallocator,
1069                                  target_node=self.op.target_node,
1070                                  allow_runtime_changes=allow_runtime_changes,
1071                                  ignore_ipolicy=self.op.ignore_ipolicy)]
1072       for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name)]
1073
1074     # TODO: Run iallocator in this opcode and pass correct placement options to
1075     # OpInstanceMigrate. Since other jobs can modify the cluster between
1076     # running the iallocator and the actual migration, a good consistency model
1077     # will have to be found.
1078
1079     assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
1080             frozenset([self.op.node_name]))
1081
1082     return ResultWithJobs(jobs)
1083
1084
1085 def _GetStorageTypeArgs(cfg, storage_type):
1086   """Returns the arguments for a storage type.
1087
1088   """
1089   # Special case for file storage
1090   if storage_type == constants.ST_FILE:
1091     # storage.FileStorage wants a list of storage directories
1092     return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
1093
1094   return []
1095
1096
1097 class LUNodeModifyStorage(NoHooksLU):
1098   """Logical unit for modifying a storage volume on a node.
1099
1100   """
1101   REQ_BGL = False
1102
1103   def CheckArguments(self):
1104     self.op.node_name = ExpandNodeName(self.cfg, self.op.node_name)
1105
1106     storage_type = self.op.storage_type
1107
1108     try:
1109       modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
1110     except KeyError:
1111       raise errors.OpPrereqError("Storage units of type '%s' can not be"
1112                                  " modified" % storage_type,
1113                                  errors.ECODE_INVAL)
1114
1115     diff = set(self.op.changes.keys()) - modifiable
1116     if diff:
1117       raise errors.OpPrereqError("The following fields can not be modified for"
1118                                  " storage units of type '%s': %r" %
1119                                  (storage_type, list(diff)),
1120                                  errors.ECODE_INVAL)
1121
1122   def ExpandNames(self):
1123     self.needed_locks = {
1124       locking.LEVEL_NODE: self.op.node_name,
1125       }
1126
1127   def Exec(self, feedback_fn):
1128     """Computes the list of nodes and their attributes.
1129
1130     """
1131     st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1132     result = self.rpc.call_storage_modify(self.op.node_name,
1133                                           self.op.storage_type, st_args,
1134                                           self.op.name, self.op.changes)
1135     result.Raise("Failed to modify storage unit '%s' on %s" %
1136                  (self.op.name, self.op.node_name))
1137
1138
1139 class NodeQuery(QueryBase):
1140   FIELDS = query.NODE_FIELDS
1141
1142   def ExpandNames(self, lu):
1143     lu.needed_locks = {}
1144     lu.share_locks = ShareAll()
1145
1146     if self.names:
1147       self.wanted = GetWantedNodes(lu, self.names)
1148     else:
1149       self.wanted = locking.ALL_SET
1150
1151     self.do_locking = (self.use_locking and
1152                        query.NQ_LIVE in self.requested_data)
1153
1154     if self.do_locking:
1155       # If any non-static field is requested we need to lock the nodes
1156       lu.needed_locks[locking.LEVEL_NODE] = self.wanted
1157       lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1158
1159   def DeclareLocks(self, lu, level):
1160     pass
1161
1162   def _GetQueryData(self, lu):
1163     """Computes the list of nodes and their attributes.
1164
1165     """
1166     all_info = lu.cfg.GetAllNodesInfo()
1167
1168     nodenames = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)
1169
1170     # Gather data as requested
1171     if query.NQ_LIVE in self.requested_data:
1172       # filter out non-vm_capable nodes
1173       toquery_nodes = [name for name in nodenames if all_info[name].vm_capable]
1174
1175       es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, toquery_nodes)
1176       # FIXME: This currently maps everything to lvm, this should be more
1177       # flexible
1178       vg_req = rpc.BuildVgInfoQuery(lu.cfg)
1179       node_data = lu.rpc.call_node_info(toquery_nodes, vg_req,
1180                                         [lu.cfg.GetHypervisorType()], es_flags)
1181       live_data = dict((name, rpc.MakeLegacyNodeInfo(nresult.payload))
1182                        for (name, nresult) in node_data.items()
1183                        if not nresult.fail_msg and nresult.payload)
1184     else:
1185       live_data = None
1186
1187     if query.NQ_INST in self.requested_data:
1188       node_to_primary = dict([(name, set()) for name in nodenames])
1189       node_to_secondary = dict([(name, set()) for name in nodenames])
1190
1191       inst_data = lu.cfg.GetAllInstancesInfo()
1192
1193       for inst in inst_data.values():
1194         if inst.primary_node in node_to_primary:
1195           node_to_primary[inst.primary_node].add(inst.name)
1196         for secnode in inst.secondary_nodes:
1197           if secnode in node_to_secondary:
1198             node_to_secondary[secnode].add(inst.name)
1199     else:
1200       node_to_primary = None
1201       node_to_secondary = None
1202
1203     if query.NQ_OOB in self.requested_data:
1204       oob_support = dict((name, bool(SupportsOob(lu.cfg, node)))
1205                          for name, node in all_info.iteritems())
1206     else:
1207       oob_support = None
1208
1209     if query.NQ_GROUP in self.requested_data:
1210       groups = lu.cfg.GetAllNodeGroupsInfo()
1211     else:
1212       groups = {}
1213
1214     return query.NodeQueryData([all_info[name] for name in nodenames],
1215                                live_data, lu.cfg.GetMasterNode(),
1216                                node_to_primary, node_to_secondary, groups,
1217                                oob_support, lu.cfg.GetClusterInfo())
1218
1219
1220 class LUNodeQuery(NoHooksLU):
1221   """Logical unit for querying nodes.
1222
1223   """
1224   # pylint: disable=W0142
1225   REQ_BGL = False
1226
1227   def CheckArguments(self):
1228     self.nq = NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
1229                          self.op.output_fields, self.op.use_locking)
1230
1231   def ExpandNames(self):
1232     self.nq.ExpandNames(self)
1233
1234   def DeclareLocks(self, level):
1235     self.nq.DeclareLocks(self, level)
1236
1237   def Exec(self, feedback_fn):
1238     return self.nq.OldStyleQuery(self)
1239
1240
1241 def _CheckOutputFields(static, dynamic, selected):
1242   """Checks whether all selected fields are valid.
1243
1244   @type static: L{utils.FieldSet}
1245   @param static: static fields set
1246   @type dynamic: L{utils.FieldSet}
1247   @param dynamic: dynamic fields set
1248
1249   """
1250   f = utils.FieldSet()
1251   f.Extend(static)
1252   f.Extend(dynamic)
1253
1254   delta = f.NonMatching(selected)
1255   if delta:
1256     raise errors.OpPrereqError("Unknown output fields selected: %s"
1257                                % ",".join(delta), errors.ECODE_INVAL)
1258
1259
1260 class LUNodeQueryvols(NoHooksLU):
1261   """Logical unit for getting volumes on node(s).
1262
1263   """
1264   REQ_BGL = False
1265   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1266   _FIELDS_STATIC = utils.FieldSet("node")
1267
1268   def CheckArguments(self):
1269     _CheckOutputFields(static=self._FIELDS_STATIC,
1270                        dynamic=self._FIELDS_DYNAMIC,
1271                        selected=self.op.output_fields)
1272
1273   def ExpandNames(self):
1274     self.share_locks = ShareAll()
1275
1276     if self.op.nodes:
1277       self.needed_locks = {
1278         locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes),
1279         }
1280     else:
1281       self.needed_locks = {
1282         locking.LEVEL_NODE: locking.ALL_SET,
1283         locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1284         }
1285
1286   def Exec(self, feedback_fn):
1287     """Computes the list of nodes and their attributes.
1288
1289     """
1290     nodenames = self.owned_locks(locking.LEVEL_NODE)
1291     volumes = self.rpc.call_node_volumes(nodenames)
1292
1293     ilist = self.cfg.GetAllInstancesInfo()
1294     vol2inst = MapInstanceDisksToNodes(ilist.values())
1295
1296     output = []
1297     for node in nodenames:
1298       nresult = volumes[node]
1299       if nresult.offline:
1300         continue
1301       msg = nresult.fail_msg
1302       if msg:
1303         self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
1304         continue
1305
1306       node_vols = sorted(nresult.payload,
1307                          key=operator.itemgetter("dev"))
1308
1309       for vol in node_vols:
1310         node_output = []
1311         for field in self.op.output_fields:
1312           if field == "node":
1313             val = node
1314           elif field == "phys":
1315             val = vol["dev"]
1316           elif field == "vg":
1317             val = vol["vg"]
1318           elif field == "name":
1319             val = vol["name"]
1320           elif field == "size":
1321             val = int(float(vol["size"]))
1322           elif field == "instance":
1323             val = vol2inst.get((node, vol["vg"] + "/" + vol["name"]), "-")
1324           else:
1325             raise errors.ParameterError(field)
1326           node_output.append(str(val))
1327
1328         output.append(node_output)
1329
1330     return output
1331
1332
1333 class LUNodeQueryStorage(NoHooksLU):
1334   """Logical unit for getting information on storage units on node(s).
1335
1336   """
1337   _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
1338   REQ_BGL = False
1339
1340   def CheckArguments(self):
1341     _CheckOutputFields(static=self._FIELDS_STATIC,
1342                        dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
1343                        selected=self.op.output_fields)
1344
1345   def ExpandNames(self):
1346     self.share_locks = ShareAll()
1347
1348     if self.op.nodes:
1349       self.needed_locks = {
1350         locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes),
1351         }
1352     else:
1353       self.needed_locks = {
1354         locking.LEVEL_NODE: locking.ALL_SET,
1355         locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1356         }
1357
1358   def Exec(self, feedback_fn):
1359     """Computes the list of nodes and their attributes.
1360
1361     """
1362     self.nodes = self.owned_locks(locking.LEVEL_NODE)
1363
1364     # Always get name to sort by
1365     if constants.SF_NAME in self.op.output_fields:
1366       fields = self.op.output_fields[:]
1367     else:
1368       fields = [constants.SF_NAME] + self.op.output_fields
1369
1370     # Never ask for node or type as it's only known to the LU
1371     for extra in [constants.SF_NODE, constants.SF_TYPE]:
1372       while extra in fields:
1373         fields.remove(extra)
1374
1375     field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
1376     name_idx = field_idx[constants.SF_NAME]
1377
1378     st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1379     data = self.rpc.call_storage_list(self.nodes,
1380                                       self.op.storage_type, st_args,
1381                                       self.op.name, fields)
1382
1383     result = []
1384
1385     for node in utils.NiceSort(self.nodes):
1386       nresult = data[node]
1387       if nresult.offline:
1388         continue
1389
1390       msg = nresult.fail_msg
1391       if msg:
1392         self.LogWarning("Can't get storage data from node %s: %s", node, msg)
1393         continue
1394
1395       rows = dict([(row[name_idx], row) for row in nresult.payload])
1396
1397       for name in utils.NiceSort(rows.keys()):
1398         row = rows[name]
1399
1400         out = []
1401
1402         for field in self.op.output_fields:
1403           if field == constants.SF_NODE:
1404             val = node
1405           elif field == constants.SF_TYPE:
1406             val = self.op.storage_type
1407           elif field in field_idx:
1408             val = row[field_idx[field]]
1409           else:
1410             raise errors.ParameterError(field)
1411
1412           out.append(val)
1413
1414         result.append(out)
1415
1416     return result
1417
1418
1419 class LUNodeRemove(LogicalUnit):
1420   """Logical unit for removing a node.
1421
1422   """
1423   HPATH = "node-remove"
1424   HTYPE = constants.HTYPE_NODE
1425
1426   def BuildHooksEnv(self):
1427     """Build hooks env.
1428
1429     """
1430     return {
1431       "OP_TARGET": self.op.node_name,
1432       "NODE_NAME": self.op.node_name,
1433       }
1434
1435   def BuildHooksNodes(self):
1436     """Build hooks nodes.
1437
1438     This doesn't run on the target node in the pre phase as a failed
1439     node would then be impossible to remove.
1440
1441     """
1442     all_nodes = self.cfg.GetNodeList()
1443     try:
1444       all_nodes.remove(self.op.node_name)
1445     except ValueError:
1446       pass
1447     return (all_nodes, all_nodes)
1448
1449   def CheckPrereq(self):
1450     """Check prerequisites.
1451
1452     This checks:
1453      - the node exists in the configuration
1454      - it does not have primary or secondary instances
1455      - it's not the master
1456
1457     Any errors are signaled by raising errors.OpPrereqError.
1458
1459     """
1460     self.op.node_name = ExpandNodeName(self.cfg, self.op.node_name)
1461     node = self.cfg.GetNodeInfo(self.op.node_name)
1462     assert node is not None
1463
1464     masternode = self.cfg.GetMasterNode()
1465     if node.name == masternode:
1466       raise errors.OpPrereqError("Node is the master node, failover to another"
1467                                  " node is required", errors.ECODE_INVAL)
1468
1469     for instance_name, instance in self.cfg.GetAllInstancesInfo().items():
1470       if node.name in instance.all_nodes:
1471         raise errors.OpPrereqError("Instance %s is still running on the node,"
1472                                    " please remove first" % instance_name,
1473                                    errors.ECODE_INVAL)
1474     self.op.node_name = node.name
1475     self.node = node
1476
1477   def Exec(self, feedback_fn):
1478     """Removes the node from the cluster.
1479
1480     """
1481     node = self.node
1482     logging.info("Stopping the node daemon and removing configs from node %s",
1483                  node.name)
1484
1485     modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
1486
1487     assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
1488       "Not owning BGL"
1489
1490     # Promote nodes to master candidate as needed
1491     AdjustCandidatePool(self, exceptions=[node.name])
1492     self.context.RemoveNode(node.name)
1493
1494     # Run post hooks on the node before it's removed
1495     RunPostHook(self, node.name)
1496
1497     result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
1498     msg = result.fail_msg
1499     if msg:
1500       self.LogWarning("Errors encountered on the remote node while leaving"
1501                       " the cluster: %s", msg)
1502
1503     # Remove node from our /etc/hosts
1504     if self.cfg.GetClusterInfo().modify_etc_hosts:
1505       master_node = self.cfg.GetMasterNode()
1506       result = self.rpc.call_etc_hosts_modify(master_node,
1507                                               constants.ETC_HOSTS_REMOVE,
1508                                               node.name, None)
1509       result.Raise("Can't update hosts file with new host data")
1510       RedistributeAncillaryFiles(self)
1511
1512
1513 class LURepairNodeStorage(NoHooksLU):
1514   """Repairs the volume group on a node.
1515
1516   """
1517   REQ_BGL = False
1518
1519   def CheckArguments(self):
1520     self.op.node_name = ExpandNodeName(self.cfg, self.op.node_name)
1521
1522     storage_type = self.op.storage_type
1523
1524     if (constants.SO_FIX_CONSISTENCY not in
1525         constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
1526       raise errors.OpPrereqError("Storage units of type '%s' can not be"
1527                                  " repaired" % storage_type,
1528                                  errors.ECODE_INVAL)
1529
1530   def ExpandNames(self):
1531     self.needed_locks = {
1532       locking.LEVEL_NODE: [self.op.node_name],
1533       }
1534
1535   def _CheckFaultyDisks(self, instance, node_name):
1536     """Ensure faulty disks abort the opcode or at least warn."""
1537     try:
1538       if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
1539                                  node_name, True):
1540         raise errors.OpPrereqError("Instance '%s' has faulty disks on"
1541                                    " node '%s'" % (instance.name, node_name),
1542                                    errors.ECODE_STATE)
1543     except errors.OpPrereqError, err:
1544       if self.op.ignore_consistency:
1545         self.LogWarning(str(err.args[0]))
1546       else:
1547         raise
1548
1549   def CheckPrereq(self):
1550     """Check prerequisites.
1551
1552     """
1553     # Check whether any instance on this node has faulty disks
1554     for inst in _GetNodeInstances(self.cfg, self.op.node_name):
1555       if not inst.disks_active:
1556         continue
1557       check_nodes = set(inst.all_nodes)
1558       check_nodes.discard(self.op.node_name)
1559       for inst_node_name in check_nodes:
1560         self._CheckFaultyDisks(inst, inst_node_name)
1561
1562   def Exec(self, feedback_fn):
1563     feedback_fn("Repairing storage unit '%s' on %s ..." %
1564                 (self.op.name, self.op.node_name))
1565
1566     st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1567     result = self.rpc.call_storage_execute(self.op.node_name,
1568                                            self.op.storage_type, st_args,
1569                                            self.op.name,
1570                                            constants.SO_FIX_CONSISTENCY)
1571     result.Raise("Failed to repair storage unit '%s' on %s" %
1572                  (self.op.name, self.op.node_name))