Extract node related logical units from cmdlib
[ganeti-local] / lib / cmdlib / node.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with nodes."""
23
24 import logging
25 import operator
26
27 from ganeti import constants
28 from ganeti import errors
29 from ganeti import locking
30 from ganeti import netutils
31 from ganeti import objects
32 from ganeti import opcodes
33 from ganeti import qlang
34 from ganeti import query
35 from ganeti import rpc
36 from ganeti import utils
37 from ganeti.masterd import iallocator
38
39 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, _QueryBase, \
40   ResultWithJobs
41 from ganeti.cmdlib.common import _CheckParamsNotGlobal, \
42   _MergeAndVerifyHvState, _MergeAndVerifyDiskState, \
43   _IsExclusiveStorageEnabledNode, _CheckNodePVs, \
44   _RedistributeAncillaryFiles, _ExpandNodeName, _ShareAll, _SupportsOob, \
45   _CheckInstanceState, INSTANCE_DOWN, _GetUpdatedParams, \
46   _AdjustCandidatePool, _CheckIAllocatorOrNode, _LoadNodeEvacResult, \
47   _GetWantedNodes, _MapInstanceDisksToNodes, _RunPostHook, \
48   _FindFaultyInstanceDisks
49
50
51 def _DecideSelfPromotion(lu, exceptions=None):
52   """Decide whether I should promote myself as a master candidate.
53
54   """
55   cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
56   mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
57   # the new node will increase mc_max with one, so:
58   mc_should = min(mc_should + 1, cp_size)
59   return mc_now < mc_should
60
61
62 def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
63   """Ensure that a node has the given secondary ip.
64
65   @type lu: L{LogicalUnit}
66   @param lu: the LU on behalf of which we make the check
67   @type node: string
68   @param node: the node to check
69   @type secondary_ip: string
70   @param secondary_ip: the ip to check
71   @type prereq: boolean
72   @param prereq: whether to throw a prerequisite or an execute error
73   @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
74   @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
75
76   """
77   result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
78   result.Raise("Failure checking secondary ip on node %s" % node,
79                prereq=prereq, ecode=errors.ECODE_ENVIRON)
80   if not result.payload:
81     msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
82            " please fix and re-run this command" % secondary_ip)
83     if prereq:
84       raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
85     else:
86       raise errors.OpExecError(msg)
87
88
89 class LUNodeAdd(LogicalUnit):
90   """Logical unit for adding node to the cluster.
91
92   """
93   HPATH = "node-add"
94   HTYPE = constants.HTYPE_NODE
95   _NFLAGS = ["master_capable", "vm_capable"]
96
97   def CheckArguments(self):
98     self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
99     # validate/normalize the node name
100     self.hostname = netutils.GetHostname(name=self.op.node_name,
101                                          family=self.primary_ip_family)
102     self.op.node_name = self.hostname.name
103
104     if self.op.readd and self.op.node_name == self.cfg.GetMasterNode():
105       raise errors.OpPrereqError("Cannot readd the master node",
106                                  errors.ECODE_STATE)
107
108     if self.op.readd and self.op.group:
109       raise errors.OpPrereqError("Cannot pass a node group when a node is"
110                                  " being readded", errors.ECODE_INVAL)
111
112   def BuildHooksEnv(self):
113     """Build hooks env.
114
115     This will run on all nodes before, and on all nodes + the new node after.
116
117     """
118     return {
119       "OP_TARGET": self.op.node_name,
120       "NODE_NAME": self.op.node_name,
121       "NODE_PIP": self.op.primary_ip,
122       "NODE_SIP": self.op.secondary_ip,
123       "MASTER_CAPABLE": str(self.op.master_capable),
124       "VM_CAPABLE": str(self.op.vm_capable),
125       }
126
127   def BuildHooksNodes(self):
128     """Build hooks nodes.
129
130     """
131     # Exclude added node
132     pre_nodes = list(set(self.cfg.GetNodeList()) - set([self.op.node_name]))
133     post_nodes = pre_nodes + [self.op.node_name, ]
134
135     return (pre_nodes, post_nodes)
136
137   def CheckPrereq(self):
138     """Check prerequisites.
139
140     This checks:
141      - the new node is not already in the config
142      - it is resolvable
143      - its parameters (single/dual homed) matches the cluster
144
145     Any errors are signaled by raising errors.OpPrereqError.
146
147     """
148     cfg = self.cfg
149     hostname = self.hostname
150     node = hostname.name
151     primary_ip = self.op.primary_ip = hostname.ip
152     if self.op.secondary_ip is None:
153       if self.primary_ip_family == netutils.IP6Address.family:
154         raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
155                                    " IPv4 address must be given as secondary",
156                                    errors.ECODE_INVAL)
157       self.op.secondary_ip = primary_ip
158
159     secondary_ip = self.op.secondary_ip
160     if not netutils.IP4Address.IsValid(secondary_ip):
161       raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
162                                  " address" % secondary_ip, errors.ECODE_INVAL)
163
164     node_list = cfg.GetNodeList()
165     if not self.op.readd and node in node_list:
166       raise errors.OpPrereqError("Node %s is already in the configuration" %
167                                  node, errors.ECODE_EXISTS)
168     elif self.op.readd and node not in node_list:
169       raise errors.OpPrereqError("Node %s is not in the configuration" % node,
170                                  errors.ECODE_NOENT)
171
172     self.changed_primary_ip = False
173
174     for existing_node_name, existing_node in cfg.GetMultiNodeInfo(node_list):
175       if self.op.readd and node == existing_node_name:
176         if existing_node.secondary_ip != secondary_ip:
177           raise errors.OpPrereqError("Readded node doesn't have the same IP"
178                                      " address configuration as before",
179                                      errors.ECODE_INVAL)
180         if existing_node.primary_ip != primary_ip:
181           self.changed_primary_ip = True
182
183         continue
184
185       if (existing_node.primary_ip == primary_ip or
186           existing_node.secondary_ip == primary_ip or
187           existing_node.primary_ip == secondary_ip or
188           existing_node.secondary_ip == secondary_ip):
189         raise errors.OpPrereqError("New node ip address(es) conflict with"
190                                    " existing node %s" % existing_node.name,
191                                    errors.ECODE_NOTUNIQUE)
192
193     # After this 'if' block, None is no longer a valid value for the
194     # _capable op attributes
195     if self.op.readd:
196       old_node = self.cfg.GetNodeInfo(node)
197       assert old_node is not None, "Can't retrieve locked node %s" % node
198       for attr in self._NFLAGS:
199         if getattr(self.op, attr) is None:
200           setattr(self.op, attr, getattr(old_node, attr))
201     else:
202       for attr in self._NFLAGS:
203         if getattr(self.op, attr) is None:
204           setattr(self.op, attr, True)
205
206     if self.op.readd and not self.op.vm_capable:
207       pri, sec = cfg.GetNodeInstances(node)
208       if pri or sec:
209         raise errors.OpPrereqError("Node %s being re-added with vm_capable"
210                                    " flag set to false, but it already holds"
211                                    " instances" % node,
212                                    errors.ECODE_STATE)
213
214     # check that the type of the node (single versus dual homed) is the
215     # same as for the master
216     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
217     master_singlehomed = myself.secondary_ip == myself.primary_ip
218     newbie_singlehomed = secondary_ip == primary_ip
219     if master_singlehomed != newbie_singlehomed:
220       if master_singlehomed:
221         raise errors.OpPrereqError("The master has no secondary ip but the"
222                                    " new node has one",
223                                    errors.ECODE_INVAL)
224       else:
225         raise errors.OpPrereqError("The master has a secondary ip but the"
226                                    " new node doesn't have one",
227                                    errors.ECODE_INVAL)
228
229     # checks reachability
230     if not netutils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
231       raise errors.OpPrereqError("Node not reachable by ping",
232                                  errors.ECODE_ENVIRON)
233
234     if not newbie_singlehomed:
235       # check reachability from my secondary ip to newbie's secondary ip
236       if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
237                               source=myself.secondary_ip):
238         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
239                                    " based ping to node daemon port",
240                                    errors.ECODE_ENVIRON)
241
242     if self.op.readd:
243       exceptions = [node]
244     else:
245       exceptions = []
246
247     if self.op.master_capable:
248       self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
249     else:
250       self.master_candidate = False
251
252     if self.op.readd:
253       self.new_node = old_node
254     else:
255       node_group = cfg.LookupNodeGroup(self.op.group)
256       self.new_node = objects.Node(name=node,
257                                    primary_ip=primary_ip,
258                                    secondary_ip=secondary_ip,
259                                    master_candidate=self.master_candidate,
260                                    offline=False, drained=False,
261                                    group=node_group, ndparams={})
262
263     if self.op.ndparams:
264       utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
265       _CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
266                             "node", "cluster or group")
267
268     if self.op.hv_state:
269       self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state, None)
270
271     if self.op.disk_state:
272       self.new_disk_state = _MergeAndVerifyDiskState(self.op.disk_state, None)
273
274     # TODO: If we need to have multiple DnsOnlyRunner we probably should make
275     #       it a property on the base class.
276     rpcrunner = rpc.DnsOnlyRunner()
277     result = rpcrunner.call_version([node])[node]
278     result.Raise("Can't get version information from node %s" % node)
279     if constants.PROTOCOL_VERSION == result.payload:
280       logging.info("Communication to node %s fine, sw version %s match",
281                    node, result.payload)
282     else:
283       raise errors.OpPrereqError("Version mismatch master version %s,"
284                                  " node version %s" %
285                                  (constants.PROTOCOL_VERSION, result.payload),
286                                  errors.ECODE_ENVIRON)
287
288     vg_name = cfg.GetVGName()
289     if vg_name is not None:
290       vparams = {constants.NV_PVLIST: [vg_name]}
291       excl_stor = _IsExclusiveStorageEnabledNode(cfg, self.new_node)
292       cname = self.cfg.GetClusterName()
293       result = rpcrunner.call_node_verify_light([node], vparams, cname)[node]
294       (errmsgs, _) = _CheckNodePVs(result.payload, excl_stor)
295       if errmsgs:
296         raise errors.OpPrereqError("Checks on node PVs failed: %s" %
297                                    "; ".join(errmsgs), errors.ECODE_ENVIRON)
298
299   def Exec(self, feedback_fn):
300     """Adds the new node to the cluster.
301
302     """
303     new_node = self.new_node
304     node = new_node.name
305
306     assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
307       "Not owning BGL"
308
309     # We adding a new node so we assume it's powered
310     new_node.powered = True
311
312     # for re-adds, reset the offline/drained/master-candidate flags;
313     # we need to reset here, otherwise offline would prevent RPC calls
314     # later in the procedure; this also means that if the re-add
315     # fails, we are left with a non-offlined, broken node
316     if self.op.readd:
317       new_node.drained = new_node.offline = False # pylint: disable=W0201
318       self.LogInfo("Readding a node, the offline/drained flags were reset")
319       # if we demote the node, we do cleanup later in the procedure
320       new_node.master_candidate = self.master_candidate
321       if self.changed_primary_ip:
322         new_node.primary_ip = self.op.primary_ip
323
324     # copy the master/vm_capable flags
325     for attr in self._NFLAGS:
326       setattr(new_node, attr, getattr(self.op, attr))
327
328     # notify the user about any possible mc promotion
329     if new_node.master_candidate:
330       self.LogInfo("Node will be a master candidate")
331
332     if self.op.ndparams:
333       new_node.ndparams = self.op.ndparams
334     else:
335       new_node.ndparams = {}
336
337     if self.op.hv_state:
338       new_node.hv_state_static = self.new_hv_state
339
340     if self.op.disk_state:
341       new_node.disk_state_static = self.new_disk_state
342
343     # Add node to our /etc/hosts, and add key to known_hosts
344     if self.cfg.GetClusterInfo().modify_etc_hosts:
345       master_node = self.cfg.GetMasterNode()
346       result = self.rpc.call_etc_hosts_modify(master_node,
347                                               constants.ETC_HOSTS_ADD,
348                                               self.hostname.name,
349                                               self.hostname.ip)
350       result.Raise("Can't update hosts file with new host data")
351
352     if new_node.secondary_ip != new_node.primary_ip:
353       _CheckNodeHasSecondaryIP(self, new_node.name, new_node.secondary_ip,
354                                False)
355
356     node_verify_list = [self.cfg.GetMasterNode()]
357     node_verify_param = {
358       constants.NV_NODELIST: ([node], {}),
359       # TODO: do a node-net-test as well?
360     }
361
362     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
363                                        self.cfg.GetClusterName())
364     for verifier in node_verify_list:
365       result[verifier].Raise("Cannot communicate with node %s" % verifier)
366       nl_payload = result[verifier].payload[constants.NV_NODELIST]
367       if nl_payload:
368         for failed in nl_payload:
369           feedback_fn("ssh/hostname verification failed"
370                       " (checking from %s): %s" %
371                       (verifier, nl_payload[failed]))
372         raise errors.OpExecError("ssh/hostname verification failed")
373
374     if self.op.readd:
375       _RedistributeAncillaryFiles(self)
376       self.context.ReaddNode(new_node)
377       # make sure we redistribute the config
378       self.cfg.Update(new_node, feedback_fn)
379       # and make sure the new node will not have old files around
380       if not new_node.master_candidate:
381         result = self.rpc.call_node_demote_from_mc(new_node.name)
382         msg = result.fail_msg
383         if msg:
384           self.LogWarning("Node failed to demote itself from master"
385                           " candidate status: %s" % msg)
386     else:
387       _RedistributeAncillaryFiles(self, additional_nodes=[node],
388                                   additional_vm=self.op.vm_capable)
389       self.context.AddNode(new_node, self.proc.GetECId())
390
391
392 class LUNodeSetParams(LogicalUnit):
393   """Modifies the parameters of a node.
394
395   @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
396       to the node role (as _ROLE_*)
397   @cvar _R2F: a dictionary from node role to tuples of flags
398   @cvar _FLAGS: a list of attribute names corresponding to the flags
399
400   """
401   HPATH = "node-modify"
402   HTYPE = constants.HTYPE_NODE
403   REQ_BGL = False
404   (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
405   _F2R = {
406     (True, False, False): _ROLE_CANDIDATE,
407     (False, True, False): _ROLE_DRAINED,
408     (False, False, True): _ROLE_OFFLINE,
409     (False, False, False): _ROLE_REGULAR,
410     }
411   _R2F = dict((v, k) for k, v in _F2R.items())
412   _FLAGS = ["master_candidate", "drained", "offline"]
413
414   def CheckArguments(self):
415     self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
416     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
417                 self.op.master_capable, self.op.vm_capable,
418                 self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
419                 self.op.disk_state]
420     if all_mods.count(None) == len(all_mods):
421       raise errors.OpPrereqError("Please pass at least one modification",
422                                  errors.ECODE_INVAL)
423     if all_mods.count(True) > 1:
424       raise errors.OpPrereqError("Can't set the node into more than one"
425                                  " state at the same time",
426                                  errors.ECODE_INVAL)
427
428     # Boolean value that tells us whether we might be demoting from MC
429     self.might_demote = (self.op.master_candidate is False or
430                          self.op.offline is True or
431                          self.op.drained is True or
432                          self.op.master_capable is False)
433
434     if self.op.secondary_ip:
435       if not netutils.IP4Address.IsValid(self.op.secondary_ip):
436         raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
437                                    " address" % self.op.secondary_ip,
438                                    errors.ECODE_INVAL)
439
440     self.lock_all = self.op.auto_promote and self.might_demote
441     self.lock_instances = self.op.secondary_ip is not None
442
443   def _InstanceFilter(self, instance):
444     """Filter for getting affected instances.
445
446     """
447     return (instance.disk_template in constants.DTS_INT_MIRROR and
448             self.op.node_name in instance.all_nodes)
449
450   def ExpandNames(self):
451     if self.lock_all:
452       self.needed_locks = {
453         locking.LEVEL_NODE: locking.ALL_SET,
454
455         # Block allocations when all nodes are locked
456         locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
457         }
458     else:
459       self.needed_locks = {
460         locking.LEVEL_NODE: self.op.node_name,
461         }
462
463     # Since modifying a node can have severe effects on currently running
464     # operations the resource lock is at least acquired in shared mode
465     self.needed_locks[locking.LEVEL_NODE_RES] = \
466       self.needed_locks[locking.LEVEL_NODE]
467
468     # Get all locks except nodes in shared mode; they are not used for anything
469     # but read-only access
470     self.share_locks = _ShareAll()
471     self.share_locks[locking.LEVEL_NODE] = 0
472     self.share_locks[locking.LEVEL_NODE_RES] = 0
473     self.share_locks[locking.LEVEL_NODE_ALLOC] = 0
474
475     if self.lock_instances:
476       self.needed_locks[locking.LEVEL_INSTANCE] = \
477         frozenset(self.cfg.GetInstancesInfoByFilter(self._InstanceFilter))
478
479   def BuildHooksEnv(self):
480     """Build hooks env.
481
482     This runs on the master node.
483
484     """
485     return {
486       "OP_TARGET": self.op.node_name,
487       "MASTER_CANDIDATE": str(self.op.master_candidate),
488       "OFFLINE": str(self.op.offline),
489       "DRAINED": str(self.op.drained),
490       "MASTER_CAPABLE": str(self.op.master_capable),
491       "VM_CAPABLE": str(self.op.vm_capable),
492       }
493
494   def BuildHooksNodes(self):
495     """Build hooks nodes.
496
497     """
498     nl = [self.cfg.GetMasterNode(), self.op.node_name]
499     return (nl, nl)
500
501   def CheckPrereq(self):
502     """Check prerequisites.
503
504     This only checks the instance list against the existing names.
505
506     """
507     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
508
509     if self.lock_instances:
510       affected_instances = \
511         self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)
512
513       # Verify instance locks
514       owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
515       wanted_instances = frozenset(affected_instances.keys())
516       if wanted_instances - owned_instances:
517         raise errors.OpPrereqError("Instances affected by changing node %s's"
518                                    " secondary IP address have changed since"
519                                    " locks were acquired, wanted '%s', have"
520                                    " '%s'; retry the operation" %
521                                    (self.op.node_name,
522                                     utils.CommaJoin(wanted_instances),
523                                     utils.CommaJoin(owned_instances)),
524                                    errors.ECODE_STATE)
525     else:
526       affected_instances = None
527
528     if (self.op.master_candidate is not None or
529         self.op.drained is not None or
530         self.op.offline is not None):
531       # we can't change the master's node flags
532       if self.op.node_name == self.cfg.GetMasterNode():
533         raise errors.OpPrereqError("The master role can be changed"
534                                    " only via master-failover",
535                                    errors.ECODE_INVAL)
536
537     if self.op.master_candidate and not node.master_capable:
538       raise errors.OpPrereqError("Node %s is not master capable, cannot make"
539                                  " it a master candidate" % node.name,
540                                  errors.ECODE_STATE)
541
542     if self.op.vm_capable is False:
543       (ipri, isec) = self.cfg.GetNodeInstances(self.op.node_name)
544       if ipri or isec:
545         raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
546                                    " the vm_capable flag" % node.name,
547                                    errors.ECODE_STATE)
548
549     if node.master_candidate and self.might_demote and not self.lock_all:
550       assert not self.op.auto_promote, "auto_promote set but lock_all not"
551       # check if after removing the current node, we're missing master
552       # candidates
553       (mc_remaining, mc_should, _) = \
554           self.cfg.GetMasterCandidateStats(exceptions=[node.name])
555       if mc_remaining < mc_should:
556         raise errors.OpPrereqError("Not enough master candidates, please"
557                                    " pass auto promote option to allow"
558                                    " promotion (--auto-promote or RAPI"
559                                    " auto_promote=True)", errors.ECODE_STATE)
560
561     self.old_flags = old_flags = (node.master_candidate,
562                                   node.drained, node.offline)
563     assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
564     self.old_role = old_role = self._F2R[old_flags]
565
566     # Check for ineffective changes
567     for attr in self._FLAGS:
568       if (getattr(self.op, attr) is False and getattr(node, attr) is False):
569         self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
570         setattr(self.op, attr, None)
571
572     # Past this point, any flag change to False means a transition
573     # away from the respective state, as only real changes are kept
574
575     # TODO: We might query the real power state if it supports OOB
576     if _SupportsOob(self.cfg, node):
577       if self.op.offline is False and not (node.powered or
578                                            self.op.powered is True):
579         raise errors.OpPrereqError(("Node %s needs to be turned on before its"
580                                     " offline status can be reset") %
581                                    self.op.node_name, errors.ECODE_STATE)
582     elif self.op.powered is not None:
583       raise errors.OpPrereqError(("Unable to change powered state for node %s"
584                                   " as it does not support out-of-band"
585                                   " handling") % self.op.node_name,
586                                  errors.ECODE_STATE)
587
588     # If we're being deofflined/drained, we'll MC ourself if needed
589     if (self.op.drained is False or self.op.offline is False or
590         (self.op.master_capable and not node.master_capable)):
591       if _DecideSelfPromotion(self):
592         self.op.master_candidate = True
593         self.LogInfo("Auto-promoting node to master candidate")
594
595     # If we're no longer master capable, we'll demote ourselves from MC
596     if self.op.master_capable is False and node.master_candidate:
597       self.LogInfo("Demoting from master candidate")
598       self.op.master_candidate = False
599
600     # Compute new role
601     assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
602     if self.op.master_candidate:
603       new_role = self._ROLE_CANDIDATE
604     elif self.op.drained:
605       new_role = self._ROLE_DRAINED
606     elif self.op.offline:
607       new_role = self._ROLE_OFFLINE
608     elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
609       # False is still in new flags, which means we're un-setting (the
610       # only) True flag
611       new_role = self._ROLE_REGULAR
612     else: # no new flags, nothing, keep old role
613       new_role = old_role
614
615     self.new_role = new_role
616
617     if old_role == self._ROLE_OFFLINE and new_role != old_role:
618       # Trying to transition out of offline status
619       result = self.rpc.call_version([node.name])[node.name]
620       if result.fail_msg:
621         raise errors.OpPrereqError("Node %s is being de-offlined but fails"
622                                    " to report its version: %s" %
623                                    (node.name, result.fail_msg),
624                                    errors.ECODE_STATE)
625       else:
626         self.LogWarning("Transitioning node from offline to online state"
627                         " without using re-add. Please make sure the node"
628                         " is healthy!")
629
630     # When changing the secondary ip, verify if this is a single-homed to
631     # multi-homed transition or vice versa, and apply the relevant
632     # restrictions.
633     if self.op.secondary_ip:
634       # Ok even without locking, because this can't be changed by any LU
635       master = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
636       master_singlehomed = master.secondary_ip == master.primary_ip
637       if master_singlehomed and self.op.secondary_ip != node.primary_ip:
638         if self.op.force and node.name == master.name:
639           self.LogWarning("Transitioning from single-homed to multi-homed"
640                           " cluster; all nodes will require a secondary IP"
641                           " address")
642         else:
643           raise errors.OpPrereqError("Changing the secondary ip on a"
644                                      " single-homed cluster requires the"
645                                      " --force option to be passed, and the"
646                                      " target node to be the master",
647                                      errors.ECODE_INVAL)
648       elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
649         if self.op.force and node.name == master.name:
650           self.LogWarning("Transitioning from multi-homed to single-homed"
651                           " cluster; secondary IP addresses will have to be"
652                           " removed")
653         else:
654           raise errors.OpPrereqError("Cannot set the secondary IP to be the"
655                                      " same as the primary IP on a multi-homed"
656                                      " cluster, unless the --force option is"
657                                      " passed, and the target node is the"
658                                      " master", errors.ECODE_INVAL)
659
660       assert not (frozenset(affected_instances) -
661                   self.owned_locks(locking.LEVEL_INSTANCE))
662
663       if node.offline:
664         if affected_instances:
665           msg = ("Cannot change secondary IP address: offline node has"
666                  " instances (%s) configured to use it" %
667                  utils.CommaJoin(affected_instances.keys()))
668           raise errors.OpPrereqError(msg, errors.ECODE_STATE)
669       else:
670         # On online nodes, check that no instances are running, and that
671         # the node has the new ip and we can reach it.
672         for instance in affected_instances.values():
673           _CheckInstanceState(self, instance, INSTANCE_DOWN,
674                               msg="cannot change secondary ip")
675
676         _CheckNodeHasSecondaryIP(self, node.name, self.op.secondary_ip, True)
677         if master.name != node.name:
678           # check reachability from master secondary ip to new secondary ip
679           if not netutils.TcpPing(self.op.secondary_ip,
680                                   constants.DEFAULT_NODED_PORT,
681                                   source=master.secondary_ip):
682             raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
683                                        " based ping to node daemon port",
684                                        errors.ECODE_ENVIRON)
685
686     if self.op.ndparams:
687       new_ndparams = _GetUpdatedParams(self.node.ndparams, self.op.ndparams)
688       utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
689       _CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
690                             "node", "cluster or group")
691       self.new_ndparams = new_ndparams
692
693     if self.op.hv_state:
694       self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state,
695                                                  self.node.hv_state_static)
696
697     if self.op.disk_state:
698       self.new_disk_state = \
699         _MergeAndVerifyDiskState(self.op.disk_state,
700                                  self.node.disk_state_static)
701
702   def Exec(self, feedback_fn):
703     """Modifies a node.
704
705     """
706     node = self.node
707     old_role = self.old_role
708     new_role = self.new_role
709
710     result = []
711
712     if self.op.ndparams:
713       node.ndparams = self.new_ndparams
714
715     if self.op.powered is not None:
716       node.powered = self.op.powered
717
718     if self.op.hv_state:
719       node.hv_state_static = self.new_hv_state
720
721     if self.op.disk_state:
722       node.disk_state_static = self.new_disk_state
723
724     for attr in ["master_capable", "vm_capable"]:
725       val = getattr(self.op, attr)
726       if val is not None:
727         setattr(node, attr, val)
728         result.append((attr, str(val)))
729
730     if new_role != old_role:
731       # Tell the node to demote itself, if no longer MC and not offline
732       if old_role == self._ROLE_CANDIDATE and new_role != self._ROLE_OFFLINE:
733         msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
734         if msg:
735           self.LogWarning("Node failed to demote itself: %s", msg)
736
737       new_flags = self._R2F[new_role]
738       for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
739         if of != nf:
740           result.append((desc, str(nf)))
741       (node.master_candidate, node.drained, node.offline) = new_flags
742
743       # we locked all nodes, we adjust the CP before updating this node
744       if self.lock_all:
745         _AdjustCandidatePool(self, [node.name])
746
747     if self.op.secondary_ip:
748       node.secondary_ip = self.op.secondary_ip
749       result.append(("secondary_ip", self.op.secondary_ip))
750
751     # this will trigger configuration file update, if needed
752     self.cfg.Update(node, feedback_fn)
753
754     # this will trigger job queue propagation or cleanup if the mc
755     # flag changed
756     if [old_role, new_role].count(self._ROLE_CANDIDATE) == 1:
757       self.context.ReaddNode(node)
758
759     return result
760
761
762 class LUNodePowercycle(NoHooksLU):
763   """Powercycles a node.
764
765   """
766   REQ_BGL = False
767
768   def CheckArguments(self):
769     self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
770     if self.op.node_name == self.cfg.GetMasterNode() and not self.op.force:
771       raise errors.OpPrereqError("The node is the master and the force"
772                                  " parameter was not set",
773                                  errors.ECODE_INVAL)
774
775   def ExpandNames(self):
776     """Locking for PowercycleNode.
777
778     This is a last-resort option and shouldn't block on other
779     jobs. Therefore, we grab no locks.
780
781     """
782     self.needed_locks = {}
783
784   def Exec(self, feedback_fn):
785     """Reboots a node.
786
787     """
788     result = self.rpc.call_node_powercycle(self.op.node_name,
789                                            self.cfg.GetHypervisorType())
790     result.Raise("Failed to schedule the reboot")
791     return result.payload
792
793
794 def _GetNodeInstancesInner(cfg, fn):
795   return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
796
797
798 def _GetNodePrimaryInstances(cfg, node_name):
799   """Returns primary instances on a node.
800
801   """
802   return _GetNodeInstancesInner(cfg,
803                                 lambda inst: node_name == inst.primary_node)
804
805
806 def _GetNodeSecondaryInstances(cfg, node_name):
807   """Returns secondary instances on a node.
808
809   """
810   return _GetNodeInstancesInner(cfg,
811                                 lambda inst: node_name in inst.secondary_nodes)
812
813
814 def _GetNodeInstances(cfg, node_name):
815   """Returns a list of all primary and secondary instances on a node.
816
817   """
818
819   return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
820
821
822 class LUNodeEvacuate(NoHooksLU):
823   """Evacuates instances off a list of nodes.
824
825   """
826   REQ_BGL = False
827
828   _MODE2IALLOCATOR = {
829     constants.NODE_EVAC_PRI: constants.IALLOCATOR_NEVAC_PRI,
830     constants.NODE_EVAC_SEC: constants.IALLOCATOR_NEVAC_SEC,
831     constants.NODE_EVAC_ALL: constants.IALLOCATOR_NEVAC_ALL,
832     }
833   assert frozenset(_MODE2IALLOCATOR.keys()) == constants.NODE_EVAC_MODES
834   assert (frozenset(_MODE2IALLOCATOR.values()) ==
835           constants.IALLOCATOR_NEVAC_MODES)
836
837   def CheckArguments(self):
838     _CheckIAllocatorOrNode(self, "iallocator", "remote_node")
839
840   def ExpandNames(self):
841     self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
842
843     if self.op.remote_node is not None:
844       self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
845       assert self.op.remote_node
846
847       if self.op.remote_node == self.op.node_name:
848         raise errors.OpPrereqError("Can not use evacuated node as a new"
849                                    " secondary node", errors.ECODE_INVAL)
850
851       if self.op.mode != constants.NODE_EVAC_SEC:
852         raise errors.OpPrereqError("Without the use of an iallocator only"
853                                    " secondary instances can be evacuated",
854                                    errors.ECODE_INVAL)
855
856     # Declare locks
857     self.share_locks = _ShareAll()
858     self.needed_locks = {
859       locking.LEVEL_INSTANCE: [],
860       locking.LEVEL_NODEGROUP: [],
861       locking.LEVEL_NODE: [],
862       }
863
864     # Determine nodes (via group) optimistically, needs verification once locks
865     # have been acquired
866     self.lock_nodes = self._DetermineNodes()
867
868   def _DetermineNodes(self):
869     """Gets the list of nodes to operate on.
870
871     """
872     if self.op.remote_node is None:
873       # Iallocator will choose any node(s) in the same group
874       group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_name])
875     else:
876       group_nodes = frozenset([self.op.remote_node])
877
878     # Determine nodes to be locked
879     return set([self.op.node_name]) | group_nodes
880
881   def _DetermineInstances(self):
882     """Builds list of instances to operate on.
883
884     """
885     assert self.op.mode in constants.NODE_EVAC_MODES
886
887     if self.op.mode == constants.NODE_EVAC_PRI:
888       # Primary instances only
889       inst_fn = _GetNodePrimaryInstances
890       assert self.op.remote_node is None, \
891         "Evacuating primary instances requires iallocator"
892     elif self.op.mode == constants.NODE_EVAC_SEC:
893       # Secondary instances only
894       inst_fn = _GetNodeSecondaryInstances
895     else:
896       # All instances
897       assert self.op.mode == constants.NODE_EVAC_ALL
898       inst_fn = _GetNodeInstances
899       # TODO: In 2.6, change the iallocator interface to take an evacuation mode
900       # per instance
901       raise errors.OpPrereqError("Due to an issue with the iallocator"
902                                  " interface it is not possible to evacuate"
903                                  " all instances at once; specify explicitly"
904                                  " whether to evacuate primary or secondary"
905                                  " instances",
906                                  errors.ECODE_INVAL)
907
908     return inst_fn(self.cfg, self.op.node_name)
909
910   def DeclareLocks(self, level):
911     if level == locking.LEVEL_INSTANCE:
912       # Lock instances optimistically, needs verification once node and group
913       # locks have been acquired
914       self.needed_locks[locking.LEVEL_INSTANCE] = \
915         set(i.name for i in self._DetermineInstances())
916
917     elif level == locking.LEVEL_NODEGROUP:
918       # Lock node groups for all potential target nodes optimistically, needs
919       # verification once nodes have been acquired
920       self.needed_locks[locking.LEVEL_NODEGROUP] = \
921         self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)
922
923     elif level == locking.LEVEL_NODE:
924       self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes
925
926   def CheckPrereq(self):
927     # Verify locks
928     owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
929     owned_nodes = self.owned_locks(locking.LEVEL_NODE)
930     owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
931
932     need_nodes = self._DetermineNodes()
933
934     if not owned_nodes.issuperset(need_nodes):
935       raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
936                                  " locks were acquired, current nodes are"
937                                  " are '%s', used to be '%s'; retry the"
938                                  " operation" %
939                                  (self.op.node_name,
940                                   utils.CommaJoin(need_nodes),
941                                   utils.CommaJoin(owned_nodes)),
942                                  errors.ECODE_STATE)
943
944     wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
945     if owned_groups != wanted_groups:
946       raise errors.OpExecError("Node groups changed since locks were acquired,"
947                                " current groups are '%s', used to be '%s';"
948                                " retry the operation" %
949                                (utils.CommaJoin(wanted_groups),
950                                 utils.CommaJoin(owned_groups)))
951
952     # Determine affected instances
953     self.instances = self._DetermineInstances()
954     self.instance_names = [i.name for i in self.instances]
955
956     if set(self.instance_names) != owned_instances:
957       raise errors.OpExecError("Instances on node '%s' changed since locks"
958                                " were acquired, current instances are '%s',"
959                                " used to be '%s'; retry the operation" %
960                                (self.op.node_name,
961                                 utils.CommaJoin(self.instance_names),
962                                 utils.CommaJoin(owned_instances)))
963
964     if self.instance_names:
965       self.LogInfo("Evacuating instances from node '%s': %s",
966                    self.op.node_name,
967                    utils.CommaJoin(utils.NiceSort(self.instance_names)))
968     else:
969       self.LogInfo("No instances to evacuate from node '%s'",
970                    self.op.node_name)
971
972     if self.op.remote_node is not None:
973       for i in self.instances:
974         if i.primary_node == self.op.remote_node:
975           raise errors.OpPrereqError("Node %s is the primary node of"
976                                      " instance %s, cannot use it as"
977                                      " secondary" %
978                                      (self.op.remote_node, i.name),
979                                      errors.ECODE_INVAL)
980
981   def Exec(self, feedback_fn):
982     assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)
983
984     if not self.instance_names:
985       # No instances to evacuate
986       jobs = []
987
988     elif self.op.iallocator is not None:
989       # TODO: Implement relocation to other group
990       evac_mode = self._MODE2IALLOCATOR[self.op.mode]
991       req = iallocator.IAReqNodeEvac(evac_mode=evac_mode,
992                                      instances=list(self.instance_names))
993       ial = iallocator.IAllocator(self.cfg, self.rpc, req)
994
995       ial.Run(self.op.iallocator)
996
997       if not ial.success:
998         raise errors.OpPrereqError("Can't compute node evacuation using"
999                                    " iallocator '%s': %s" %
1000                                    (self.op.iallocator, ial.info),
1001                                    errors.ECODE_NORES)
1002
1003       jobs = _LoadNodeEvacResult(self, ial.result, self.op.early_release, True)
1004
1005     elif self.op.remote_node is not None:
1006       assert self.op.mode == constants.NODE_EVAC_SEC
1007       jobs = [
1008         [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
1009                                         remote_node=self.op.remote_node,
1010                                         disks=[],
1011                                         mode=constants.REPLACE_DISK_CHG,
1012                                         early_release=self.op.early_release)]
1013         for instance_name in self.instance_names]
1014
1015     else:
1016       raise errors.ProgrammerError("No iallocator or remote node")
1017
1018     return ResultWithJobs(jobs)
1019
1020
1021 class LUNodeMigrate(LogicalUnit):
1022   """Migrate all instances from a node.
1023
1024   """
1025   HPATH = "node-migrate"
1026   HTYPE = constants.HTYPE_NODE
1027   REQ_BGL = False
1028
1029   def CheckArguments(self):
1030     pass
1031
1032   def ExpandNames(self):
1033     self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
1034
1035     self.share_locks = _ShareAll()
1036     self.needed_locks = {
1037       locking.LEVEL_NODE: [self.op.node_name],
1038       }
1039
1040   def BuildHooksEnv(self):
1041     """Build hooks env.
1042
1043     This runs on the master, the primary and all the secondaries.
1044
1045     """
1046     return {
1047       "NODE_NAME": self.op.node_name,
1048       "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
1049       }
1050
1051   def BuildHooksNodes(self):
1052     """Build hooks nodes.
1053
1054     """
1055     nl = [self.cfg.GetMasterNode()]
1056     return (nl, nl)
1057
1058   def CheckPrereq(self):
1059     pass
1060
1061   def Exec(self, feedback_fn):
1062     # Prepare jobs for migration instances
1063     allow_runtime_changes = self.op.allow_runtime_changes
1064     jobs = [
1065       [opcodes.OpInstanceMigrate(instance_name=inst.name,
1066                                  mode=self.op.mode,
1067                                  live=self.op.live,
1068                                  iallocator=self.op.iallocator,
1069                                  target_node=self.op.target_node,
1070                                  allow_runtime_changes=allow_runtime_changes,
1071                                  ignore_ipolicy=self.op.ignore_ipolicy)]
1072       for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name)]
1073
1074     # TODO: Run iallocator in this opcode and pass correct placement options to
1075     # OpInstanceMigrate. Since other jobs can modify the cluster between
1076     # running the iallocator and the actual migration, a good consistency model
1077     # will have to be found.
1078
1079     assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
1080             frozenset([self.op.node_name]))
1081
1082     return ResultWithJobs(jobs)
1083
1084
1085 def _GetStorageTypeArgs(cfg, storage_type):
1086   """Returns the arguments for a storage type.
1087
1088   """
1089   # Special case for file storage
1090   if storage_type == constants.ST_FILE:
1091     # storage.FileStorage wants a list of storage directories
1092     return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
1093
1094   return []
1095
1096
1097 class LUNodeModifyStorage(NoHooksLU):
1098   """Logical unit for modifying a storage volume on a node.
1099
1100   """
1101   REQ_BGL = False
1102
1103   def CheckArguments(self):
1104     self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
1105
1106     storage_type = self.op.storage_type
1107
1108     try:
1109       modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
1110     except KeyError:
1111       raise errors.OpPrereqError("Storage units of type '%s' can not be"
1112                                  " modified" % storage_type,
1113                                  errors.ECODE_INVAL)
1114
1115     diff = set(self.op.changes.keys()) - modifiable
1116     if diff:
1117       raise errors.OpPrereqError("The following fields can not be modified for"
1118                                  " storage units of type '%s': %r" %
1119                                  (storage_type, list(diff)),
1120                                  errors.ECODE_INVAL)
1121
1122   def ExpandNames(self):
1123     self.needed_locks = {
1124       locking.LEVEL_NODE: self.op.node_name,
1125       }
1126
1127   def Exec(self, feedback_fn):
1128     """Computes the list of nodes and their attributes.
1129
1130     """
1131     st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1132     result = self.rpc.call_storage_modify(self.op.node_name,
1133                                           self.op.storage_type, st_args,
1134                                           self.op.name, self.op.changes)
1135     result.Raise("Failed to modify storage unit '%s' on %s" %
1136                  (self.op.name, self.op.node_name))
1137
1138
1139 class _NodeQuery(_QueryBase):
1140   FIELDS = query.NODE_FIELDS
1141
1142   def ExpandNames(self, lu):
1143     lu.needed_locks = {}
1144     lu.share_locks = _ShareAll()
1145
1146     if self.names:
1147       self.wanted = _GetWantedNodes(lu, self.names)
1148     else:
1149       self.wanted = locking.ALL_SET
1150
1151     self.do_locking = (self.use_locking and
1152                        query.NQ_LIVE in self.requested_data)
1153
1154     if self.do_locking:
1155       # If any non-static field is requested we need to lock the nodes
1156       lu.needed_locks[locking.LEVEL_NODE] = self.wanted
1157       lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1158
1159   def DeclareLocks(self, lu, level):
1160     pass
1161
1162   def _GetQueryData(self, lu):
1163     """Computes the list of nodes and their attributes.
1164
1165     """
1166     all_info = lu.cfg.GetAllNodesInfo()
1167
1168     nodenames = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)
1169
1170     # Gather data as requested
1171     if query.NQ_LIVE in self.requested_data:
1172       # filter out non-vm_capable nodes
1173       toquery_nodes = [name for name in nodenames if all_info[name].vm_capable]
1174
1175       es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, toquery_nodes)
1176       node_data = lu.rpc.call_node_info(toquery_nodes, [lu.cfg.GetVGName()],
1177                                         [lu.cfg.GetHypervisorType()], es_flags)
1178       live_data = dict((name, rpc.MakeLegacyNodeInfo(nresult.payload))
1179                        for (name, nresult) in node_data.items()
1180                        if not nresult.fail_msg and nresult.payload)
1181     else:
1182       live_data = None
1183
1184     if query.NQ_INST in self.requested_data:
1185       node_to_primary = dict([(name, set()) for name in nodenames])
1186       node_to_secondary = dict([(name, set()) for name in nodenames])
1187
1188       inst_data = lu.cfg.GetAllInstancesInfo()
1189
1190       for inst in inst_data.values():
1191         if inst.primary_node in node_to_primary:
1192           node_to_primary[inst.primary_node].add(inst.name)
1193         for secnode in inst.secondary_nodes:
1194           if secnode in node_to_secondary:
1195             node_to_secondary[secnode].add(inst.name)
1196     else:
1197       node_to_primary = None
1198       node_to_secondary = None
1199
1200     if query.NQ_OOB in self.requested_data:
1201       oob_support = dict((name, bool(_SupportsOob(lu.cfg, node)))
1202                          for name, node in all_info.iteritems())
1203     else:
1204       oob_support = None
1205
1206     if query.NQ_GROUP in self.requested_data:
1207       groups = lu.cfg.GetAllNodeGroupsInfo()
1208     else:
1209       groups = {}
1210
1211     return query.NodeQueryData([all_info[name] for name in nodenames],
1212                                live_data, lu.cfg.GetMasterNode(),
1213                                node_to_primary, node_to_secondary, groups,
1214                                oob_support, lu.cfg.GetClusterInfo())
1215
1216
1217 class LUNodeQuery(NoHooksLU):
1218   """Logical unit for querying nodes.
1219
1220   """
1221   # pylint: disable=W0142
1222   REQ_BGL = False
1223
1224   def CheckArguments(self):
1225     self.nq = _NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
1226                          self.op.output_fields, self.op.use_locking)
1227
1228   def ExpandNames(self):
1229     self.nq.ExpandNames(self)
1230
1231   def DeclareLocks(self, level):
1232     self.nq.DeclareLocks(self, level)
1233
1234   def Exec(self, feedback_fn):
1235     return self.nq.OldStyleQuery(self)
1236
1237
1238 def _CheckOutputFields(static, dynamic, selected):
1239   """Checks whether all selected fields are valid.
1240
1241   @type static: L{utils.FieldSet}
1242   @param static: static fields set
1243   @type dynamic: L{utils.FieldSet}
1244   @param dynamic: dynamic fields set
1245
1246   """
1247   f = utils.FieldSet()
1248   f.Extend(static)
1249   f.Extend(dynamic)
1250
1251   delta = f.NonMatching(selected)
1252   if delta:
1253     raise errors.OpPrereqError("Unknown output fields selected: %s"
1254                                % ",".join(delta), errors.ECODE_INVAL)
1255
1256
1257 class LUNodeQueryvols(NoHooksLU):
1258   """Logical unit for getting volumes on node(s).
1259
1260   """
1261   REQ_BGL = False
1262   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1263   _FIELDS_STATIC = utils.FieldSet("node")
1264
1265   def CheckArguments(self):
1266     _CheckOutputFields(static=self._FIELDS_STATIC,
1267                        dynamic=self._FIELDS_DYNAMIC,
1268                        selected=self.op.output_fields)
1269
1270   def ExpandNames(self):
1271     self.share_locks = _ShareAll()
1272
1273     if self.op.nodes:
1274       self.needed_locks = {
1275         locking.LEVEL_NODE: _GetWantedNodes(self, self.op.nodes),
1276         }
1277     else:
1278       self.needed_locks = {
1279         locking.LEVEL_NODE: locking.ALL_SET,
1280         locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1281         }
1282
1283   def Exec(self, feedback_fn):
1284     """Computes the list of nodes and their attributes.
1285
1286     """
1287     nodenames = self.owned_locks(locking.LEVEL_NODE)
1288     volumes = self.rpc.call_node_volumes(nodenames)
1289
1290     ilist = self.cfg.GetAllInstancesInfo()
1291     vol2inst = _MapInstanceDisksToNodes(ilist.values())
1292
1293     output = []
1294     for node in nodenames:
1295       nresult = volumes[node]
1296       if nresult.offline:
1297         continue
1298       msg = nresult.fail_msg
1299       if msg:
1300         self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
1301         continue
1302
1303       node_vols = sorted(nresult.payload,
1304                          key=operator.itemgetter("dev"))
1305
1306       for vol in node_vols:
1307         node_output = []
1308         for field in self.op.output_fields:
1309           if field == "node":
1310             val = node
1311           elif field == "phys":
1312             val = vol["dev"]
1313           elif field == "vg":
1314             val = vol["vg"]
1315           elif field == "name":
1316             val = vol["name"]
1317           elif field == "size":
1318             val = int(float(vol["size"]))
1319           elif field == "instance":
1320             val = vol2inst.get((node, vol["vg"] + "/" + vol["name"]), "-")
1321           else:
1322             raise errors.ParameterError(field)
1323           node_output.append(str(val))
1324
1325         output.append(node_output)
1326
1327     return output
1328
1329
1330 class LUNodeQueryStorage(NoHooksLU):
1331   """Logical unit for getting information on storage units on node(s).
1332
1333   """
1334   _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
1335   REQ_BGL = False
1336
1337   def CheckArguments(self):
1338     _CheckOutputFields(static=self._FIELDS_STATIC,
1339                        dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
1340                        selected=self.op.output_fields)
1341
1342   def ExpandNames(self):
1343     self.share_locks = _ShareAll()
1344
1345     if self.op.nodes:
1346       self.needed_locks = {
1347         locking.LEVEL_NODE: _GetWantedNodes(self, self.op.nodes),
1348         }
1349     else:
1350       self.needed_locks = {
1351         locking.LEVEL_NODE: locking.ALL_SET,
1352         locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1353         }
1354
1355   def Exec(self, feedback_fn):
1356     """Computes the list of nodes and their attributes.
1357
1358     """
1359     self.nodes = self.owned_locks(locking.LEVEL_NODE)
1360
1361     # Always get name to sort by
1362     if constants.SF_NAME in self.op.output_fields:
1363       fields = self.op.output_fields[:]
1364     else:
1365       fields = [constants.SF_NAME] + self.op.output_fields
1366
1367     # Never ask for node or type as it's only known to the LU
1368     for extra in [constants.SF_NODE, constants.SF_TYPE]:
1369       while extra in fields:
1370         fields.remove(extra)
1371
1372     field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
1373     name_idx = field_idx[constants.SF_NAME]
1374
1375     st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1376     data = self.rpc.call_storage_list(self.nodes,
1377                                       self.op.storage_type, st_args,
1378                                       self.op.name, fields)
1379
1380     result = []
1381
1382     for node in utils.NiceSort(self.nodes):
1383       nresult = data[node]
1384       if nresult.offline:
1385         continue
1386
1387       msg = nresult.fail_msg
1388       if msg:
1389         self.LogWarning("Can't get storage data from node %s: %s", node, msg)
1390         continue
1391
1392       rows = dict([(row[name_idx], row) for row in nresult.payload])
1393
1394       for name in utils.NiceSort(rows.keys()):
1395         row = rows[name]
1396
1397         out = []
1398
1399         for field in self.op.output_fields:
1400           if field == constants.SF_NODE:
1401             val = node
1402           elif field == constants.SF_TYPE:
1403             val = self.op.storage_type
1404           elif field in field_idx:
1405             val = row[field_idx[field]]
1406           else:
1407             raise errors.ParameterError(field)
1408
1409           out.append(val)
1410
1411         result.append(out)
1412
1413     return result
1414
1415
1416 class LUNodeRemove(LogicalUnit):
1417   """Logical unit for removing a node.
1418
1419   """
1420   HPATH = "node-remove"
1421   HTYPE = constants.HTYPE_NODE
1422
1423   def BuildHooksEnv(self):
1424     """Build hooks env.
1425
1426     """
1427     return {
1428       "OP_TARGET": self.op.node_name,
1429       "NODE_NAME": self.op.node_name,
1430       }
1431
1432   def BuildHooksNodes(self):
1433     """Build hooks nodes.
1434
1435     This doesn't run on the target node in the pre phase as a failed
1436     node would then be impossible to remove.
1437
1438     """
1439     all_nodes = self.cfg.GetNodeList()
1440     try:
1441       all_nodes.remove(self.op.node_name)
1442     except ValueError:
1443       pass
1444     return (all_nodes, all_nodes)
1445
1446   def CheckPrereq(self):
1447     """Check prerequisites.
1448
1449     This checks:
1450      - the node exists in the configuration
1451      - it does not have primary or secondary instances
1452      - it's not the master
1453
1454     Any errors are signaled by raising errors.OpPrereqError.
1455
1456     """
1457     self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
1458     node = self.cfg.GetNodeInfo(self.op.node_name)
1459     assert node is not None
1460
1461     masternode = self.cfg.GetMasterNode()
1462     if node.name == masternode:
1463       raise errors.OpPrereqError("Node is the master node, failover to another"
1464                                  " node is required", errors.ECODE_INVAL)
1465
1466     for instance_name, instance in self.cfg.GetAllInstancesInfo().items():
1467       if node.name in instance.all_nodes:
1468         raise errors.OpPrereqError("Instance %s is still running on the node,"
1469                                    " please remove first" % instance_name,
1470                                    errors.ECODE_INVAL)
1471     self.op.node_name = node.name
1472     self.node = node
1473
1474   def Exec(self, feedback_fn):
1475     """Removes the node from the cluster.
1476
1477     """
1478     node = self.node
1479     logging.info("Stopping the node daemon and removing configs from node %s",
1480                  node.name)
1481
1482     modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
1483
1484     assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
1485       "Not owning BGL"
1486
1487     # Promote nodes to master candidate as needed
1488     _AdjustCandidatePool(self, exceptions=[node.name])
1489     self.context.RemoveNode(node.name)
1490
1491     # Run post hooks on the node before it's removed
1492     _RunPostHook(self, node.name)
1493
1494     result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
1495     msg = result.fail_msg
1496     if msg:
1497       self.LogWarning("Errors encountered on the remote node while leaving"
1498                       " the cluster: %s", msg)
1499
1500     # Remove node from our /etc/hosts
1501     if self.cfg.GetClusterInfo().modify_etc_hosts:
1502       master_node = self.cfg.GetMasterNode()
1503       result = self.rpc.call_etc_hosts_modify(master_node,
1504                                               constants.ETC_HOSTS_REMOVE,
1505                                               node.name, None)
1506       result.Raise("Can't update hosts file with new host data")
1507       _RedistributeAncillaryFiles(self)
1508
1509
1510 class LURepairNodeStorage(NoHooksLU):
1511   """Repairs the volume group on a node.
1512
1513   """
1514   REQ_BGL = False
1515
1516   def CheckArguments(self):
1517     self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
1518
1519     storage_type = self.op.storage_type
1520
1521     if (constants.SO_FIX_CONSISTENCY not in
1522         constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
1523       raise errors.OpPrereqError("Storage units of type '%s' can not be"
1524                                  " repaired" % storage_type,
1525                                  errors.ECODE_INVAL)
1526
1527   def ExpandNames(self):
1528     self.needed_locks = {
1529       locking.LEVEL_NODE: [self.op.node_name],
1530       }
1531
1532   def _CheckFaultyDisks(self, instance, node_name):
1533     """Ensure faulty disks abort the opcode or at least warn."""
1534     try:
1535       if _FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
1536                                   node_name, True):
1537         raise errors.OpPrereqError("Instance '%s' has faulty disks on"
1538                                    " node '%s'" % (instance.name, node_name),
1539                                    errors.ECODE_STATE)
1540     except errors.OpPrereqError, err:
1541       if self.op.ignore_consistency:
1542         self.LogWarning(str(err.args[0]))
1543       else:
1544         raise
1545
1546   def CheckPrereq(self):
1547     """Check prerequisites.
1548
1549     """
1550     # Check whether any instance on this node has faulty disks
1551     for inst in _GetNodeInstances(self.cfg, self.op.node_name):
1552       if inst.admin_state != constants.ADMINST_UP:
1553         continue
1554       check_nodes = set(inst.all_nodes)
1555       check_nodes.discard(self.op.node_name)
1556       for inst_node_name in check_nodes:
1557         self._CheckFaultyDisks(inst, inst_node_name)
1558
1559   def Exec(self, feedback_fn):
1560     feedback_fn("Repairing storage unit '%s' on %s ..." %
1561                 (self.op.name, self.op.node_name))
1562
1563     st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1564     result = self.rpc.call_storage_execute(self.op.node_name,
1565                                            self.op.storage_type, st_args,
1566                                            self.op.name,
1567                                            constants.SO_FIX_CONSISTENCY)
1568     result.Raise("Failed to repair storage unit '%s' on %s" %
1569                  (self.op.name, self.op.node_name))