cmdlib: Add logging for tasklets
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import re
30 import platform
31 import logging
32 import copy
33
34 from ganeti import ssh
35 from ganeti import utils
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import serializer
42 from ganeti import ssconf
43
44
45 class LogicalUnit(object):
46   """Logical Unit base class.
47
48   Subclasses must follow these rules:
49     - implement ExpandNames
50     - implement CheckPrereq (except when tasklets are used)
51     - implement Exec (except when tasklets are used)
52     - implement BuildHooksEnv
53     - redefine HPATH and HTYPE
54     - optionally redefine their run requirements:
55         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
56
57   Note that all commands require root permissions.
58
59   @ivar dry_run_result: the value (if any) that will be returned to the caller
60       in dry-run mode (signalled by opcode dry_run parameter)
61
62   """
63   HPATH = None
64   HTYPE = None
65   _OP_REQP = []
66   REQ_BGL = True
67
68   def __init__(self, processor, op, context, rpc):
69     """Constructor for LogicalUnit.
70
71     This needs to be overridden in derived classes in order to check op
72     validity.
73
74     """
75     self.proc = processor
76     self.op = op
77     self.cfg = context.cfg
78     self.context = context
79     self.rpc = rpc
80     # Dicts used to declare locking needs to mcpu
81     self.needed_locks = None
82     self.acquired_locks = {}
83     self.share_locks = dict.fromkeys(locking.LEVELS, 0)
84     self.add_locks = {}
85     self.remove_locks = {}
86     # Used to force good behavior when calling helper functions
87     self.recalculate_locks = {}
88     self.__ssh = None
89     # logging
90     self.LogWarning = processor.LogWarning
91     self.LogInfo = processor.LogInfo
92     self.LogStep = processor.LogStep
93     # support for dry-run
94     self.dry_run_result = None
95
96     # Tasklets
97     self.tasklets = None
98
99     for attr_name in self._OP_REQP:
100       attr_val = getattr(op, attr_name, None)
101       if attr_val is None:
102         raise errors.OpPrereqError("Required parameter '%s' missing" %
103                                    attr_name)
104
105     self.CheckArguments()
106
107   def __GetSSH(self):
108     """Returns the SshRunner object
109
110     """
111     if not self.__ssh:
112       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
113     return self.__ssh
114
115   ssh = property(fget=__GetSSH)
116
117   def CheckArguments(self):
118     """Check syntactic validity for the opcode arguments.
119
120     This method is for doing a simple syntactic check and ensure
121     validity of opcode parameters, without any cluster-related
122     checks. While the same can be accomplished in ExpandNames and/or
123     CheckPrereq, doing these separate is better because:
124
125       - ExpandNames is left as as purely a lock-related function
126       - CheckPrereq is run after we have acquired locks (and possible
127         waited for them)
128
129     The function is allowed to change the self.op attribute so that
130     later methods can no longer worry about missing parameters.
131
132     """
133     pass
134
135   def ExpandNames(self):
136     """Expand names for this LU.
137
138     This method is called before starting to execute the opcode, and it should
139     update all the parameters of the opcode to their canonical form (e.g. a
140     short node name must be fully expanded after this method has successfully
141     completed). This way locking, hooks, logging, ecc. can work correctly.
142
143     LUs which implement this method must also populate the self.needed_locks
144     member, as a dict with lock levels as keys, and a list of needed lock names
145     as values. Rules:
146
147       - use an empty dict if you don't need any lock
148       - if you don't need any lock at a particular level omit that level
149       - don't put anything for the BGL level
150       - if you want all locks at a level use locking.ALL_SET as a value
151
152     If you need to share locks (rather than acquire them exclusively) at one
153     level you can modify self.share_locks, setting a true value (usually 1) for
154     that level. By default locks are not shared.
155
156     This function can also define a list of tasklets, which then will be
157     executed in order instead of the usual LU-level CheckPrereq and Exec
158     functions, if those are not defined by the LU.
159
160     Examples::
161
162       # Acquire all nodes and one instance
163       self.needed_locks = {
164         locking.LEVEL_NODE: locking.ALL_SET,
165         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
166       }
167       # Acquire just two nodes
168       self.needed_locks = {
169         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
170       }
171       # Acquire no locks
172       self.needed_locks = {} # No, you can't leave it to the default value None
173
174     """
175     # The implementation of this method is mandatory only if the new LU is
176     # concurrent, so that old LUs don't need to be changed all at the same
177     # time.
178     if self.REQ_BGL:
179       self.needed_locks = {} # Exclusive LUs don't need locks.
180     else:
181       raise NotImplementedError
182
183   def DeclareLocks(self, level):
184     """Declare LU locking needs for a level
185
186     While most LUs can just declare their locking needs at ExpandNames time,
187     sometimes there's the need to calculate some locks after having acquired
188     the ones before. This function is called just before acquiring locks at a
189     particular level, but after acquiring the ones at lower levels, and permits
190     such calculations. It can be used to modify self.needed_locks, and by
191     default it does nothing.
192
193     This function is only called if you have something already set in
194     self.needed_locks for the level.
195
196     @param level: Locking level which is going to be locked
197     @type level: member of ganeti.locking.LEVELS
198
199     """
200
201   def CheckPrereq(self):
202     """Check prerequisites for this LU.
203
204     This method should check that the prerequisites for the execution
205     of this LU are fulfilled. It can do internode communication, but
206     it should be idempotent - no cluster or system changes are
207     allowed.
208
209     The method should raise errors.OpPrereqError in case something is
210     not fulfilled. Its return value is ignored.
211
212     This method should also update all the parameters of the opcode to
213     their canonical form if it hasn't been done by ExpandNames before.
214
215     """
216     if self.tasklets is not None:
217       for (idx, tl) in enumerate(self.tasklets):
218         logging.info("Checking prerequisites for tasklet %s/%s",
219                      idx + 1, len(self.tasklets))
220         tl.CheckPrereq()
221     else:
222       raise NotImplementedError
223
224   def Exec(self, feedback_fn):
225     """Execute the LU.
226
227     This method should implement the actual work. It should raise
228     errors.OpExecError for failures that are somewhat dealt with in
229     code, or expected.
230
231     """
232     if self.tasklets is not None:
233       for (idx, tl) in enumerate(self.tasklets):
234         logging.info("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
235         tl.Exec(feedback_fn)
236     else:
237       raise NotImplementedError
238
239   def BuildHooksEnv(self):
240     """Build hooks environment for this LU.
241
242     This method should return a three-node tuple consisting of: a dict
243     containing the environment that will be used for running the
244     specific hook for this LU, a list of node names on which the hook
245     should run before the execution, and a list of node names on which
246     the hook should run after the execution.
247
248     The keys of the dict must not have 'GANETI_' prefixed as this will
249     be handled in the hooks runner. Also note additional keys will be
250     added by the hooks runner. If the LU doesn't define any
251     environment, an empty dict (and not None) should be returned.
252
253     No nodes should be returned as an empty list (and not None).
254
255     Note that if the HPATH for a LU class is None, this function will
256     not be called.
257
258     """
259     raise NotImplementedError
260
261   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
262     """Notify the LU about the results of its hooks.
263
264     This method is called every time a hooks phase is executed, and notifies
265     the Logical Unit about the hooks' result. The LU can then use it to alter
266     its result based on the hooks.  By default the method does nothing and the
267     previous result is passed back unchanged but any LU can define it if it
268     wants to use the local cluster hook-scripts somehow.
269
270     @param phase: one of L{constants.HOOKS_PHASE_POST} or
271         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
272     @param hook_results: the results of the multi-node hooks rpc call
273     @param feedback_fn: function used send feedback back to the caller
274     @param lu_result: the previous Exec result this LU had, or None
275         in the PRE phase
276     @return: the new Exec result, based on the previous result
277         and hook results
278
279     """
280     return lu_result
281
282   def _ExpandAndLockInstance(self):
283     """Helper function to expand and lock an instance.
284
285     Many LUs that work on an instance take its name in self.op.instance_name
286     and need to expand it and then declare the expanded name for locking. This
287     function does it, and then updates self.op.instance_name to the expanded
288     name. It also initializes needed_locks as a dict, if this hasn't been done
289     before.
290
291     """
292     if self.needed_locks is None:
293       self.needed_locks = {}
294     else:
295       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
296         "_ExpandAndLockInstance called with instance-level locks set"
297     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
298     if expanded_name is None:
299       raise errors.OpPrereqError("Instance '%s' not known" %
300                                   self.op.instance_name)
301     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
302     self.op.instance_name = expanded_name
303
304   def _LockInstancesNodes(self, primary_only=False):
305     """Helper function to declare instances' nodes for locking.
306
307     This function should be called after locking one or more instances to lock
308     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
309     with all primary or secondary nodes for instances already locked and
310     present in self.needed_locks[locking.LEVEL_INSTANCE].
311
312     It should be called from DeclareLocks, and for safety only works if
313     self.recalculate_locks[locking.LEVEL_NODE] is set.
314
315     In the future it may grow parameters to just lock some instance's nodes, or
316     to just lock primaries or secondary nodes, if needed.
317
318     If should be called in DeclareLocks in a way similar to::
319
320       if level == locking.LEVEL_NODE:
321         self._LockInstancesNodes()
322
323     @type primary_only: boolean
324     @param primary_only: only lock primary nodes of locked instances
325
326     """
327     assert locking.LEVEL_NODE in self.recalculate_locks, \
328       "_LockInstancesNodes helper function called with no nodes to recalculate"
329
330     # TODO: check if we're really been called with the instance locks held
331
332     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
333     # future we might want to have different behaviors depending on the value
334     # of self.recalculate_locks[locking.LEVEL_NODE]
335     wanted_nodes = []
336     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
337       instance = self.context.cfg.GetInstanceInfo(instance_name)
338       wanted_nodes.append(instance.primary_node)
339       if not primary_only:
340         wanted_nodes.extend(instance.secondary_nodes)
341
342     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
343       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
344     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
345       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
346
347     del self.recalculate_locks[locking.LEVEL_NODE]
348
349
350 class NoHooksLU(LogicalUnit):
351   """Simple LU which runs no hooks.
352
353   This LU is intended as a parent for other LogicalUnits which will
354   run no hooks, in order to reduce duplicate code.
355
356   """
357   HPATH = None
358   HTYPE = None
359
360
361 class Tasklet:
362   """Tasklet base class.
363
364   Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
365   they can mix legacy code with tasklets. Locking needs to be done in the LU,
366   tasklets know nothing about locks.
367
368   Subclasses must follow these rules:
369     - Implement CheckPrereq
370     - Implement Exec
371
372   """
373   def __init__(self, lu):
374     self.lu = lu
375
376     # Shortcuts
377     self.cfg = lu.cfg
378     self.rpc = lu.rpc
379
380   def CheckPrereq(self):
381     """Check prerequisites for this tasklets.
382
383     This method should check whether the prerequisites for the execution of
384     this tasklet are fulfilled. It can do internode communication, but it
385     should be idempotent - no cluster or system changes are allowed.
386
387     The method should raise errors.OpPrereqError in case something is not
388     fulfilled. Its return value is ignored.
389
390     This method should also update all parameters to their canonical form if it
391     hasn't been done before.
392
393     """
394     raise NotImplementedError
395
396   def Exec(self, feedback_fn):
397     """Execute the tasklet.
398
399     This method should implement the actual work. It should raise
400     errors.OpExecError for failures that are somewhat dealt with in code, or
401     expected.
402
403     """
404     raise NotImplementedError
405
406
407 def _GetWantedNodes(lu, nodes):
408   """Returns list of checked and expanded node names.
409
410   @type lu: L{LogicalUnit}
411   @param lu: the logical unit on whose behalf we execute
412   @type nodes: list
413   @param nodes: list of node names or None for all nodes
414   @rtype: list
415   @return: the list of nodes, sorted
416   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
417
418   """
419   if not isinstance(nodes, list):
420     raise errors.OpPrereqError("Invalid argument type 'nodes'")
421
422   if not nodes:
423     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
424       " non-empty list of nodes whose name is to be expanded.")
425
426   wanted = []
427   for name in nodes:
428     node = lu.cfg.ExpandNodeName(name)
429     if node is None:
430       raise errors.OpPrereqError("No such node name '%s'" % name)
431     wanted.append(node)
432
433   return utils.NiceSort(wanted)
434
435
436 def _GetWantedInstances(lu, instances):
437   """Returns list of checked and expanded instance names.
438
439   @type lu: L{LogicalUnit}
440   @param lu: the logical unit on whose behalf we execute
441   @type instances: list
442   @param instances: list of instance names or None for all instances
443   @rtype: list
444   @return: the list of instances, sorted
445   @raise errors.OpPrereqError: if the instances parameter is wrong type
446   @raise errors.OpPrereqError: if any of the passed instances is not found
447
448   """
449   if not isinstance(instances, list):
450     raise errors.OpPrereqError("Invalid argument type 'instances'")
451
452   if instances:
453     wanted = []
454
455     for name in instances:
456       instance = lu.cfg.ExpandInstanceName(name)
457       if instance is None:
458         raise errors.OpPrereqError("No such instance name '%s'" % name)
459       wanted.append(instance)
460
461   else:
462     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
463   return wanted
464
465
466 def _CheckOutputFields(static, dynamic, selected):
467   """Checks whether all selected fields are valid.
468
469   @type static: L{utils.FieldSet}
470   @param static: static fields set
471   @type dynamic: L{utils.FieldSet}
472   @param dynamic: dynamic fields set
473
474   """
475   f = utils.FieldSet()
476   f.Extend(static)
477   f.Extend(dynamic)
478
479   delta = f.NonMatching(selected)
480   if delta:
481     raise errors.OpPrereqError("Unknown output fields selected: %s"
482                                % ",".join(delta))
483
484
485 def _CheckBooleanOpField(op, name):
486   """Validates boolean opcode parameters.
487
488   This will ensure that an opcode parameter is either a boolean value,
489   or None (but that it always exists).
490
491   """
492   val = getattr(op, name, None)
493   if not (val is None or isinstance(val, bool)):
494     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
495                                (name, str(val)))
496   setattr(op, name, val)
497
498
499 def _CheckNodeOnline(lu, node):
500   """Ensure that a given node is online.
501
502   @param lu: the LU on behalf of which we make the check
503   @param node: the node to check
504   @raise errors.OpPrereqError: if the node is offline
505
506   """
507   if lu.cfg.GetNodeInfo(node).offline:
508     raise errors.OpPrereqError("Can't use offline node %s" % node)
509
510
511 def _CheckNodeNotDrained(lu, node):
512   """Ensure that a given node is not drained.
513
514   @param lu: the LU on behalf of which we make the check
515   @param node: the node to check
516   @raise errors.OpPrereqError: if the node is drained
517
518   """
519   if lu.cfg.GetNodeInfo(node).drained:
520     raise errors.OpPrereqError("Can't use drained node %s" % node)
521
522
523 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
524                           memory, vcpus, nics, disk_template, disks,
525                           bep, hvp, hypervisor_name):
526   """Builds instance related env variables for hooks
527
528   This builds the hook environment from individual variables.
529
530   @type name: string
531   @param name: the name of the instance
532   @type primary_node: string
533   @param primary_node: the name of the instance's primary node
534   @type secondary_nodes: list
535   @param secondary_nodes: list of secondary nodes as strings
536   @type os_type: string
537   @param os_type: the name of the instance's OS
538   @type status: boolean
539   @param status: the should_run status of the instance
540   @type memory: string
541   @param memory: the memory size of the instance
542   @type vcpus: string
543   @param vcpus: the count of VCPUs the instance has
544   @type nics: list
545   @param nics: list of tuples (ip, mac, mode, link) representing
546       the NICs the instance has
547   @type disk_template: string
548   @param disk_template: the disk template of the instance
549   @type disks: list
550   @param disks: the list of (size, mode) pairs
551   @type bep: dict
552   @param bep: the backend parameters for the instance
553   @type hvp: dict
554   @param hvp: the hypervisor parameters for the instance
555   @type hypervisor_name: string
556   @param hypervisor_name: the hypervisor for the instance
557   @rtype: dict
558   @return: the hook environment for this instance
559
560   """
561   if status:
562     str_status = "up"
563   else:
564     str_status = "down"
565   env = {
566     "OP_TARGET": name,
567     "INSTANCE_NAME": name,
568     "INSTANCE_PRIMARY": primary_node,
569     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
570     "INSTANCE_OS_TYPE": os_type,
571     "INSTANCE_STATUS": str_status,
572     "INSTANCE_MEMORY": memory,
573     "INSTANCE_VCPUS": vcpus,
574     "INSTANCE_DISK_TEMPLATE": disk_template,
575     "INSTANCE_HYPERVISOR": hypervisor_name,
576   }
577
578   if nics:
579     nic_count = len(nics)
580     for idx, (ip, mac, mode, link) in enumerate(nics):
581       if ip is None:
582         ip = ""
583       env["INSTANCE_NIC%d_IP" % idx] = ip
584       env["INSTANCE_NIC%d_MAC" % idx] = mac
585       env["INSTANCE_NIC%d_MODE" % idx] = mode
586       env["INSTANCE_NIC%d_LINK" % idx] = link
587       if mode == constants.NIC_MODE_BRIDGED:
588         env["INSTANCE_NIC%d_BRIDGE" % idx] = link
589   else:
590     nic_count = 0
591
592   env["INSTANCE_NIC_COUNT"] = nic_count
593
594   if disks:
595     disk_count = len(disks)
596     for idx, (size, mode) in enumerate(disks):
597       env["INSTANCE_DISK%d_SIZE" % idx] = size
598       env["INSTANCE_DISK%d_MODE" % idx] = mode
599   else:
600     disk_count = 0
601
602   env["INSTANCE_DISK_COUNT"] = disk_count
603
604   for source, kind in [(bep, "BE"), (hvp, "HV")]:
605     for key, value in source.items():
606       env["INSTANCE_%s_%s" % (kind, key)] = value
607
608   return env
609
610 def _NICListToTuple(lu, nics):
611   """Build a list of nic information tuples.
612
613   This list is suitable to be passed to _BuildInstanceHookEnv or as a return
614   value in LUQueryInstanceData.
615
616   @type lu:  L{LogicalUnit}
617   @param lu: the logical unit on whose behalf we execute
618   @type nics: list of L{objects.NIC}
619   @param nics: list of nics to convert to hooks tuples
620
621   """
622   hooks_nics = []
623   c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
624   for nic in nics:
625     ip = nic.ip
626     mac = nic.mac
627     filled_params = objects.FillDict(c_nicparams, nic.nicparams)
628     mode = filled_params[constants.NIC_MODE]
629     link = filled_params[constants.NIC_LINK]
630     hooks_nics.append((ip, mac, mode, link))
631   return hooks_nics
632
633 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
634   """Builds instance related env variables for hooks from an object.
635
636   @type lu: L{LogicalUnit}
637   @param lu: the logical unit on whose behalf we execute
638   @type instance: L{objects.Instance}
639   @param instance: the instance for which we should build the
640       environment
641   @type override: dict
642   @param override: dictionary with key/values that will override
643       our values
644   @rtype: dict
645   @return: the hook environment dictionary
646
647   """
648   cluster = lu.cfg.GetClusterInfo()
649   bep = cluster.FillBE(instance)
650   hvp = cluster.FillHV(instance)
651   args = {
652     'name': instance.name,
653     'primary_node': instance.primary_node,
654     'secondary_nodes': instance.secondary_nodes,
655     'os_type': instance.os,
656     'status': instance.admin_up,
657     'memory': bep[constants.BE_MEMORY],
658     'vcpus': bep[constants.BE_VCPUS],
659     'nics': _NICListToTuple(lu, instance.nics),
660     'disk_template': instance.disk_template,
661     'disks': [(disk.size, disk.mode) for disk in instance.disks],
662     'bep': bep,
663     'hvp': hvp,
664     'hypervisor_name': instance.hypervisor,
665   }
666   if override:
667     args.update(override)
668   return _BuildInstanceHookEnv(**args)
669
670
671 def _AdjustCandidatePool(lu):
672   """Adjust the candidate pool after node operations.
673
674   """
675   mod_list = lu.cfg.MaintainCandidatePool()
676   if mod_list:
677     lu.LogInfo("Promoted nodes to master candidate role: %s",
678                ", ".join(node.name for node in mod_list))
679     for name in mod_list:
680       lu.context.ReaddNode(name)
681   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
682   if mc_now > mc_max:
683     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
684                (mc_now, mc_max))
685
686
687 def _CheckNicsBridgesExist(lu, target_nics, target_node,
688                                profile=constants.PP_DEFAULT):
689   """Check that the brigdes needed by a list of nics exist.
690
691   """
692   c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
693   paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
694                 for nic in target_nics]
695   brlist = [params[constants.NIC_LINK] for params in paramslist
696             if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
697   if brlist:
698     result = lu.rpc.call_bridges_exist(target_node, brlist)
699     result.Raise("Error checking bridges on destination node '%s'" %
700                  target_node, prereq=True)
701
702
703 def _CheckInstanceBridgesExist(lu, instance, node=None):
704   """Check that the brigdes needed by an instance exist.
705
706   """
707   if node is None:
708     node = instance.primary_node
709   _CheckNicsBridgesExist(lu, instance.nics, node)
710
711
712 def _GetNodeSecondaryInstances(cfg, node_name):
713   """Returns secondary instances on a node.
714
715   """
716   instances = []
717
718   for (_, inst) in cfg.GetAllInstancesInfo().iteritems():
719     if node_name in inst.secondary_nodes:
720       instances.append(inst)
721
722   return instances
723
724
725 class LUDestroyCluster(NoHooksLU):
726   """Logical unit for destroying the cluster.
727
728   """
729   _OP_REQP = []
730
731   def CheckPrereq(self):
732     """Check prerequisites.
733
734     This checks whether the cluster is empty.
735
736     Any errors are signaled by raising errors.OpPrereqError.
737
738     """
739     master = self.cfg.GetMasterNode()
740
741     nodelist = self.cfg.GetNodeList()
742     if len(nodelist) != 1 or nodelist[0] != master:
743       raise errors.OpPrereqError("There are still %d node(s) in"
744                                  " this cluster." % (len(nodelist) - 1))
745     instancelist = self.cfg.GetInstanceList()
746     if instancelist:
747       raise errors.OpPrereqError("There are still %d instance(s) in"
748                                  " this cluster." % len(instancelist))
749
750   def Exec(self, feedback_fn):
751     """Destroys the cluster.
752
753     """
754     master = self.cfg.GetMasterNode()
755     result = self.rpc.call_node_stop_master(master, False)
756     result.Raise("Could not disable the master role")
757     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
758     utils.CreateBackup(priv_key)
759     utils.CreateBackup(pub_key)
760     return master
761
762
763 class LUVerifyCluster(LogicalUnit):
764   """Verifies the cluster status.
765
766   """
767   HPATH = "cluster-verify"
768   HTYPE = constants.HTYPE_CLUSTER
769   _OP_REQP = ["skip_checks"]
770   REQ_BGL = False
771
772   def ExpandNames(self):
773     self.needed_locks = {
774       locking.LEVEL_NODE: locking.ALL_SET,
775       locking.LEVEL_INSTANCE: locking.ALL_SET,
776     }
777     self.share_locks = dict.fromkeys(locking.LEVELS, 1)
778
779   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
780                   node_result, feedback_fn, master_files,
781                   drbd_map, vg_name):
782     """Run multiple tests against a node.
783
784     Test list:
785
786       - compares ganeti version
787       - checks vg existence and size > 20G
788       - checks config file checksum
789       - checks ssh to other nodes
790
791     @type nodeinfo: L{objects.Node}
792     @param nodeinfo: the node to check
793     @param file_list: required list of files
794     @param local_cksum: dictionary of local files and their checksums
795     @param node_result: the results from the node
796     @param feedback_fn: function used to accumulate results
797     @param master_files: list of files that only masters should have
798     @param drbd_map: the useddrbd minors for this node, in
799         form of minor: (instance, must_exist) which correspond to instances
800         and their running status
801     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
802
803     """
804     node = nodeinfo.name
805
806     # main result, node_result should be a non-empty dict
807     if not node_result or not isinstance(node_result, dict):
808       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
809       return True
810
811     # compares ganeti version
812     local_version = constants.PROTOCOL_VERSION
813     remote_version = node_result.get('version', None)
814     if not (remote_version and isinstance(remote_version, (list, tuple)) and
815             len(remote_version) == 2):
816       feedback_fn("  - ERROR: connection to %s failed" % (node))
817       return True
818
819     if local_version != remote_version[0]:
820       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
821                   " node %s %s" % (local_version, node, remote_version[0]))
822       return True
823
824     # node seems compatible, we can actually try to look into its results
825
826     bad = False
827
828     # full package version
829     if constants.RELEASE_VERSION != remote_version[1]:
830       feedback_fn("  - WARNING: software version mismatch: master %s,"
831                   " node %s %s" %
832                   (constants.RELEASE_VERSION, node, remote_version[1]))
833
834     # checks vg existence and size > 20G
835     if vg_name is not None:
836       vglist = node_result.get(constants.NV_VGLIST, None)
837       if not vglist:
838         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
839                         (node,))
840         bad = True
841       else:
842         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
843                                               constants.MIN_VG_SIZE)
844         if vgstatus:
845           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
846           bad = True
847
848     # checks config file checksum
849
850     remote_cksum = node_result.get(constants.NV_FILELIST, None)
851     if not isinstance(remote_cksum, dict):
852       bad = True
853       feedback_fn("  - ERROR: node hasn't returned file checksum data")
854     else:
855       for file_name in file_list:
856         node_is_mc = nodeinfo.master_candidate
857         must_have_file = file_name not in master_files
858         if file_name not in remote_cksum:
859           if node_is_mc or must_have_file:
860             bad = True
861             feedback_fn("  - ERROR: file '%s' missing" % file_name)
862         elif remote_cksum[file_name] != local_cksum[file_name]:
863           if node_is_mc or must_have_file:
864             bad = True
865             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
866           else:
867             # not candidate and this is not a must-have file
868             bad = True
869             feedback_fn("  - ERROR: file '%s' should not exist on non master"
870                         " candidates (and the file is outdated)" % file_name)
871         else:
872           # all good, except non-master/non-must have combination
873           if not node_is_mc and not must_have_file:
874             feedback_fn("  - ERROR: file '%s' should not exist on non master"
875                         " candidates" % file_name)
876
877     # checks ssh to any
878
879     if constants.NV_NODELIST not in node_result:
880       bad = True
881       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
882     else:
883       if node_result[constants.NV_NODELIST]:
884         bad = True
885         for node in node_result[constants.NV_NODELIST]:
886           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
887                           (node, node_result[constants.NV_NODELIST][node]))
888
889     if constants.NV_NODENETTEST not in node_result:
890       bad = True
891       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
892     else:
893       if node_result[constants.NV_NODENETTEST]:
894         bad = True
895         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
896         for node in nlist:
897           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
898                           (node, node_result[constants.NV_NODENETTEST][node]))
899
900     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
901     if isinstance(hyp_result, dict):
902       for hv_name, hv_result in hyp_result.iteritems():
903         if hv_result is not None:
904           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
905                       (hv_name, hv_result))
906
907     # check used drbd list
908     if vg_name is not None:
909       used_minors = node_result.get(constants.NV_DRBDLIST, [])
910       if not isinstance(used_minors, (tuple, list)):
911         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
912                     str(used_minors))
913       else:
914         for minor, (iname, must_exist) in drbd_map.items():
915           if minor not in used_minors and must_exist:
916             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
917                         " not active" % (minor, iname))
918             bad = True
919         for minor in used_minors:
920           if minor not in drbd_map:
921             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
922                         minor)
923             bad = True
924
925     return bad
926
927   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
928                       node_instance, feedback_fn, n_offline):
929     """Verify an instance.
930
931     This function checks to see if the required block devices are
932     available on the instance's node.
933
934     """
935     bad = False
936
937     node_current = instanceconfig.primary_node
938
939     node_vol_should = {}
940     instanceconfig.MapLVsByNode(node_vol_should)
941
942     for node in node_vol_should:
943       if node in n_offline:
944         # ignore missing volumes on offline nodes
945         continue
946       for volume in node_vol_should[node]:
947         if node not in node_vol_is or volume not in node_vol_is[node]:
948           feedback_fn("  - ERROR: volume %s missing on node %s" %
949                           (volume, node))
950           bad = True
951
952     if instanceconfig.admin_up:
953       if ((node_current not in node_instance or
954           not instance in node_instance[node_current]) and
955           node_current not in n_offline):
956         feedback_fn("  - ERROR: instance %s not running on node %s" %
957                         (instance, node_current))
958         bad = True
959
960     for node in node_instance:
961       if (not node == node_current):
962         if instance in node_instance[node]:
963           feedback_fn("  - ERROR: instance %s should not run on node %s" %
964                           (instance, node))
965           bad = True
966
967     return bad
968
969   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
970     """Verify if there are any unknown volumes in the cluster.
971
972     The .os, .swap and backup volumes are ignored. All other volumes are
973     reported as unknown.
974
975     """
976     bad = False
977
978     for node in node_vol_is:
979       for volume in node_vol_is[node]:
980         if node not in node_vol_should or volume not in node_vol_should[node]:
981           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
982                       (volume, node))
983           bad = True
984     return bad
985
986   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
987     """Verify the list of running instances.
988
989     This checks what instances are running but unknown to the cluster.
990
991     """
992     bad = False
993     for node in node_instance:
994       for runninginstance in node_instance[node]:
995         if runninginstance not in instancelist:
996           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
997                           (runninginstance, node))
998           bad = True
999     return bad
1000
1001   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
1002     """Verify N+1 Memory Resilience.
1003
1004     Check that if one single node dies we can still start all the instances it
1005     was primary for.
1006
1007     """
1008     bad = False
1009
1010     for node, nodeinfo in node_info.iteritems():
1011       # This code checks that every node which is now listed as secondary has
1012       # enough memory to host all instances it is supposed to should a single
1013       # other node in the cluster fail.
1014       # FIXME: not ready for failover to an arbitrary node
1015       # FIXME: does not support file-backed instances
1016       # WARNING: we currently take into account down instances as well as up
1017       # ones, considering that even if they're down someone might want to start
1018       # them even in the event of a node failure.
1019       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
1020         needed_mem = 0
1021         for instance in instances:
1022           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
1023           if bep[constants.BE_AUTO_BALANCE]:
1024             needed_mem += bep[constants.BE_MEMORY]
1025         if nodeinfo['mfree'] < needed_mem:
1026           feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
1027                       " failovers should node %s fail" % (node, prinode))
1028           bad = True
1029     return bad
1030
1031   def CheckPrereq(self):
1032     """Check prerequisites.
1033
1034     Transform the list of checks we're going to skip into a set and check that
1035     all its members are valid.
1036
1037     """
1038     self.skip_set = frozenset(self.op.skip_checks)
1039     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
1040       raise errors.OpPrereqError("Invalid checks to be skipped specified")
1041
1042   def BuildHooksEnv(self):
1043     """Build hooks env.
1044
1045     Cluster-Verify hooks just ran in the post phase and their failure makes
1046     the output be logged in the verify output and the verification to fail.
1047
1048     """
1049     all_nodes = self.cfg.GetNodeList()
1050     env = {
1051       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
1052       }
1053     for node in self.cfg.GetAllNodesInfo().values():
1054       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
1055
1056     return env, [], all_nodes
1057
1058   def Exec(self, feedback_fn):
1059     """Verify integrity of cluster, performing various test on nodes.
1060
1061     """
1062     bad = False
1063     feedback_fn("* Verifying global settings")
1064     for msg in self.cfg.VerifyConfig():
1065       feedback_fn("  - ERROR: %s" % msg)
1066
1067     vg_name = self.cfg.GetVGName()
1068     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
1069     nodelist = utils.NiceSort(self.cfg.GetNodeList())
1070     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
1071     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
1072     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
1073                         for iname in instancelist)
1074     i_non_redundant = [] # Non redundant instances
1075     i_non_a_balanced = [] # Non auto-balanced instances
1076     n_offline = [] # List of offline nodes
1077     n_drained = [] # List of nodes being drained
1078     node_volume = {}
1079     node_instance = {}
1080     node_info = {}
1081     instance_cfg = {}
1082
1083     # FIXME: verify OS list
1084     # do local checksums
1085     master_files = [constants.CLUSTER_CONF_FILE]
1086
1087     file_names = ssconf.SimpleStore().GetFileList()
1088     file_names.append(constants.SSL_CERT_FILE)
1089     file_names.append(constants.RAPI_CERT_FILE)
1090     file_names.extend(master_files)
1091
1092     local_checksums = utils.FingerprintFiles(file_names)
1093
1094     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1095     node_verify_param = {
1096       constants.NV_FILELIST: file_names,
1097       constants.NV_NODELIST: [node.name for node in nodeinfo
1098                               if not node.offline],
1099       constants.NV_HYPERVISOR: hypervisors,
1100       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1101                                   node.secondary_ip) for node in nodeinfo
1102                                  if not node.offline],
1103       constants.NV_INSTANCELIST: hypervisors,
1104       constants.NV_VERSION: None,
1105       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1106       }
1107     if vg_name is not None:
1108       node_verify_param[constants.NV_VGLIST] = None
1109       node_verify_param[constants.NV_LVLIST] = vg_name
1110       node_verify_param[constants.NV_DRBDLIST] = None
1111     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1112                                            self.cfg.GetClusterName())
1113
1114     cluster = self.cfg.GetClusterInfo()
1115     master_node = self.cfg.GetMasterNode()
1116     all_drbd_map = self.cfg.ComputeDRBDMap()
1117
1118     for node_i in nodeinfo:
1119       node = node_i.name
1120
1121       if node_i.offline:
1122         feedback_fn("* Skipping offline node %s" % (node,))
1123         n_offline.append(node)
1124         continue
1125
1126       if node == master_node:
1127         ntype = "master"
1128       elif node_i.master_candidate:
1129         ntype = "master candidate"
1130       elif node_i.drained:
1131         ntype = "drained"
1132         n_drained.append(node)
1133       else:
1134         ntype = "regular"
1135       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1136
1137       msg = all_nvinfo[node].fail_msg
1138       if msg:
1139         feedback_fn("  - ERROR: while contacting node %s: %s" % (node, msg))
1140         bad = True
1141         continue
1142
1143       nresult = all_nvinfo[node].payload
1144       node_drbd = {}
1145       for minor, instance in all_drbd_map[node].items():
1146         if instance not in instanceinfo:
1147           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1148                       instance)
1149           # ghost instance should not be running, but otherwise we
1150           # don't give double warnings (both ghost instance and
1151           # unallocated minor in use)
1152           node_drbd[minor] = (instance, False)
1153         else:
1154           instance = instanceinfo[instance]
1155           node_drbd[minor] = (instance.name, instance.admin_up)
1156       result = self._VerifyNode(node_i, file_names, local_checksums,
1157                                 nresult, feedback_fn, master_files,
1158                                 node_drbd, vg_name)
1159       bad = bad or result
1160
1161       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1162       if vg_name is None:
1163         node_volume[node] = {}
1164       elif isinstance(lvdata, basestring):
1165         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1166                     (node, utils.SafeEncode(lvdata)))
1167         bad = True
1168         node_volume[node] = {}
1169       elif not isinstance(lvdata, dict):
1170         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1171         bad = True
1172         continue
1173       else:
1174         node_volume[node] = lvdata
1175
1176       # node_instance
1177       idata = nresult.get(constants.NV_INSTANCELIST, None)
1178       if not isinstance(idata, list):
1179         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1180                     (node,))
1181         bad = True
1182         continue
1183
1184       node_instance[node] = idata
1185
1186       # node_info
1187       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1188       if not isinstance(nodeinfo, dict):
1189         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1190         bad = True
1191         continue
1192
1193       try:
1194         node_info[node] = {
1195           "mfree": int(nodeinfo['memory_free']),
1196           "pinst": [],
1197           "sinst": [],
1198           # dictionary holding all instances this node is secondary for,
1199           # grouped by their primary node. Each key is a cluster node, and each
1200           # value is a list of instances which have the key as primary and the
1201           # current node as secondary.  this is handy to calculate N+1 memory
1202           # availability if you can only failover from a primary to its
1203           # secondary.
1204           "sinst-by-pnode": {},
1205         }
1206         # FIXME: devise a free space model for file based instances as well
1207         if vg_name is not None:
1208           if (constants.NV_VGLIST not in nresult or
1209               vg_name not in nresult[constants.NV_VGLIST]):
1210             feedback_fn("  - ERROR: node %s didn't return data for the"
1211                         " volume group '%s' - it is either missing or broken" %
1212                         (node, vg_name))
1213             bad = True
1214             continue
1215           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1216       except (ValueError, KeyError):
1217         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1218                     " from node %s" % (node,))
1219         bad = True
1220         continue
1221
1222     node_vol_should = {}
1223
1224     for instance in instancelist:
1225       feedback_fn("* Verifying instance %s" % instance)
1226       inst_config = instanceinfo[instance]
1227       result =  self._VerifyInstance(instance, inst_config, node_volume,
1228                                      node_instance, feedback_fn, n_offline)
1229       bad = bad or result
1230       inst_nodes_offline = []
1231
1232       inst_config.MapLVsByNode(node_vol_should)
1233
1234       instance_cfg[instance] = inst_config
1235
1236       pnode = inst_config.primary_node
1237       if pnode in node_info:
1238         node_info[pnode]['pinst'].append(instance)
1239       elif pnode not in n_offline:
1240         feedback_fn("  - ERROR: instance %s, connection to primary node"
1241                     " %s failed" % (instance, pnode))
1242         bad = True
1243
1244       if pnode in n_offline:
1245         inst_nodes_offline.append(pnode)
1246
1247       # If the instance is non-redundant we cannot survive losing its primary
1248       # node, so we are not N+1 compliant. On the other hand we have no disk
1249       # templates with more than one secondary so that situation is not well
1250       # supported either.
1251       # FIXME: does not support file-backed instances
1252       if len(inst_config.secondary_nodes) == 0:
1253         i_non_redundant.append(instance)
1254       elif len(inst_config.secondary_nodes) > 1:
1255         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1256                     % instance)
1257
1258       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1259         i_non_a_balanced.append(instance)
1260
1261       for snode in inst_config.secondary_nodes:
1262         if snode in node_info:
1263           node_info[snode]['sinst'].append(instance)
1264           if pnode not in node_info[snode]['sinst-by-pnode']:
1265             node_info[snode]['sinst-by-pnode'][pnode] = []
1266           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1267         elif snode not in n_offline:
1268           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1269                       " %s failed" % (instance, snode))
1270           bad = True
1271         if snode in n_offline:
1272           inst_nodes_offline.append(snode)
1273
1274       if inst_nodes_offline:
1275         # warn that the instance lives on offline nodes, and set bad=True
1276         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1277                     ", ".join(inst_nodes_offline))
1278         bad = True
1279
1280     feedback_fn("* Verifying orphan volumes")
1281     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1282                                        feedback_fn)
1283     bad = bad or result
1284
1285     feedback_fn("* Verifying remaining instances")
1286     result = self._VerifyOrphanInstances(instancelist, node_instance,
1287                                          feedback_fn)
1288     bad = bad or result
1289
1290     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1291       feedback_fn("* Verifying N+1 Memory redundancy")
1292       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1293       bad = bad or result
1294
1295     feedback_fn("* Other Notes")
1296     if i_non_redundant:
1297       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1298                   % len(i_non_redundant))
1299
1300     if i_non_a_balanced:
1301       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1302                   % len(i_non_a_balanced))
1303
1304     if n_offline:
1305       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1306
1307     if n_drained:
1308       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1309
1310     return not bad
1311
1312   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1313     """Analyze the post-hooks' result
1314
1315     This method analyses the hook result, handles it, and sends some
1316     nicely-formatted feedback back to the user.
1317
1318     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1319         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1320     @param hooks_results: the results of the multi-node hooks rpc call
1321     @param feedback_fn: function used send feedback back to the caller
1322     @param lu_result: previous Exec result
1323     @return: the new Exec result, based on the previous result
1324         and hook results
1325
1326     """
1327     # We only really run POST phase hooks, and are only interested in
1328     # their results
1329     if phase == constants.HOOKS_PHASE_POST:
1330       # Used to change hooks' output to proper indentation
1331       indent_re = re.compile('^', re.M)
1332       feedback_fn("* Hooks Results")
1333       if not hooks_results:
1334         feedback_fn("  - ERROR: general communication failure")
1335         lu_result = 1
1336       else:
1337         for node_name in hooks_results:
1338           show_node_header = True
1339           res = hooks_results[node_name]
1340           msg = res.fail_msg
1341           if msg:
1342             if res.offline:
1343               # no need to warn or set fail return value
1344               continue
1345             feedback_fn("    Communication failure in hooks execution: %s" %
1346                         msg)
1347             lu_result = 1
1348             continue
1349           for script, hkr, output in res.payload:
1350             if hkr == constants.HKR_FAIL:
1351               # The node header is only shown once, if there are
1352               # failing hooks on that node
1353               if show_node_header:
1354                 feedback_fn("  Node %s:" % node_name)
1355                 show_node_header = False
1356               feedback_fn("    ERROR: Script %s failed, output:" % script)
1357               output = indent_re.sub('      ', output)
1358               feedback_fn("%s" % output)
1359               lu_result = 1
1360
1361       return lu_result
1362
1363
1364 class LUVerifyDisks(NoHooksLU):
1365   """Verifies the cluster disks status.
1366
1367   """
1368   _OP_REQP = []
1369   REQ_BGL = False
1370
1371   def ExpandNames(self):
1372     self.needed_locks = {
1373       locking.LEVEL_NODE: locking.ALL_SET,
1374       locking.LEVEL_INSTANCE: locking.ALL_SET,
1375     }
1376     self.share_locks = dict.fromkeys(locking.LEVELS, 1)
1377
1378   def CheckPrereq(self):
1379     """Check prerequisites.
1380
1381     This has no prerequisites.
1382
1383     """
1384     pass
1385
1386   def Exec(self, feedback_fn):
1387     """Verify integrity of cluster disks.
1388
1389     @rtype: tuple of three items
1390     @return: a tuple of (dict of node-to-node_error, list of instances
1391         which need activate-disks, dict of instance: (node, volume) for
1392         missing volumes
1393
1394     """
1395     result = res_nodes, res_instances, res_missing = {}, [], {}
1396
1397     vg_name = self.cfg.GetVGName()
1398     nodes = utils.NiceSort(self.cfg.GetNodeList())
1399     instances = [self.cfg.GetInstanceInfo(name)
1400                  for name in self.cfg.GetInstanceList()]
1401
1402     nv_dict = {}
1403     for inst in instances:
1404       inst_lvs = {}
1405       if (not inst.admin_up or
1406           inst.disk_template not in constants.DTS_NET_MIRROR):
1407         continue
1408       inst.MapLVsByNode(inst_lvs)
1409       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1410       for node, vol_list in inst_lvs.iteritems():
1411         for vol in vol_list:
1412           nv_dict[(node, vol)] = inst
1413
1414     if not nv_dict:
1415       return result
1416
1417     node_lvs = self.rpc.call_lv_list(nodes, vg_name)
1418
1419     for node in nodes:
1420       # node_volume
1421       node_res = node_lvs[node]
1422       if node_res.offline:
1423         continue
1424       msg = node_res.fail_msg
1425       if msg:
1426         logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1427         res_nodes[node] = msg
1428         continue
1429
1430       lvs = node_res.payload
1431       for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1432         inst = nv_dict.pop((node, lv_name), None)
1433         if (not lv_online and inst is not None
1434             and inst.name not in res_instances):
1435           res_instances.append(inst.name)
1436
1437     # any leftover items in nv_dict are missing LVs, let's arrange the
1438     # data better
1439     for key, inst in nv_dict.iteritems():
1440       if inst.name not in res_missing:
1441         res_missing[inst.name] = []
1442       res_missing[inst.name].append(key)
1443
1444     return result
1445
1446
1447 class LURenameCluster(LogicalUnit):
1448   """Rename the cluster.
1449
1450   """
1451   HPATH = "cluster-rename"
1452   HTYPE = constants.HTYPE_CLUSTER
1453   _OP_REQP = ["name"]
1454
1455   def BuildHooksEnv(self):
1456     """Build hooks env.
1457
1458     """
1459     env = {
1460       "OP_TARGET": self.cfg.GetClusterName(),
1461       "NEW_NAME": self.op.name,
1462       }
1463     mn = self.cfg.GetMasterNode()
1464     return env, [mn], [mn]
1465
1466   def CheckPrereq(self):
1467     """Verify that the passed name is a valid one.
1468
1469     """
1470     hostname = utils.HostInfo(self.op.name)
1471
1472     new_name = hostname.name
1473     self.ip = new_ip = hostname.ip
1474     old_name = self.cfg.GetClusterName()
1475     old_ip = self.cfg.GetMasterIP()
1476     if new_name == old_name and new_ip == old_ip:
1477       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1478                                  " cluster has changed")
1479     if new_ip != old_ip:
1480       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1481         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1482                                    " reachable on the network. Aborting." %
1483                                    new_ip)
1484
1485     self.op.name = new_name
1486
1487   def Exec(self, feedback_fn):
1488     """Rename the cluster.
1489
1490     """
1491     clustername = self.op.name
1492     ip = self.ip
1493
1494     # shutdown the master IP
1495     master = self.cfg.GetMasterNode()
1496     result = self.rpc.call_node_stop_master(master, False)
1497     result.Raise("Could not disable the master role")
1498
1499     try:
1500       cluster = self.cfg.GetClusterInfo()
1501       cluster.cluster_name = clustername
1502       cluster.master_ip = ip
1503       self.cfg.Update(cluster)
1504
1505       # update the known hosts file
1506       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1507       node_list = self.cfg.GetNodeList()
1508       try:
1509         node_list.remove(master)
1510       except ValueError:
1511         pass
1512       result = self.rpc.call_upload_file(node_list,
1513                                          constants.SSH_KNOWN_HOSTS_FILE)
1514       for to_node, to_result in result.iteritems():
1515         msg = to_result.fail_msg
1516         if msg:
1517           msg = ("Copy of file %s to node %s failed: %s" %
1518                  (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1519           self.proc.LogWarning(msg)
1520
1521     finally:
1522       result = self.rpc.call_node_start_master(master, False, False)
1523       msg = result.fail_msg
1524       if msg:
1525         self.LogWarning("Could not re-enable the master role on"
1526                         " the master, please restart manually: %s", msg)
1527
1528
1529 def _RecursiveCheckIfLVMBased(disk):
1530   """Check if the given disk or its children are lvm-based.
1531
1532   @type disk: L{objects.Disk}
1533   @param disk: the disk to check
1534   @rtype: boolean
1535   @return: boolean indicating whether a LD_LV dev_type was found or not
1536
1537   """
1538   if disk.children:
1539     for chdisk in disk.children:
1540       if _RecursiveCheckIfLVMBased(chdisk):
1541         return True
1542   return disk.dev_type == constants.LD_LV
1543
1544
1545 class LUSetClusterParams(LogicalUnit):
1546   """Change the parameters of the cluster.
1547
1548   """
1549   HPATH = "cluster-modify"
1550   HTYPE = constants.HTYPE_CLUSTER
1551   _OP_REQP = []
1552   REQ_BGL = False
1553
1554   def CheckArguments(self):
1555     """Check parameters
1556
1557     """
1558     if not hasattr(self.op, "candidate_pool_size"):
1559       self.op.candidate_pool_size = None
1560     if self.op.candidate_pool_size is not None:
1561       try:
1562         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1563       except (ValueError, TypeError), err:
1564         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1565                                    str(err))
1566       if self.op.candidate_pool_size < 1:
1567         raise errors.OpPrereqError("At least one master candidate needed")
1568
1569   def ExpandNames(self):
1570     # FIXME: in the future maybe other cluster params won't require checking on
1571     # all nodes to be modified.
1572     self.needed_locks = {
1573       locking.LEVEL_NODE: locking.ALL_SET,
1574     }
1575     self.share_locks[locking.LEVEL_NODE] = 1
1576
1577   def BuildHooksEnv(self):
1578     """Build hooks env.
1579
1580     """
1581     env = {
1582       "OP_TARGET": self.cfg.GetClusterName(),
1583       "NEW_VG_NAME": self.op.vg_name,
1584       }
1585     mn = self.cfg.GetMasterNode()
1586     return env, [mn], [mn]
1587
1588   def CheckPrereq(self):
1589     """Check prerequisites.
1590
1591     This checks whether the given params don't conflict and
1592     if the given volume group is valid.
1593
1594     """
1595     if self.op.vg_name is not None and not self.op.vg_name:
1596       instances = self.cfg.GetAllInstancesInfo().values()
1597       for inst in instances:
1598         for disk in inst.disks:
1599           if _RecursiveCheckIfLVMBased(disk):
1600             raise errors.OpPrereqError("Cannot disable lvm storage while"
1601                                        " lvm-based instances exist")
1602
1603     node_list = self.acquired_locks[locking.LEVEL_NODE]
1604
1605     # if vg_name not None, checks given volume group on all nodes
1606     if self.op.vg_name:
1607       vglist = self.rpc.call_vg_list(node_list)
1608       for node in node_list:
1609         msg = vglist[node].fail_msg
1610         if msg:
1611           # ignoring down node
1612           self.LogWarning("Error while gathering data on node %s"
1613                           " (ignoring node): %s", node, msg)
1614           continue
1615         vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1616                                               self.op.vg_name,
1617                                               constants.MIN_VG_SIZE)
1618         if vgstatus:
1619           raise errors.OpPrereqError("Error on node '%s': %s" %
1620                                      (node, vgstatus))
1621
1622     self.cluster = cluster = self.cfg.GetClusterInfo()
1623     # validate params changes
1624     if self.op.beparams:
1625       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1626       self.new_beparams = objects.FillDict(
1627         cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1628
1629     if self.op.nicparams:
1630       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1631       self.new_nicparams = objects.FillDict(
1632         cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1633       objects.NIC.CheckParameterSyntax(self.new_nicparams)
1634
1635     # hypervisor list/parameters
1636     self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1637     if self.op.hvparams:
1638       if not isinstance(self.op.hvparams, dict):
1639         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1640       for hv_name, hv_dict in self.op.hvparams.items():
1641         if hv_name not in self.new_hvparams:
1642           self.new_hvparams[hv_name] = hv_dict
1643         else:
1644           self.new_hvparams[hv_name].update(hv_dict)
1645
1646     if self.op.enabled_hypervisors is not None:
1647       self.hv_list = self.op.enabled_hypervisors
1648       if not self.hv_list:
1649         raise errors.OpPrereqError("Enabled hypervisors list must contain at"
1650                                    " least one member")
1651       invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
1652       if invalid_hvs:
1653         raise errors.OpPrereqError("Enabled hypervisors contains invalid"
1654                                    " entries: %s" % invalid_hvs)
1655     else:
1656       self.hv_list = cluster.enabled_hypervisors
1657
1658     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1659       # either the enabled list has changed, or the parameters have, validate
1660       for hv_name, hv_params in self.new_hvparams.items():
1661         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1662             (self.op.enabled_hypervisors and
1663              hv_name in self.op.enabled_hypervisors)):
1664           # either this is a new hypervisor, or its parameters have changed
1665           hv_class = hypervisor.GetHypervisor(hv_name)
1666           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1667           hv_class.CheckParameterSyntax(hv_params)
1668           _CheckHVParams(self, node_list, hv_name, hv_params)
1669
1670   def Exec(self, feedback_fn):
1671     """Change the parameters of the cluster.
1672
1673     """
1674     if self.op.vg_name is not None:
1675       new_volume = self.op.vg_name
1676       if not new_volume:
1677         new_volume = None
1678       if new_volume != self.cfg.GetVGName():
1679         self.cfg.SetVGName(new_volume)
1680       else:
1681         feedback_fn("Cluster LVM configuration already in desired"
1682                     " state, not changing")
1683     if self.op.hvparams:
1684       self.cluster.hvparams = self.new_hvparams
1685     if self.op.enabled_hypervisors is not None:
1686       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1687     if self.op.beparams:
1688       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1689     if self.op.nicparams:
1690       self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1691
1692     if self.op.candidate_pool_size is not None:
1693       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1694       # we need to update the pool size here, otherwise the save will fail
1695       _AdjustCandidatePool(self)
1696
1697     self.cfg.Update(self.cluster)
1698
1699
1700 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1701   """Distribute additional files which are part of the cluster configuration.
1702
1703   ConfigWriter takes care of distributing the config and ssconf files, but
1704   there are more files which should be distributed to all nodes. This function
1705   makes sure those are copied.
1706
1707   @param lu: calling logical unit
1708   @param additional_nodes: list of nodes not in the config to distribute to
1709
1710   """
1711   # 1. Gather target nodes
1712   myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1713   dist_nodes = lu.cfg.GetNodeList()
1714   if additional_nodes is not None:
1715     dist_nodes.extend(additional_nodes)
1716   if myself.name in dist_nodes:
1717     dist_nodes.remove(myself.name)
1718   # 2. Gather files to distribute
1719   dist_files = set([constants.ETC_HOSTS,
1720                     constants.SSH_KNOWN_HOSTS_FILE,
1721                     constants.RAPI_CERT_FILE,
1722                     constants.RAPI_USERS_FILE,
1723                     constants.HMAC_CLUSTER_KEY,
1724                    ])
1725
1726   enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1727   for hv_name in enabled_hypervisors:
1728     hv_class = hypervisor.GetHypervisor(hv_name)
1729     dist_files.update(hv_class.GetAncillaryFiles())
1730
1731   # 3. Perform the files upload
1732   for fname in dist_files:
1733     if os.path.exists(fname):
1734       result = lu.rpc.call_upload_file(dist_nodes, fname)
1735       for to_node, to_result in result.items():
1736         msg = to_result.fail_msg
1737         if msg:
1738           msg = ("Copy of file %s to node %s failed: %s" %
1739                  (fname, to_node, msg))
1740           lu.proc.LogWarning(msg)
1741
1742
1743 class LURedistributeConfig(NoHooksLU):
1744   """Force the redistribution of cluster configuration.
1745
1746   This is a very simple LU.
1747
1748   """
1749   _OP_REQP = []
1750   REQ_BGL = False
1751
1752   def ExpandNames(self):
1753     self.needed_locks = {
1754       locking.LEVEL_NODE: locking.ALL_SET,
1755     }
1756     self.share_locks[locking.LEVEL_NODE] = 1
1757
1758   def CheckPrereq(self):
1759     """Check prerequisites.
1760
1761     """
1762
1763   def Exec(self, feedback_fn):
1764     """Redistribute the configuration.
1765
1766     """
1767     self.cfg.Update(self.cfg.GetClusterInfo())
1768     _RedistributeAncillaryFiles(self)
1769
1770
1771 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1772   """Sleep and poll for an instance's disk to sync.
1773
1774   """
1775   if not instance.disks:
1776     return True
1777
1778   if not oneshot:
1779     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1780
1781   node = instance.primary_node
1782
1783   for dev in instance.disks:
1784     lu.cfg.SetDiskID(dev, node)
1785
1786   retries = 0
1787   degr_retries = 10 # in seconds, as we sleep 1 second each time
1788   while True:
1789     max_time = 0
1790     done = True
1791     cumul_degraded = False
1792     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1793     msg = rstats.fail_msg
1794     if msg:
1795       lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1796       retries += 1
1797       if retries >= 10:
1798         raise errors.RemoteError("Can't contact node %s for mirror data,"
1799                                  " aborting." % node)
1800       time.sleep(6)
1801       continue
1802     rstats = rstats.payload
1803     retries = 0
1804     for i, mstat in enumerate(rstats):
1805       if mstat is None:
1806         lu.LogWarning("Can't compute data for node %s/%s",
1807                            node, instance.disks[i].iv_name)
1808         continue
1809       # we ignore the ldisk parameter
1810       perc_done, est_time, is_degraded, _ = mstat
1811       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1812       if perc_done is not None:
1813         done = False
1814         if est_time is not None:
1815           rem_time = "%d estimated seconds remaining" % est_time
1816           max_time = est_time
1817         else:
1818           rem_time = "no time estimate"
1819         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1820                         (instance.disks[i].iv_name, perc_done, rem_time))
1821
1822     # if we're done but degraded, let's do a few small retries, to
1823     # make sure we see a stable and not transient situation; therefore
1824     # we force restart of the loop
1825     if (done or oneshot) and cumul_degraded and degr_retries > 0:
1826       logging.info("Degraded disks found, %d retries left", degr_retries)
1827       degr_retries -= 1
1828       time.sleep(1)
1829       continue
1830
1831     if done or oneshot:
1832       break
1833
1834     time.sleep(min(60, max_time))
1835
1836   if done:
1837     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1838   return not cumul_degraded
1839
1840
1841 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1842   """Check that mirrors are not degraded.
1843
1844   The ldisk parameter, if True, will change the test from the
1845   is_degraded attribute (which represents overall non-ok status for
1846   the device(s)) to the ldisk (representing the local storage status).
1847
1848   """
1849   lu.cfg.SetDiskID(dev, node)
1850   if ldisk:
1851     idx = 6
1852   else:
1853     idx = 5
1854
1855   result = True
1856   if on_primary or dev.AssembleOnSecondary():
1857     rstats = lu.rpc.call_blockdev_find(node, dev)
1858     msg = rstats.fail_msg
1859     if msg:
1860       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1861       result = False
1862     elif not rstats.payload:
1863       lu.LogWarning("Can't find disk on node %s", node)
1864       result = False
1865     else:
1866       result = result and (not rstats.payload[idx])
1867   if dev.children:
1868     for child in dev.children:
1869       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1870
1871   return result
1872
1873
1874 class LUDiagnoseOS(NoHooksLU):
1875   """Logical unit for OS diagnose/query.
1876
1877   """
1878   _OP_REQP = ["output_fields", "names"]
1879   REQ_BGL = False
1880   _FIELDS_STATIC = utils.FieldSet()
1881   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1882
1883   def ExpandNames(self):
1884     if self.op.names:
1885       raise errors.OpPrereqError("Selective OS query not supported")
1886
1887     _CheckOutputFields(static=self._FIELDS_STATIC,
1888                        dynamic=self._FIELDS_DYNAMIC,
1889                        selected=self.op.output_fields)
1890
1891     # Lock all nodes, in shared mode
1892     # Temporary removal of locks, should be reverted later
1893     # TODO: reintroduce locks when they are lighter-weight
1894     self.needed_locks = {}
1895     #self.share_locks[locking.LEVEL_NODE] = 1
1896     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1897
1898   def CheckPrereq(self):
1899     """Check prerequisites.
1900
1901     """
1902
1903   @staticmethod
1904   def _DiagnoseByOS(node_list, rlist):
1905     """Remaps a per-node return list into an a per-os per-node dictionary
1906
1907     @param node_list: a list with the names of all nodes
1908     @param rlist: a map with node names as keys and OS objects as values
1909
1910     @rtype: dict
1911     @return: a dictionary with osnames as keys and as value another map, with
1912         nodes as keys and tuples of (path, status, diagnose) as values, eg::
1913
1914           {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
1915                                      (/srv/..., False, "invalid api")],
1916                            "node2": [(/srv/..., True, "")]}
1917           }
1918
1919     """
1920     all_os = {}
1921     # we build here the list of nodes that didn't fail the RPC (at RPC
1922     # level), so that nodes with a non-responding node daemon don't
1923     # make all OSes invalid
1924     good_nodes = [node_name for node_name in rlist
1925                   if not rlist[node_name].fail_msg]
1926     for node_name, nr in rlist.items():
1927       if nr.fail_msg or not nr.payload:
1928         continue
1929       for name, path, status, diagnose in nr.payload:
1930         if name not in all_os:
1931           # build a list of nodes for this os containing empty lists
1932           # for each node in node_list
1933           all_os[name] = {}
1934           for nname in good_nodes:
1935             all_os[name][nname] = []
1936         all_os[name][node_name].append((path, status, diagnose))
1937     return all_os
1938
1939   def Exec(self, feedback_fn):
1940     """Compute the list of OSes.
1941
1942     """
1943     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1944     node_data = self.rpc.call_os_diagnose(valid_nodes)
1945     pol = self._DiagnoseByOS(valid_nodes, node_data)
1946     output = []
1947     for os_name, os_data in pol.items():
1948       row = []
1949       for field in self.op.output_fields:
1950         if field == "name":
1951           val = os_name
1952         elif field == "valid":
1953           val = utils.all([osl and osl[0][1] for osl in os_data.values()])
1954         elif field == "node_status":
1955           # this is just a copy of the dict
1956           val = {}
1957           for node_name, nos_list in os_data.items():
1958             val[node_name] = nos_list
1959         else:
1960           raise errors.ParameterError(field)
1961         row.append(val)
1962       output.append(row)
1963
1964     return output
1965
1966
1967 class LURemoveNode(LogicalUnit):
1968   """Logical unit for removing a node.
1969
1970   """
1971   HPATH = "node-remove"
1972   HTYPE = constants.HTYPE_NODE
1973   _OP_REQP = ["node_name"]
1974
1975   def BuildHooksEnv(self):
1976     """Build hooks env.
1977
1978     This doesn't run on the target node in the pre phase as a failed
1979     node would then be impossible to remove.
1980
1981     """
1982     env = {
1983       "OP_TARGET": self.op.node_name,
1984       "NODE_NAME": self.op.node_name,
1985       }
1986     all_nodes = self.cfg.GetNodeList()
1987     all_nodes.remove(self.op.node_name)
1988     return env, all_nodes, all_nodes
1989
1990   def CheckPrereq(self):
1991     """Check prerequisites.
1992
1993     This checks:
1994      - the node exists in the configuration
1995      - it does not have primary or secondary instances
1996      - it's not the master
1997
1998     Any errors are signaled by raising errors.OpPrereqError.
1999
2000     """
2001     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
2002     if node is None:
2003       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
2004
2005     instance_list = self.cfg.GetInstanceList()
2006
2007     masternode = self.cfg.GetMasterNode()
2008     if node.name == masternode:
2009       raise errors.OpPrereqError("Node is the master node,"
2010                                  " you need to failover first.")
2011
2012     for instance_name in instance_list:
2013       instance = self.cfg.GetInstanceInfo(instance_name)
2014       if node.name in instance.all_nodes:
2015         raise errors.OpPrereqError("Instance %s is still running on the node,"
2016                                    " please remove first." % instance_name)
2017     self.op.node_name = node.name
2018     self.node = node
2019
2020   def Exec(self, feedback_fn):
2021     """Removes the node from the cluster.
2022
2023     """
2024     node = self.node
2025     logging.info("Stopping the node daemon and removing configs from node %s",
2026                  node.name)
2027
2028     self.context.RemoveNode(node.name)
2029
2030     result = self.rpc.call_node_leave_cluster(node.name)
2031     msg = result.fail_msg
2032     if msg:
2033       self.LogWarning("Errors encountered on the remote node while leaving"
2034                       " the cluster: %s", msg)
2035
2036     # Promote nodes to master candidate as needed
2037     _AdjustCandidatePool(self)
2038
2039
2040 class LUQueryNodes(NoHooksLU):
2041   """Logical unit for querying nodes.
2042
2043   """
2044   _OP_REQP = ["output_fields", "names", "use_locking"]
2045   REQ_BGL = False
2046   _FIELDS_DYNAMIC = utils.FieldSet(
2047     "dtotal", "dfree",
2048     "mtotal", "mnode", "mfree",
2049     "bootid",
2050     "ctotal", "cnodes", "csockets",
2051     )
2052
2053   _FIELDS_STATIC = utils.FieldSet(
2054     "name", "pinst_cnt", "sinst_cnt",
2055     "pinst_list", "sinst_list",
2056     "pip", "sip", "tags",
2057     "serial_no",
2058     "master_candidate",
2059     "master",
2060     "offline",
2061     "drained",
2062     "role",
2063     )
2064
2065   def ExpandNames(self):
2066     _CheckOutputFields(static=self._FIELDS_STATIC,
2067                        dynamic=self._FIELDS_DYNAMIC,
2068                        selected=self.op.output_fields)
2069
2070     self.needed_locks = {}
2071     self.share_locks[locking.LEVEL_NODE] = 1
2072
2073     if self.op.names:
2074       self.wanted = _GetWantedNodes(self, self.op.names)
2075     else:
2076       self.wanted = locking.ALL_SET
2077
2078     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2079     self.do_locking = self.do_node_query and self.op.use_locking
2080     if self.do_locking:
2081       # if we don't request only static fields, we need to lock the nodes
2082       self.needed_locks[locking.LEVEL_NODE] = self.wanted
2083
2084
2085   def CheckPrereq(self):
2086     """Check prerequisites.
2087
2088     """
2089     # The validation of the node list is done in the _GetWantedNodes,
2090     # if non empty, and if empty, there's no validation to do
2091     pass
2092
2093   def Exec(self, feedback_fn):
2094     """Computes the list of nodes and their attributes.
2095
2096     """
2097     all_info = self.cfg.GetAllNodesInfo()
2098     if self.do_locking:
2099       nodenames = self.acquired_locks[locking.LEVEL_NODE]
2100     elif self.wanted != locking.ALL_SET:
2101       nodenames = self.wanted
2102       missing = set(nodenames).difference(all_info.keys())
2103       if missing:
2104         raise errors.OpExecError(
2105           "Some nodes were removed before retrieving their data: %s" % missing)
2106     else:
2107       nodenames = all_info.keys()
2108
2109     nodenames = utils.NiceSort(nodenames)
2110     nodelist = [all_info[name] for name in nodenames]
2111
2112     # begin data gathering
2113
2114     if self.do_node_query:
2115       live_data = {}
2116       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2117                                           self.cfg.GetHypervisorType())
2118       for name in nodenames:
2119         nodeinfo = node_data[name]
2120         if not nodeinfo.fail_msg and nodeinfo.payload:
2121           nodeinfo = nodeinfo.payload
2122           fn = utils.TryConvert
2123           live_data[name] = {
2124             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2125             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2126             "mfree": fn(int, nodeinfo.get('memory_free', None)),
2127             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2128             "dfree": fn(int, nodeinfo.get('vg_free', None)),
2129             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2130             "bootid": nodeinfo.get('bootid', None),
2131             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2132             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2133             }
2134         else:
2135           live_data[name] = {}
2136     else:
2137       live_data = dict.fromkeys(nodenames, {})
2138
2139     node_to_primary = dict([(name, set()) for name in nodenames])
2140     node_to_secondary = dict([(name, set()) for name in nodenames])
2141
2142     inst_fields = frozenset(("pinst_cnt", "pinst_list",
2143                              "sinst_cnt", "sinst_list"))
2144     if inst_fields & frozenset(self.op.output_fields):
2145       instancelist = self.cfg.GetInstanceList()
2146
2147       for instance_name in instancelist:
2148         inst = self.cfg.GetInstanceInfo(instance_name)
2149         if inst.primary_node in node_to_primary:
2150           node_to_primary[inst.primary_node].add(inst.name)
2151         for secnode in inst.secondary_nodes:
2152           if secnode in node_to_secondary:
2153             node_to_secondary[secnode].add(inst.name)
2154
2155     master_node = self.cfg.GetMasterNode()
2156
2157     # end data gathering
2158
2159     output = []
2160     for node in nodelist:
2161       node_output = []
2162       for field in self.op.output_fields:
2163         if field == "name":
2164           val = node.name
2165         elif field == "pinst_list":
2166           val = list(node_to_primary[node.name])
2167         elif field == "sinst_list":
2168           val = list(node_to_secondary[node.name])
2169         elif field == "pinst_cnt":
2170           val = len(node_to_primary[node.name])
2171         elif field == "sinst_cnt":
2172           val = len(node_to_secondary[node.name])
2173         elif field == "pip":
2174           val = node.primary_ip
2175         elif field == "sip":
2176           val = node.secondary_ip
2177         elif field == "tags":
2178           val = list(node.GetTags())
2179         elif field == "serial_no":
2180           val = node.serial_no
2181         elif field == "master_candidate":
2182           val = node.master_candidate
2183         elif field == "master":
2184           val = node.name == master_node
2185         elif field == "offline":
2186           val = node.offline
2187         elif field == "drained":
2188           val = node.drained
2189         elif self._FIELDS_DYNAMIC.Matches(field):
2190           val = live_data[node.name].get(field, None)
2191         elif field == "role":
2192           if node.name == master_node:
2193             val = "M"
2194           elif node.master_candidate:
2195             val = "C"
2196           elif node.drained:
2197             val = "D"
2198           elif node.offline:
2199             val = "O"
2200           else:
2201             val = "R"
2202         else:
2203           raise errors.ParameterError(field)
2204         node_output.append(val)
2205       output.append(node_output)
2206
2207     return output
2208
2209
2210 class LUQueryNodeVolumes(NoHooksLU):
2211   """Logical unit for getting volumes on node(s).
2212
2213   """
2214   _OP_REQP = ["nodes", "output_fields"]
2215   REQ_BGL = False
2216   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2217   _FIELDS_STATIC = utils.FieldSet("node")
2218
2219   def ExpandNames(self):
2220     _CheckOutputFields(static=self._FIELDS_STATIC,
2221                        dynamic=self._FIELDS_DYNAMIC,
2222                        selected=self.op.output_fields)
2223
2224     self.needed_locks = {}
2225     self.share_locks[locking.LEVEL_NODE] = 1
2226     if not self.op.nodes:
2227       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2228     else:
2229       self.needed_locks[locking.LEVEL_NODE] = \
2230         _GetWantedNodes(self, self.op.nodes)
2231
2232   def CheckPrereq(self):
2233     """Check prerequisites.
2234
2235     This checks that the fields required are valid output fields.
2236
2237     """
2238     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2239
2240   def Exec(self, feedback_fn):
2241     """Computes the list of nodes and their attributes.
2242
2243     """
2244     nodenames = self.nodes
2245     volumes = self.rpc.call_node_volumes(nodenames)
2246
2247     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2248              in self.cfg.GetInstanceList()]
2249
2250     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2251
2252     output = []
2253     for node in nodenames:
2254       nresult = volumes[node]
2255       if nresult.offline:
2256         continue
2257       msg = nresult.fail_msg
2258       if msg:
2259         self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2260         continue
2261
2262       node_vols = nresult.payload[:]
2263       node_vols.sort(key=lambda vol: vol['dev'])
2264
2265       for vol in node_vols:
2266         node_output = []
2267         for field in self.op.output_fields:
2268           if field == "node":
2269             val = node
2270           elif field == "phys":
2271             val = vol['dev']
2272           elif field == "vg":
2273             val = vol['vg']
2274           elif field == "name":
2275             val = vol['name']
2276           elif field == "size":
2277             val = int(float(vol['size']))
2278           elif field == "instance":
2279             for inst in ilist:
2280               if node not in lv_by_node[inst]:
2281                 continue
2282               if vol['name'] in lv_by_node[inst][node]:
2283                 val = inst.name
2284                 break
2285             else:
2286               val = '-'
2287           else:
2288             raise errors.ParameterError(field)
2289           node_output.append(str(val))
2290
2291         output.append(node_output)
2292
2293     return output
2294
2295
2296 class LUAddNode(LogicalUnit):
2297   """Logical unit for adding node to the cluster.
2298
2299   """
2300   HPATH = "node-add"
2301   HTYPE = constants.HTYPE_NODE
2302   _OP_REQP = ["node_name"]
2303
2304   def BuildHooksEnv(self):
2305     """Build hooks env.
2306
2307     This will run on all nodes before, and on all nodes + the new node after.
2308
2309     """
2310     env = {
2311       "OP_TARGET": self.op.node_name,
2312       "NODE_NAME": self.op.node_name,
2313       "NODE_PIP": self.op.primary_ip,
2314       "NODE_SIP": self.op.secondary_ip,
2315       }
2316     nodes_0 = self.cfg.GetNodeList()
2317     nodes_1 = nodes_0 + [self.op.node_name, ]
2318     return env, nodes_0, nodes_1
2319
2320   def CheckPrereq(self):
2321     """Check prerequisites.
2322
2323     This checks:
2324      - the new node is not already in the config
2325      - it is resolvable
2326      - its parameters (single/dual homed) matches the cluster
2327
2328     Any errors are signaled by raising errors.OpPrereqError.
2329
2330     """
2331     node_name = self.op.node_name
2332     cfg = self.cfg
2333
2334     dns_data = utils.HostInfo(node_name)
2335
2336     node = dns_data.name
2337     primary_ip = self.op.primary_ip = dns_data.ip
2338     secondary_ip = getattr(self.op, "secondary_ip", None)
2339     if secondary_ip is None:
2340       secondary_ip = primary_ip
2341     if not utils.IsValidIP(secondary_ip):
2342       raise errors.OpPrereqError("Invalid secondary IP given")
2343     self.op.secondary_ip = secondary_ip
2344
2345     node_list = cfg.GetNodeList()
2346     if not self.op.readd and node in node_list:
2347       raise errors.OpPrereqError("Node %s is already in the configuration" %
2348                                  node)
2349     elif self.op.readd and node not in node_list:
2350       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2351
2352     for existing_node_name in node_list:
2353       existing_node = cfg.GetNodeInfo(existing_node_name)
2354
2355       if self.op.readd and node == existing_node_name:
2356         if (existing_node.primary_ip != primary_ip or
2357             existing_node.secondary_ip != secondary_ip):
2358           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2359                                      " address configuration as before")
2360         continue
2361
2362       if (existing_node.primary_ip == primary_ip or
2363           existing_node.secondary_ip == primary_ip or
2364           existing_node.primary_ip == secondary_ip or
2365           existing_node.secondary_ip == secondary_ip):
2366         raise errors.OpPrereqError("New node ip address(es) conflict with"
2367                                    " existing node %s" % existing_node.name)
2368
2369     # check that the type of the node (single versus dual homed) is the
2370     # same as for the master
2371     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2372     master_singlehomed = myself.secondary_ip == myself.primary_ip
2373     newbie_singlehomed = secondary_ip == primary_ip
2374     if master_singlehomed != newbie_singlehomed:
2375       if master_singlehomed:
2376         raise errors.OpPrereqError("The master has no private ip but the"
2377                                    " new node has one")
2378       else:
2379         raise errors.OpPrereqError("The master has a private ip but the"
2380                                    " new node doesn't have one")
2381
2382     # checks reachability
2383     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2384       raise errors.OpPrereqError("Node not reachable by ping")
2385
2386     if not newbie_singlehomed:
2387       # check reachability from my secondary ip to newbie's secondary ip
2388       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2389                            source=myself.secondary_ip):
2390         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2391                                    " based ping to noded port")
2392
2393     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2394     if self.op.readd:
2395       exceptions = [node]
2396     else:
2397       exceptions = []
2398     mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
2399     # the new node will increase mc_max with one, so:
2400     mc_max = min(mc_max + 1, cp_size)
2401     self.master_candidate = mc_now < mc_max
2402
2403     if self.op.readd:
2404       self.new_node = self.cfg.GetNodeInfo(node)
2405       assert self.new_node is not None, "Can't retrieve locked node %s" % node
2406     else:
2407       self.new_node = objects.Node(name=node,
2408                                    primary_ip=primary_ip,
2409                                    secondary_ip=secondary_ip,
2410                                    master_candidate=self.master_candidate,
2411                                    offline=False, drained=False)
2412
2413   def Exec(self, feedback_fn):
2414     """Adds the new node to the cluster.
2415
2416     """
2417     new_node = self.new_node
2418     node = new_node.name
2419
2420     # for re-adds, reset the offline/drained/master-candidate flags;
2421     # we need to reset here, otherwise offline would prevent RPC calls
2422     # later in the procedure; this also means that if the re-add
2423     # fails, we are left with a non-offlined, broken node
2424     if self.op.readd:
2425       new_node.drained = new_node.offline = False
2426       self.LogInfo("Readding a node, the offline/drained flags were reset")
2427       # if we demote the node, we do cleanup later in the procedure
2428       new_node.master_candidate = self.master_candidate
2429
2430     # notify the user about any possible mc promotion
2431     if new_node.master_candidate:
2432       self.LogInfo("Node will be a master candidate")
2433
2434     # check connectivity
2435     result = self.rpc.call_version([node])[node]
2436     result.Raise("Can't get version information from node %s" % node)
2437     if constants.PROTOCOL_VERSION == result.payload:
2438       logging.info("Communication to node %s fine, sw version %s match",
2439                    node, result.payload)
2440     else:
2441       raise errors.OpExecError("Version mismatch master version %s,"
2442                                " node version %s" %
2443                                (constants.PROTOCOL_VERSION, result.payload))
2444
2445     # setup ssh on node
2446     logging.info("Copy ssh key to node %s", node)
2447     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2448     keyarray = []
2449     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2450                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2451                 priv_key, pub_key]
2452
2453     for i in keyfiles:
2454       f = open(i, 'r')
2455       try:
2456         keyarray.append(f.read())
2457       finally:
2458         f.close()
2459
2460     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2461                                     keyarray[2],
2462                                     keyarray[3], keyarray[4], keyarray[5])
2463     result.Raise("Cannot transfer ssh keys to the new node")
2464
2465     # Add node to our /etc/hosts, and add key to known_hosts
2466     if self.cfg.GetClusterInfo().modify_etc_hosts:
2467       utils.AddHostToEtcHosts(new_node.name)
2468
2469     if new_node.secondary_ip != new_node.primary_ip:
2470       result = self.rpc.call_node_has_ip_address(new_node.name,
2471                                                  new_node.secondary_ip)
2472       result.Raise("Failure checking secondary ip on node %s" % new_node.name,
2473                    prereq=True)
2474       if not result.payload:
2475         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2476                                  " you gave (%s). Please fix and re-run this"
2477                                  " command." % new_node.secondary_ip)
2478
2479     node_verify_list = [self.cfg.GetMasterNode()]
2480     node_verify_param = {
2481       'nodelist': [node],
2482       # TODO: do a node-net-test as well?
2483     }
2484
2485     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2486                                        self.cfg.GetClusterName())
2487     for verifier in node_verify_list:
2488       result[verifier].Raise("Cannot communicate with node %s" % verifier)
2489       nl_payload = result[verifier].payload['nodelist']
2490       if nl_payload:
2491         for failed in nl_payload:
2492           feedback_fn("ssh/hostname verification failed %s -> %s" %
2493                       (verifier, nl_payload[failed]))
2494         raise errors.OpExecError("ssh/hostname verification failed.")
2495
2496     if self.op.readd:
2497       _RedistributeAncillaryFiles(self)
2498       self.context.ReaddNode(new_node)
2499       # make sure we redistribute the config
2500       self.cfg.Update(new_node)
2501       # and make sure the new node will not have old files around
2502       if not new_node.master_candidate:
2503         result = self.rpc.call_node_demote_from_mc(new_node.name)
2504         msg = result.RemoteFailMsg()
2505         if msg:
2506           self.LogWarning("Node failed to demote itself from master"
2507                           " candidate status: %s" % msg)
2508     else:
2509       _RedistributeAncillaryFiles(self, additional_nodes=[node])
2510       self.context.AddNode(new_node)
2511
2512
2513 class LUSetNodeParams(LogicalUnit):
2514   """Modifies the parameters of a node.
2515
2516   """
2517   HPATH = "node-modify"
2518   HTYPE = constants.HTYPE_NODE
2519   _OP_REQP = ["node_name"]
2520   REQ_BGL = False
2521
2522   def CheckArguments(self):
2523     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2524     if node_name is None:
2525       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2526     self.op.node_name = node_name
2527     _CheckBooleanOpField(self.op, 'master_candidate')
2528     _CheckBooleanOpField(self.op, 'offline')
2529     _CheckBooleanOpField(self.op, 'drained')
2530     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2531     if all_mods.count(None) == 3:
2532       raise errors.OpPrereqError("Please pass at least one modification")
2533     if all_mods.count(True) > 1:
2534       raise errors.OpPrereqError("Can't set the node into more than one"
2535                                  " state at the same time")
2536
2537   def ExpandNames(self):
2538     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2539
2540   def BuildHooksEnv(self):
2541     """Build hooks env.
2542
2543     This runs on the master node.
2544
2545     """
2546     env = {
2547       "OP_TARGET": self.op.node_name,
2548       "MASTER_CANDIDATE": str(self.op.master_candidate),
2549       "OFFLINE": str(self.op.offline),
2550       "DRAINED": str(self.op.drained),
2551       }
2552     nl = [self.cfg.GetMasterNode(),
2553           self.op.node_name]
2554     return env, nl, nl
2555
2556   def CheckPrereq(self):
2557     """Check prerequisites.
2558
2559     This only checks the instance list against the existing names.
2560
2561     """
2562     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2563
2564     if ((self.op.master_candidate == False or self.op.offline == True or
2565          self.op.drained == True) and node.master_candidate):
2566       # we will demote the node from master_candidate
2567       if self.op.node_name == self.cfg.GetMasterNode():
2568         raise errors.OpPrereqError("The master node has to be a"
2569                                    " master candidate, online and not drained")
2570       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2571       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2572       if num_candidates <= cp_size:
2573         msg = ("Not enough master candidates (desired"
2574                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2575         if self.op.force:
2576           self.LogWarning(msg)
2577         else:
2578           raise errors.OpPrereqError(msg)
2579
2580     if (self.op.master_candidate == True and
2581         ((node.offline and not self.op.offline == False) or
2582          (node.drained and not self.op.drained == False))):
2583       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2584                                  " to master_candidate" % node.name)
2585
2586     return
2587
2588   def Exec(self, feedback_fn):
2589     """Modifies a node.
2590
2591     """
2592     node = self.node
2593
2594     result = []
2595     changed_mc = False
2596
2597     if self.op.offline is not None:
2598       node.offline = self.op.offline
2599       result.append(("offline", str(self.op.offline)))
2600       if self.op.offline == True:
2601         if node.master_candidate:
2602           node.master_candidate = False
2603           changed_mc = True
2604           result.append(("master_candidate", "auto-demotion due to offline"))
2605         if node.drained:
2606           node.drained = False
2607           result.append(("drained", "clear drained status due to offline"))
2608
2609     if self.op.master_candidate is not None:
2610       node.master_candidate = self.op.master_candidate
2611       changed_mc = True
2612       result.append(("master_candidate", str(self.op.master_candidate)))
2613       if self.op.master_candidate == False:
2614         rrc = self.rpc.call_node_demote_from_mc(node.name)
2615         msg = rrc.fail_msg
2616         if msg:
2617           self.LogWarning("Node failed to demote itself: %s" % msg)
2618
2619     if self.op.drained is not None:
2620       node.drained = self.op.drained
2621       result.append(("drained", str(self.op.drained)))
2622       if self.op.drained == True:
2623         if node.master_candidate:
2624           node.master_candidate = False
2625           changed_mc = True
2626           result.append(("master_candidate", "auto-demotion due to drain"))
2627           rrc = self.rpc.call_node_demote_from_mc(node.name)
2628           msg = rrc.RemoteFailMsg()
2629           if msg:
2630             self.LogWarning("Node failed to demote itself: %s" % msg)
2631         if node.offline:
2632           node.offline = False
2633           result.append(("offline", "clear offline status due to drain"))
2634
2635     # this will trigger configuration file update, if needed
2636     self.cfg.Update(node)
2637     # this will trigger job queue propagation or cleanup
2638     if changed_mc:
2639       self.context.ReaddNode(node)
2640
2641     return result
2642
2643
2644 class LUPowercycleNode(NoHooksLU):
2645   """Powercycles a node.
2646
2647   """
2648   _OP_REQP = ["node_name", "force"]
2649   REQ_BGL = False
2650
2651   def CheckArguments(self):
2652     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2653     if node_name is None:
2654       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2655     self.op.node_name = node_name
2656     if node_name == self.cfg.GetMasterNode() and not self.op.force:
2657       raise errors.OpPrereqError("The node is the master and the force"
2658                                  " parameter was not set")
2659
2660   def ExpandNames(self):
2661     """Locking for PowercycleNode.
2662
2663     This is a last-resource option and shouldn't block on other
2664     jobs. Therefore, we grab no locks.
2665
2666     """
2667     self.needed_locks = {}
2668
2669   def CheckPrereq(self):
2670     """Check prerequisites.
2671
2672     This LU has no prereqs.
2673
2674     """
2675     pass
2676
2677   def Exec(self, feedback_fn):
2678     """Reboots a node.
2679
2680     """
2681     result = self.rpc.call_node_powercycle(self.op.node_name,
2682                                            self.cfg.GetHypervisorType())
2683     result.Raise("Failed to schedule the reboot")
2684     return result.payload
2685
2686
2687 class LUQueryClusterInfo(NoHooksLU):
2688   """Query cluster configuration.
2689
2690   """
2691   _OP_REQP = []
2692   REQ_BGL = False
2693
2694   def ExpandNames(self):
2695     self.needed_locks = {}
2696
2697   def CheckPrereq(self):
2698     """No prerequsites needed for this LU.
2699
2700     """
2701     pass
2702
2703   def Exec(self, feedback_fn):
2704     """Return cluster config.
2705
2706     """
2707     cluster = self.cfg.GetClusterInfo()
2708     result = {
2709       "software_version": constants.RELEASE_VERSION,
2710       "protocol_version": constants.PROTOCOL_VERSION,
2711       "config_version": constants.CONFIG_VERSION,
2712       "os_api_version": max(constants.OS_API_VERSIONS),
2713       "export_version": constants.EXPORT_VERSION,
2714       "architecture": (platform.architecture()[0], platform.machine()),
2715       "name": cluster.cluster_name,
2716       "master": cluster.master_node,
2717       "default_hypervisor": cluster.enabled_hypervisors[0],
2718       "enabled_hypervisors": cluster.enabled_hypervisors,
2719       "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
2720                         for hypervisor_name in cluster.enabled_hypervisors]),
2721       "beparams": cluster.beparams,
2722       "nicparams": cluster.nicparams,
2723       "candidate_pool_size": cluster.candidate_pool_size,
2724       "master_netdev": cluster.master_netdev,
2725       "volume_group_name": cluster.volume_group_name,
2726       "file_storage_dir": cluster.file_storage_dir,
2727       }
2728
2729     return result
2730
2731
2732 class LUQueryConfigValues(NoHooksLU):
2733   """Return configuration values.
2734
2735   """
2736   _OP_REQP = []
2737   REQ_BGL = False
2738   _FIELDS_DYNAMIC = utils.FieldSet()
2739   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2740
2741   def ExpandNames(self):
2742     self.needed_locks = {}
2743
2744     _CheckOutputFields(static=self._FIELDS_STATIC,
2745                        dynamic=self._FIELDS_DYNAMIC,
2746                        selected=self.op.output_fields)
2747
2748   def CheckPrereq(self):
2749     """No prerequisites.
2750
2751     """
2752     pass
2753
2754   def Exec(self, feedback_fn):
2755     """Dump a representation of the cluster config to the standard output.
2756
2757     """
2758     values = []
2759     for field in self.op.output_fields:
2760       if field == "cluster_name":
2761         entry = self.cfg.GetClusterName()
2762       elif field == "master_node":
2763         entry = self.cfg.GetMasterNode()
2764       elif field == "drain_flag":
2765         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2766       else:
2767         raise errors.ParameterError(field)
2768       values.append(entry)
2769     return values
2770
2771
2772 class LUActivateInstanceDisks(NoHooksLU):
2773   """Bring up an instance's disks.
2774
2775   """
2776   _OP_REQP = ["instance_name"]
2777   REQ_BGL = False
2778
2779   def ExpandNames(self):
2780     self._ExpandAndLockInstance()
2781     self.needed_locks[locking.LEVEL_NODE] = []
2782     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2783
2784   def DeclareLocks(self, level):
2785     if level == locking.LEVEL_NODE:
2786       self._LockInstancesNodes()
2787
2788   def CheckPrereq(self):
2789     """Check prerequisites.
2790
2791     This checks that the instance is in the cluster.
2792
2793     """
2794     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2795     assert self.instance is not None, \
2796       "Cannot retrieve locked instance %s" % self.op.instance_name
2797     _CheckNodeOnline(self, self.instance.primary_node)
2798
2799   def Exec(self, feedback_fn):
2800     """Activate the disks.
2801
2802     """
2803     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2804     if not disks_ok:
2805       raise errors.OpExecError("Cannot activate block devices")
2806
2807     return disks_info
2808
2809
2810 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2811   """Prepare the block devices for an instance.
2812
2813   This sets up the block devices on all nodes.
2814
2815   @type lu: L{LogicalUnit}
2816   @param lu: the logical unit on whose behalf we execute
2817   @type instance: L{objects.Instance}
2818   @param instance: the instance for whose disks we assemble
2819   @type ignore_secondaries: boolean
2820   @param ignore_secondaries: if true, errors on secondary nodes
2821       won't result in an error return from the function
2822   @return: False if the operation failed, otherwise a list of
2823       (host, instance_visible_name, node_visible_name)
2824       with the mapping from node devices to instance devices
2825
2826   """
2827   device_info = []
2828   disks_ok = True
2829   iname = instance.name
2830   # With the two passes mechanism we try to reduce the window of
2831   # opportunity for the race condition of switching DRBD to primary
2832   # before handshaking occured, but we do not eliminate it
2833
2834   # The proper fix would be to wait (with some limits) until the
2835   # connection has been made and drbd transitions from WFConnection
2836   # into any other network-connected state (Connected, SyncTarget,
2837   # SyncSource, etc.)
2838
2839   # 1st pass, assemble on all nodes in secondary mode
2840   for inst_disk in instance.disks:
2841     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2842       lu.cfg.SetDiskID(node_disk, node)
2843       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2844       msg = result.fail_msg
2845       if msg:
2846         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2847                            " (is_primary=False, pass=1): %s",
2848                            inst_disk.iv_name, node, msg)
2849         if not ignore_secondaries:
2850           disks_ok = False
2851
2852   # FIXME: race condition on drbd migration to primary
2853
2854   # 2nd pass, do only the primary node
2855   for inst_disk in instance.disks:
2856     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2857       if node != instance.primary_node:
2858         continue
2859       lu.cfg.SetDiskID(node_disk, node)
2860       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2861       msg = result.fail_msg
2862       if msg:
2863         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2864                            " (is_primary=True, pass=2): %s",
2865                            inst_disk.iv_name, node, msg)
2866         disks_ok = False
2867     device_info.append((instance.primary_node, inst_disk.iv_name,
2868                         result.payload))
2869
2870   # leave the disks configured for the primary node
2871   # this is a workaround that would be fixed better by
2872   # improving the logical/physical id handling
2873   for disk in instance.disks:
2874     lu.cfg.SetDiskID(disk, instance.primary_node)
2875
2876   return disks_ok, device_info
2877
2878
2879 def _StartInstanceDisks(lu, instance, force):
2880   """Start the disks of an instance.
2881
2882   """
2883   disks_ok, _ = _AssembleInstanceDisks(lu, instance,
2884                                            ignore_secondaries=force)
2885   if not disks_ok:
2886     _ShutdownInstanceDisks(lu, instance)
2887     if force is not None and not force:
2888       lu.proc.LogWarning("", hint="If the message above refers to a"
2889                          " secondary node,"
2890                          " you can retry the operation using '--force'.")
2891     raise errors.OpExecError("Disk consistency error")
2892
2893
2894 class LUDeactivateInstanceDisks(NoHooksLU):
2895   """Shutdown an instance's disks.
2896
2897   """
2898   _OP_REQP = ["instance_name"]
2899   REQ_BGL = False
2900
2901   def ExpandNames(self):
2902     self._ExpandAndLockInstance()
2903     self.needed_locks[locking.LEVEL_NODE] = []
2904     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2905
2906   def DeclareLocks(self, level):
2907     if level == locking.LEVEL_NODE:
2908       self._LockInstancesNodes()
2909
2910   def CheckPrereq(self):
2911     """Check prerequisites.
2912
2913     This checks that the instance is in the cluster.
2914
2915     """
2916     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2917     assert self.instance is not None, \
2918       "Cannot retrieve locked instance %s" % self.op.instance_name
2919
2920   def Exec(self, feedback_fn):
2921     """Deactivate the disks
2922
2923     """
2924     instance = self.instance
2925     _SafeShutdownInstanceDisks(self, instance)
2926
2927
2928 def _SafeShutdownInstanceDisks(lu, instance):
2929   """Shutdown block devices of an instance.
2930
2931   This function checks if an instance is running, before calling
2932   _ShutdownInstanceDisks.
2933
2934   """
2935   pnode = instance.primary_node
2936   ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
2937   ins_l.Raise("Can't contact node %s" % pnode)
2938
2939   if instance.name in ins_l.payload:
2940     raise errors.OpExecError("Instance is running, can't shutdown"
2941                              " block devices.")
2942
2943   _ShutdownInstanceDisks(lu, instance)
2944
2945
2946 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2947   """Shutdown block devices of an instance.
2948
2949   This does the shutdown on all nodes of the instance.
2950
2951   If the ignore_primary is false, errors on the primary node are
2952   ignored.
2953
2954   """
2955   all_result = True
2956   for disk in instance.disks:
2957     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2958       lu.cfg.SetDiskID(top_disk, node)
2959       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2960       msg = result.fail_msg
2961       if msg:
2962         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2963                       disk.iv_name, node, msg)
2964         if not ignore_primary or node != instance.primary_node:
2965           all_result = False
2966   return all_result
2967
2968
2969 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2970   """Checks if a node has enough free memory.
2971
2972   This function check if a given node has the needed amount of free
2973   memory. In case the node has less memory or we cannot get the
2974   information from the node, this function raise an OpPrereqError
2975   exception.
2976
2977   @type lu: C{LogicalUnit}
2978   @param lu: a logical unit from which we get configuration data
2979   @type node: C{str}
2980   @param node: the node to check
2981   @type reason: C{str}
2982   @param reason: string to use in the error message
2983   @type requested: C{int}
2984   @param requested: the amount of memory in MiB to check for
2985   @type hypervisor_name: C{str}
2986   @param hypervisor_name: the hypervisor to ask for memory stats
2987   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2988       we cannot check the node
2989
2990   """
2991   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2992   nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
2993   free_mem = nodeinfo[node].payload.get('memory_free', None)
2994   if not isinstance(free_mem, int):
2995     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2996                                " was '%s'" % (node, free_mem))
2997   if requested > free_mem:
2998     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2999                                " needed %s MiB, available %s MiB" %
3000                                (node, reason, requested, free_mem))
3001
3002
3003 class LUStartupInstance(LogicalUnit):
3004   """Starts an instance.
3005
3006   """
3007   HPATH = "instance-start"
3008   HTYPE = constants.HTYPE_INSTANCE
3009   _OP_REQP = ["instance_name", "force"]
3010   REQ_BGL = False
3011
3012   def ExpandNames(self):
3013     self._ExpandAndLockInstance()
3014
3015   def BuildHooksEnv(self):
3016     """Build hooks env.
3017
3018     This runs on master, primary and secondary nodes of the instance.
3019
3020     """
3021     env = {
3022       "FORCE": self.op.force,
3023       }
3024     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3025     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3026     return env, nl, nl
3027
3028   def CheckPrereq(self):
3029     """Check prerequisites.
3030
3031     This checks that the instance is in the cluster.
3032
3033     """
3034     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3035     assert self.instance is not None, \
3036       "Cannot retrieve locked instance %s" % self.op.instance_name
3037
3038     # extra beparams
3039     self.beparams = getattr(self.op, "beparams", {})
3040     if self.beparams:
3041       if not isinstance(self.beparams, dict):
3042         raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
3043                                    " dict" % (type(self.beparams), ))
3044       # fill the beparams dict
3045       utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
3046       self.op.beparams = self.beparams
3047
3048     # extra hvparams
3049     self.hvparams = getattr(self.op, "hvparams", {})
3050     if self.hvparams:
3051       if not isinstance(self.hvparams, dict):
3052         raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
3053                                    " dict" % (type(self.hvparams), ))
3054
3055       # check hypervisor parameter syntax (locally)
3056       cluster = self.cfg.GetClusterInfo()
3057       utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
3058       filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
3059                                     instance.hvparams)
3060       filled_hvp.update(self.hvparams)
3061       hv_type = hypervisor.GetHypervisor(instance.hypervisor)
3062       hv_type.CheckParameterSyntax(filled_hvp)
3063       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
3064       self.op.hvparams = self.hvparams
3065
3066     _CheckNodeOnline(self, instance.primary_node)
3067
3068     bep = self.cfg.GetClusterInfo().FillBE(instance)
3069     # check bridges existence
3070     _CheckInstanceBridgesExist(self, instance)
3071
3072     remote_info = self.rpc.call_instance_info(instance.primary_node,
3073                                               instance.name,
3074                                               instance.hypervisor)
3075     remote_info.Raise("Error checking node %s" % instance.primary_node,
3076                       prereq=True)
3077     if not remote_info.payload: # not running already
3078       _CheckNodeFreeMemory(self, instance.primary_node,
3079                            "starting instance %s" % instance.name,
3080                            bep[constants.BE_MEMORY], instance.hypervisor)
3081
3082   def Exec(self, feedback_fn):
3083     """Start the instance.
3084
3085     """
3086     instance = self.instance
3087     force = self.op.force
3088
3089     self.cfg.MarkInstanceUp(instance.name)
3090
3091     node_current = instance.primary_node
3092
3093     _StartInstanceDisks(self, instance, force)
3094
3095     result = self.rpc.call_instance_start(node_current, instance,
3096                                           self.hvparams, self.beparams)
3097     msg = result.fail_msg
3098     if msg:
3099       _ShutdownInstanceDisks(self, instance)
3100       raise errors.OpExecError("Could not start instance: %s" % msg)
3101
3102
3103 class LURebootInstance(LogicalUnit):
3104   """Reboot an instance.
3105
3106   """
3107   HPATH = "instance-reboot"
3108   HTYPE = constants.HTYPE_INSTANCE
3109   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
3110   REQ_BGL = False
3111
3112   def ExpandNames(self):
3113     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
3114                                    constants.INSTANCE_REBOOT_HARD,
3115                                    constants.INSTANCE_REBOOT_FULL]:
3116       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
3117                                   (constants.INSTANCE_REBOOT_SOFT,
3118                                    constants.INSTANCE_REBOOT_HARD,
3119                                    constants.INSTANCE_REBOOT_FULL))
3120     self._ExpandAndLockInstance()
3121
3122   def BuildHooksEnv(self):
3123     """Build hooks env.
3124
3125     This runs on master, primary and secondary nodes of the instance.
3126
3127     """
3128     env = {
3129       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
3130       "REBOOT_TYPE": self.op.reboot_type,
3131       }
3132     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3133     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3134     return env, nl, nl
3135
3136   def CheckPrereq(self):
3137     """Check prerequisites.
3138
3139     This checks that the instance is in the cluster.
3140
3141     """
3142     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3143     assert self.instance is not None, \
3144       "Cannot retrieve locked instance %s" % self.op.instance_name
3145
3146     _CheckNodeOnline(self, instance.primary_node)
3147
3148     # check bridges existence
3149     _CheckInstanceBridgesExist(self, instance)
3150
3151   def Exec(self, feedback_fn):
3152     """Reboot the instance.
3153
3154     """
3155     instance = self.instance
3156     ignore_secondaries = self.op.ignore_secondaries
3157     reboot_type = self.op.reboot_type
3158
3159     node_current = instance.primary_node
3160
3161     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3162                        constants.INSTANCE_REBOOT_HARD]:
3163       for disk in instance.disks:
3164         self.cfg.SetDiskID(disk, node_current)
3165       result = self.rpc.call_instance_reboot(node_current, instance,
3166                                              reboot_type)
3167       result.Raise("Could not reboot instance")
3168     else:
3169       result = self.rpc.call_instance_shutdown(node_current, instance)
3170       result.Raise("Could not shutdown instance for full reboot")
3171       _ShutdownInstanceDisks(self, instance)
3172       _StartInstanceDisks(self, instance, ignore_secondaries)
3173       result = self.rpc.call_instance_start(node_current, instance, None, None)
3174       msg = result.fail_msg
3175       if msg:
3176         _ShutdownInstanceDisks(self, instance)
3177         raise errors.OpExecError("Could not start instance for"
3178                                  " full reboot: %s" % msg)
3179
3180     self.cfg.MarkInstanceUp(instance.name)
3181
3182
3183 class LUShutdownInstance(LogicalUnit):
3184   """Shutdown an instance.
3185
3186   """
3187   HPATH = "instance-stop"
3188   HTYPE = constants.HTYPE_INSTANCE
3189   _OP_REQP = ["instance_name"]
3190   REQ_BGL = False
3191
3192   def ExpandNames(self):
3193     self._ExpandAndLockInstance()
3194
3195   def BuildHooksEnv(self):
3196     """Build hooks env.
3197
3198     This runs on master, primary and secondary nodes of the instance.
3199
3200     """
3201     env = _BuildInstanceHookEnvByObject(self, self.instance)
3202     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3203     return env, nl, nl
3204
3205   def CheckPrereq(self):
3206     """Check prerequisites.
3207
3208     This checks that the instance is in the cluster.
3209
3210     """
3211     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3212     assert self.instance is not None, \
3213       "Cannot retrieve locked instance %s" % self.op.instance_name
3214     _CheckNodeOnline(self, self.instance.primary_node)
3215
3216   def Exec(self, feedback_fn):
3217     """Shutdown the instance.
3218
3219     """
3220     instance = self.instance
3221     node_current = instance.primary_node
3222     self.cfg.MarkInstanceDown(instance.name)
3223     result = self.rpc.call_instance_shutdown(node_current, instance)
3224     msg = result.fail_msg
3225     if msg:
3226       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3227
3228     _ShutdownInstanceDisks(self, instance)
3229
3230
3231 class LUReinstallInstance(LogicalUnit):
3232   """Reinstall an instance.
3233
3234   """
3235   HPATH = "instance-reinstall"
3236   HTYPE = constants.HTYPE_INSTANCE
3237   _OP_REQP = ["instance_name"]
3238   REQ_BGL = False
3239
3240   def ExpandNames(self):
3241     self._ExpandAndLockInstance()
3242
3243   def BuildHooksEnv(self):
3244     """Build hooks env.
3245
3246     This runs on master, primary and secondary nodes of the instance.
3247
3248     """
3249     env = _BuildInstanceHookEnvByObject(self, self.instance)
3250     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3251     return env, nl, nl
3252
3253   def CheckPrereq(self):
3254     """Check prerequisites.
3255
3256     This checks that the instance is in the cluster and is not running.
3257
3258     """
3259     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3260     assert instance is not None, \
3261       "Cannot retrieve locked instance %s" % self.op.instance_name
3262     _CheckNodeOnline(self, instance.primary_node)
3263
3264     if instance.disk_template == constants.DT_DISKLESS:
3265       raise errors.OpPrereqError("Instance '%s' has no disks" %
3266                                  self.op.instance_name)
3267     if instance.admin_up:
3268       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3269                                  self.op.instance_name)
3270     remote_info = self.rpc.call_instance_info(instance.primary_node,
3271                                               instance.name,
3272                                               instance.hypervisor)
3273     remote_info.Raise("Error checking node %s" % instance.primary_node,
3274                       prereq=True)
3275     if remote_info.payload:
3276       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3277                                  (self.op.instance_name,
3278                                   instance.primary_node))
3279
3280     self.op.os_type = getattr(self.op, "os_type", None)
3281     if self.op.os_type is not None:
3282       # OS verification
3283       pnode = self.cfg.GetNodeInfo(
3284         self.cfg.ExpandNodeName(instance.primary_node))
3285       if pnode is None:
3286         raise errors.OpPrereqError("Primary node '%s' is unknown" %
3287                                    self.op.pnode)
3288       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3289       result.Raise("OS '%s' not in supported OS list for primary node %s" %
3290                    (self.op.os_type, pnode.name), prereq=True)
3291
3292     self.instance = instance
3293
3294   def Exec(self, feedback_fn):
3295     """Reinstall the instance.
3296
3297     """
3298     inst = self.instance
3299
3300     if self.op.os_type is not None:
3301       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3302       inst.os = self.op.os_type
3303       self.cfg.Update(inst)
3304
3305     _StartInstanceDisks(self, inst, None)
3306     try:
3307       feedback_fn("Running the instance OS create scripts...")
3308       result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3309       result.Raise("Could not install OS for instance %s on node %s" %
3310                    (inst.name, inst.primary_node))
3311     finally:
3312       _ShutdownInstanceDisks(self, inst)
3313
3314
3315 class LURenameInstance(LogicalUnit):
3316   """Rename an instance.
3317
3318   """
3319   HPATH = "instance-rename"
3320   HTYPE = constants.HTYPE_INSTANCE
3321   _OP_REQP = ["instance_name", "new_name"]
3322
3323   def BuildHooksEnv(self):
3324     """Build hooks env.
3325
3326     This runs on master, primary and secondary nodes of the instance.
3327
3328     """
3329     env = _BuildInstanceHookEnvByObject(self, self.instance)
3330     env["INSTANCE_NEW_NAME"] = self.op.new_name
3331     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3332     return env, nl, nl
3333
3334   def CheckPrereq(self):
3335     """Check prerequisites.
3336
3337     This checks that the instance is in the cluster and is not running.
3338
3339     """
3340     instance = self.cfg.GetInstanceInfo(
3341       self.cfg.ExpandInstanceName(self.op.instance_name))
3342     if instance is None:
3343       raise errors.OpPrereqError("Instance '%s' not known" %
3344                                  self.op.instance_name)
3345     _CheckNodeOnline(self, instance.primary_node)
3346
3347     if instance.admin_up:
3348       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3349                                  self.op.instance_name)
3350     remote_info = self.rpc.call_instance_info(instance.primary_node,
3351                                               instance.name,
3352                                               instance.hypervisor)
3353     remote_info.Raise("Error checking node %s" % instance.primary_node,
3354                       prereq=True)
3355     if remote_info.payload:
3356       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3357                                  (self.op.instance_name,
3358                                   instance.primary_node))
3359     self.instance = instance
3360
3361     # new name verification
3362     name_info = utils.HostInfo(self.op.new_name)
3363
3364     self.op.new_name = new_name = name_info.name
3365     instance_list = self.cfg.GetInstanceList()
3366     if new_name in instance_list:
3367       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3368                                  new_name)
3369
3370     if not getattr(self.op, "ignore_ip", False):
3371       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3372         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3373                                    (name_info.ip, new_name))
3374
3375
3376   def Exec(self, feedback_fn):
3377     """Reinstall the instance.
3378
3379     """
3380     inst = self.instance
3381     old_name = inst.name
3382
3383     if inst.disk_template == constants.DT_FILE:
3384       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3385
3386     self.cfg.RenameInstance(inst.name, self.op.new_name)
3387     # Change the instance lock. This is definitely safe while we hold the BGL
3388     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3389     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3390
3391     # re-read the instance from the configuration after rename
3392     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3393
3394     if inst.disk_template == constants.DT_FILE:
3395       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3396       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3397                                                      old_file_storage_dir,
3398                                                      new_file_storage_dir)
3399       result.Raise("Could not rename on node %s directory '%s' to '%s'"
3400                    " (but the instance has been renamed in Ganeti)" %
3401                    (inst.primary_node, old_file_storage_dir,
3402                     new_file_storage_dir))
3403
3404     _StartInstanceDisks(self, inst, None)
3405     try:
3406       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3407                                                  old_name)
3408       msg = result.fail_msg
3409       if msg:
3410         msg = ("Could not run OS rename script for instance %s on node %s"
3411                " (but the instance has been renamed in Ganeti): %s" %
3412                (inst.name, inst.primary_node, msg))
3413         self.proc.LogWarning(msg)
3414     finally:
3415       _ShutdownInstanceDisks(self, inst)
3416
3417
3418 class LURemoveInstance(LogicalUnit):
3419   """Remove an instance.
3420
3421   """
3422   HPATH = "instance-remove"
3423   HTYPE = constants.HTYPE_INSTANCE
3424   _OP_REQP = ["instance_name", "ignore_failures"]
3425   REQ_BGL = False
3426
3427   def ExpandNames(self):
3428     self._ExpandAndLockInstance()
3429     self.needed_locks[locking.LEVEL_NODE] = []
3430     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3431
3432   def DeclareLocks(self, level):
3433     if level == locking.LEVEL_NODE:
3434       self._LockInstancesNodes()
3435
3436   def BuildHooksEnv(self):
3437     """Build hooks env.
3438
3439     This runs on master, primary and secondary nodes of the instance.
3440
3441     """
3442     env = _BuildInstanceHookEnvByObject(self, self.instance)
3443     nl = [self.cfg.GetMasterNode()]
3444     return env, nl, nl
3445
3446   def CheckPrereq(self):
3447     """Check prerequisites.
3448
3449     This checks that the instance is in the cluster.
3450
3451     """
3452     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3453     assert self.instance is not None, \
3454       "Cannot retrieve locked instance %s" % self.op.instance_name
3455
3456   def Exec(self, feedback_fn):
3457     """Remove the instance.
3458
3459     """
3460     instance = self.instance
3461     logging.info("Shutting down instance %s on node %s",
3462                  instance.name, instance.primary_node)
3463
3464     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3465     msg = result.fail_msg
3466     if msg:
3467       if self.op.ignore_failures:
3468         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3469       else:
3470         raise errors.OpExecError("Could not shutdown instance %s on"
3471                                  " node %s: %s" %
3472                                  (instance.name, instance.primary_node, msg))
3473
3474     logging.info("Removing block devices for instance %s", instance.name)
3475
3476     if not _RemoveDisks(self, instance):
3477       if self.op.ignore_failures:
3478         feedback_fn("Warning: can't remove instance's disks")
3479       else:
3480         raise errors.OpExecError("Can't remove instance's disks")
3481
3482     logging.info("Removing instance %s out of cluster config", instance.name)
3483
3484     self.cfg.RemoveInstance(instance.name)
3485     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3486
3487
3488 class LUQueryInstances(NoHooksLU):
3489   """Logical unit for querying instances.
3490
3491   """
3492   _OP_REQP = ["output_fields", "names", "use_locking"]
3493   REQ_BGL = False
3494   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3495                                     "admin_state",
3496                                     "disk_template", "ip", "mac", "bridge",
3497                                     "nic_mode", "nic_link",
3498                                     "sda_size", "sdb_size", "vcpus", "tags",
3499                                     "network_port", "beparams",
3500                                     r"(disk)\.(size)/([0-9]+)",
3501                                     r"(disk)\.(sizes)", "disk_usage",
3502                                     r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
3503                                     r"(nic)\.(bridge)/([0-9]+)",
3504                                     r"(nic)\.(macs|ips|modes|links|bridges)",
3505                                     r"(disk|nic)\.(count)",
3506                                     "serial_no", "hypervisor", "hvparams",] +
3507                                   ["hv/%s" % name
3508                                    for name in constants.HVS_PARAMETERS] +
3509                                   ["be/%s" % name
3510                                    for name in constants.BES_PARAMETERS])
3511   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3512
3513
3514   def ExpandNames(self):
3515     _CheckOutputFields(static=self._FIELDS_STATIC,
3516                        dynamic=self._FIELDS_DYNAMIC,
3517                        selected=self.op.output_fields)
3518
3519     self.needed_locks = {}
3520     self.share_locks[locking.LEVEL_INSTANCE] = 1
3521     self.share_locks[locking.LEVEL_NODE] = 1
3522
3523     if self.op.names:
3524       self.wanted = _GetWantedInstances(self, self.op.names)
3525     else:
3526       self.wanted = locking.ALL_SET
3527
3528     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3529     self.do_locking = self.do_node_query and self.op.use_locking
3530     if self.do_locking:
3531       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3532       self.needed_locks[locking.LEVEL_NODE] = []
3533       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3534
3535   def DeclareLocks(self, level):
3536     if level == locking.LEVEL_NODE and self.do_locking:
3537       self._LockInstancesNodes()
3538
3539   def CheckPrereq(self):
3540     """Check prerequisites.
3541
3542     """
3543     pass
3544
3545   def Exec(self, feedback_fn):
3546     """Computes the list of nodes and their attributes.
3547
3548     """
3549     all_info = self.cfg.GetAllInstancesInfo()
3550     if self.wanted == locking.ALL_SET:
3551       # caller didn't specify instance names, so ordering is not important
3552       if self.do_locking:
3553         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3554       else:
3555         instance_names = all_info.keys()
3556       instance_names = utils.NiceSort(instance_names)
3557     else:
3558       # caller did specify names, so we must keep the ordering
3559       if self.do_locking:
3560         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3561       else:
3562         tgt_set = all_info.keys()
3563       missing = set(self.wanted).difference(tgt_set)
3564       if missing:
3565         raise errors.OpExecError("Some instances were removed before"
3566                                  " retrieving their data: %s" % missing)
3567       instance_names = self.wanted
3568
3569     instance_list = [all_info[iname] for iname in instance_names]
3570
3571     # begin data gathering
3572
3573     nodes = frozenset([inst.primary_node for inst in instance_list])
3574     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3575
3576     bad_nodes = []
3577     off_nodes = []
3578     if self.do_node_query:
3579       live_data = {}
3580       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3581       for name in nodes:
3582         result = node_data[name]
3583         if result.offline:
3584           # offline nodes will be in both lists
3585           off_nodes.append(name)
3586         if result.failed or result.fail_msg:
3587           bad_nodes.append(name)
3588         else:
3589           if result.payload:
3590             live_data.update(result.payload)
3591           # else no instance is alive
3592     else:
3593       live_data = dict([(name, {}) for name in instance_names])
3594
3595     # end data gathering
3596
3597     HVPREFIX = "hv/"
3598     BEPREFIX = "be/"
3599     output = []
3600     cluster = self.cfg.GetClusterInfo()
3601     for instance in instance_list:
3602       iout = []
3603       i_hv = cluster.FillHV(instance)
3604       i_be = cluster.FillBE(instance)
3605       i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
3606                                  nic.nicparams) for nic in instance.nics]
3607       for field in self.op.output_fields:
3608         st_match = self._FIELDS_STATIC.Matches(field)
3609         if field == "name":
3610           val = instance.name
3611         elif field == "os":
3612           val = instance.os
3613         elif field == "pnode":
3614           val = instance.primary_node
3615         elif field == "snodes":
3616           val = list(instance.secondary_nodes)
3617         elif field == "admin_state":
3618           val = instance.admin_up
3619         elif field == "oper_state":
3620           if instance.primary_node in bad_nodes:
3621             val = None
3622           else:
3623             val = bool(live_data.get(instance.name))
3624         elif field == "status":
3625           if instance.primary_node in off_nodes:
3626             val = "ERROR_nodeoffline"
3627           elif instance.primary_node in bad_nodes:
3628             val = "ERROR_nodedown"
3629           else:
3630             running = bool(live_data.get(instance.name))
3631             if running:
3632               if instance.admin_up:
3633                 val = "running"
3634               else:
3635                 val = "ERROR_up"
3636             else:
3637               if instance.admin_up:
3638                 val = "ERROR_down"
3639               else:
3640                 val = "ADMIN_down"
3641         elif field == "oper_ram":
3642           if instance.primary_node in bad_nodes:
3643             val = None
3644           elif instance.name in live_data:
3645             val = live_data[instance.name].get("memory", "?")
3646           else:
3647             val = "-"
3648         elif field == "vcpus":
3649           val = i_be[constants.BE_VCPUS]
3650         elif field == "disk_template":
3651           val = instance.disk_template
3652         elif field == "ip":
3653           if instance.nics:
3654             val = instance.nics[0].ip
3655           else:
3656             val = None
3657         elif field == "nic_mode":
3658           if instance.nics:
3659             val = i_nicp[0][constants.NIC_MODE]
3660           else:
3661             val = None
3662         elif field == "nic_link":
3663           if instance.nics:
3664             val = i_nicp[0][constants.NIC_LINK]
3665           else:
3666             val = None
3667         elif field == "bridge":
3668           if (instance.nics and
3669               i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
3670             val = i_nicp[0][constants.NIC_LINK]
3671           else:
3672             val = None
3673         elif field == "mac":
3674           if instance.nics:
3675             val = instance.nics[0].mac
3676           else:
3677             val = None
3678         elif field == "sda_size" or field == "sdb_size":
3679           idx = ord(field[2]) - ord('a')
3680           try:
3681             val = instance.FindDisk(idx).size
3682           except errors.OpPrereqError:
3683             val = None
3684         elif field == "disk_usage": # total disk usage per node
3685           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3686           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3687         elif field == "tags":
3688           val = list(instance.GetTags())
3689         elif field == "serial_no":
3690           val = instance.serial_no
3691         elif field == "network_port":
3692           val = instance.network_port
3693         elif field == "hypervisor":
3694           val = instance.hypervisor
3695         elif field == "hvparams":
3696           val = i_hv
3697         elif (field.startswith(HVPREFIX) and
3698               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3699           val = i_hv.get(field[len(HVPREFIX):], None)
3700         elif field == "beparams":
3701           val = i_be
3702         elif (field.startswith(BEPREFIX) and
3703               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3704           val = i_be.get(field[len(BEPREFIX):], None)
3705         elif st_match and st_match.groups():
3706           # matches a variable list
3707           st_groups = st_match.groups()
3708           if st_groups and st_groups[0] == "disk":
3709             if st_groups[1] == "count":
3710               val = len(instance.disks)
3711             elif st_groups[1] == "sizes":
3712               val = [disk.size for disk in instance.disks]
3713             elif st_groups[1] == "size":
3714               try:
3715                 val = instance.FindDisk(st_groups[2]).size
3716               except errors.OpPrereqError:
3717                 val = None
3718             else:
3719               assert False, "Unhandled disk parameter"
3720           elif st_groups[0] == "nic":
3721             if st_groups[1] == "count":
3722               val = len(instance.nics)
3723             elif st_groups[1] == "macs":
3724               val = [nic.mac for nic in instance.nics]
3725             elif st_groups[1] == "ips":
3726               val = [nic.ip for nic in instance.nics]
3727             elif st_groups[1] == "modes":
3728               val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
3729             elif st_groups[1] == "links":
3730               val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
3731             elif st_groups[1] == "bridges":
3732               val = []
3733               for nicp in i_nicp:
3734                 if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
3735                   val.append(nicp[constants.NIC_LINK])
3736                 else:
3737                   val.append(None)
3738             else:
3739               # index-based item
3740               nic_idx = int(st_groups[2])
3741               if nic_idx >= len(instance.nics):
3742                 val = None
3743               else:
3744                 if st_groups[1] == "mac":
3745                   val = instance.nics[nic_idx].mac
3746                 elif st_groups[1] == "ip":
3747                   val = instance.nics[nic_idx].ip
3748                 elif st_groups[1] == "mode":
3749                   val = i_nicp[nic_idx][constants.NIC_MODE]
3750                 elif st_groups[1] == "link":
3751                   val = i_nicp[nic_idx][constants.NIC_LINK]
3752                 elif st_groups[1] == "bridge":
3753                   nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
3754                   if nic_mode == constants.NIC_MODE_BRIDGED:
3755                     val = i_nicp[nic_idx][constants.NIC_LINK]
3756                   else:
3757                     val = None
3758                 else:
3759                   assert False, "Unhandled NIC parameter"
3760           else:
3761             assert False, ("Declared but unhandled variable parameter '%s'" %
3762                            field)
3763         else:
3764           assert False, "Declared but unhandled parameter '%s'" % field
3765         iout.append(val)
3766       output.append(iout)
3767
3768     return output
3769
3770
3771 class LUFailoverInstance(LogicalUnit):
3772   """Failover an instance.
3773
3774   """
3775   HPATH = "instance-failover"
3776   HTYPE = constants.HTYPE_INSTANCE
3777   _OP_REQP = ["instance_name", "ignore_consistency"]
3778   REQ_BGL = False
3779
3780   def ExpandNames(self):
3781     self._ExpandAndLockInstance()
3782     self.needed_locks[locking.LEVEL_NODE] = []
3783     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3784
3785   def DeclareLocks(self, level):
3786     if level == locking.LEVEL_NODE:
3787       self._LockInstancesNodes()
3788
3789   def BuildHooksEnv(self):
3790     """Build hooks env.
3791
3792     This runs on master, primary and secondary nodes of the instance.
3793
3794     """
3795     env = {
3796       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3797       }
3798     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3799     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3800     return env, nl, nl
3801
3802   def CheckPrereq(self):
3803     """Check prerequisites.
3804
3805     This checks that the instance is in the cluster.
3806
3807     """
3808     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3809     assert self.instance is not None, \
3810       "Cannot retrieve locked instance %s" % self.op.instance_name
3811
3812     bep = self.cfg.GetClusterInfo().FillBE(instance)
3813     if instance.disk_template not in constants.DTS_NET_MIRROR:
3814       raise errors.OpPrereqError("Instance's disk layout is not"
3815                                  " network mirrored, cannot failover.")
3816
3817     secondary_nodes = instance.secondary_nodes
3818     if not secondary_nodes:
3819       raise errors.ProgrammerError("no secondary node but using "
3820                                    "a mirrored disk template")
3821
3822     target_node = secondary_nodes[0]
3823     _CheckNodeOnline(self, target_node)
3824     _CheckNodeNotDrained(self, target_node)
3825     if instance.admin_up:
3826       # check memory requirements on the secondary node
3827       _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3828                            instance.name, bep[constants.BE_MEMORY],
3829                            instance.hypervisor)
3830     else:
3831       self.LogInfo("Not checking memory on the secondary node as"
3832                    " instance will not be started")
3833
3834     # check bridge existance
3835     _CheckInstanceBridgesExist(self, instance, node=target_node)
3836
3837   def Exec(self, feedback_fn):
3838     """Failover an instance.
3839
3840     The failover is done by shutting it down on its present node and
3841     starting it on the secondary.
3842
3843     """
3844     instance = self.instance
3845
3846     source_node = instance.primary_node
3847     target_node = instance.secondary_nodes[0]
3848
3849     feedback_fn("* checking disk consistency between source and target")
3850     for dev in instance.disks:
3851       # for drbd, these are drbd over lvm
3852       if not _CheckDiskConsistency(self, dev, target_node, False):
3853         if instance.admin_up and not self.op.ignore_consistency:
3854           raise errors.OpExecError("Disk %s is degraded on target node,"
3855                                    " aborting failover." % dev.iv_name)
3856
3857     feedback_fn("* shutting down instance on source node")
3858     logging.info("Shutting down instance %s on node %s",
3859                  instance.name, source_node)
3860
3861     result = self.rpc.call_instance_shutdown(source_node, instance)
3862     msg = result.fail_msg
3863     if msg:
3864       if self.op.ignore_consistency:
3865         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3866                              " Proceeding anyway. Please make sure node"
3867                              " %s is down. Error details: %s",
3868                              instance.name, source_node, source_node, msg)
3869       else:
3870         raise errors.OpExecError("Could not shutdown instance %s on"
3871                                  " node %s: %s" %
3872                                  (instance.name, source_node, msg))
3873
3874     feedback_fn("* deactivating the instance's disks on source node")
3875     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3876       raise errors.OpExecError("Can't shut down the instance's disks.")
3877
3878     instance.primary_node = target_node
3879     # distribute new instance config to the other nodes
3880     self.cfg.Update(instance)
3881
3882     # Only start the instance if it's marked as up
3883     if instance.admin_up:
3884       feedback_fn("* activating the instance's disks on target node")
3885       logging.info("Starting instance %s on node %s",
3886                    instance.name, target_node)
3887
3888       disks_ok, _ = _AssembleInstanceDisks(self, instance,
3889                                                ignore_secondaries=True)
3890       if not disks_ok:
3891         _ShutdownInstanceDisks(self, instance)
3892         raise errors.OpExecError("Can't activate the instance's disks")
3893
3894       feedback_fn("* starting the instance on the target node")
3895       result = self.rpc.call_instance_start(target_node, instance, None, None)
3896       msg = result.fail_msg
3897       if msg:
3898         _ShutdownInstanceDisks(self, instance)
3899         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3900                                  (instance.name, target_node, msg))
3901
3902
3903 class LUMigrateInstance(LogicalUnit):
3904   """Migrate an instance.
3905
3906   This is migration without shutting down, compared to the failover,
3907   which is done with shutdown.
3908
3909   """
3910   HPATH = "instance-migrate"
3911   HTYPE = constants.HTYPE_INSTANCE
3912   _OP_REQP = ["instance_name", "live", "cleanup"]
3913
3914   REQ_BGL = False
3915
3916   def ExpandNames(self):
3917     self._ExpandAndLockInstance()
3918
3919     self.needed_locks[locking.LEVEL_NODE] = []
3920     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3921
3922     self._migrater = TLMigrateInstance(self, self.op.instance_name,
3923                                        self.op.live, self.op.cleanup)
3924     self.tasklets = [self._migrater]
3925
3926   def DeclareLocks(self, level):
3927     if level == locking.LEVEL_NODE:
3928       self._LockInstancesNodes()
3929
3930   def BuildHooksEnv(self):
3931     """Build hooks env.
3932
3933     This runs on master, primary and secondary nodes of the instance.
3934
3935     """
3936     instance = self._migrater.instance
3937     env = _BuildInstanceHookEnvByObject(self, instance)
3938     env["MIGRATE_LIVE"] = self.op.live
3939     env["MIGRATE_CLEANUP"] = self.op.cleanup
3940     nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
3941     return env, nl, nl
3942
3943
3944 class TLMigrateInstance(Tasklet):
3945   def __init__(self, lu, instance_name, live, cleanup):
3946     """Initializes this class.
3947
3948     """
3949     Tasklet.__init__(self, lu)
3950
3951     # Parameters
3952     self.instance_name = instance_name
3953     self.live = live
3954     self.cleanup = cleanup
3955
3956   def CheckPrereq(self):
3957     """Check prerequisites.
3958
3959     This checks that the instance is in the cluster.
3960
3961     """
3962     instance = self.cfg.GetInstanceInfo(
3963       self.cfg.ExpandInstanceName(self.instance_name))
3964     if instance is None:
3965       raise errors.OpPrereqError("Instance '%s' not known" %
3966                                  self.instance_name)
3967
3968     if instance.disk_template != constants.DT_DRBD8:
3969       raise errors.OpPrereqError("Instance's disk layout is not"
3970                                  " drbd8, cannot migrate.")
3971
3972     secondary_nodes = instance.secondary_nodes
3973     if not secondary_nodes:
3974       raise errors.ConfigurationError("No secondary node but using"
3975                                       " drbd8 disk template")
3976
3977     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3978
3979     target_node = secondary_nodes[0]
3980     # check memory requirements on the secondary node
3981     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3982                          instance.name, i_be[constants.BE_MEMORY],
3983                          instance.hypervisor)
3984
3985     # check bridge existance
3986     _CheckInstanceBridgesExist(self, instance, node=target_node)
3987
3988     if not self.cleanup:
3989       _CheckNodeNotDrained(self, target_node)
3990       result = self.rpc.call_instance_migratable(instance.primary_node,
3991                                                  instance)
3992       result.Raise("Can't migrate, please use failover", prereq=True)
3993
3994     self.instance = instance
3995
3996   def _WaitUntilSync(self):
3997     """Poll with custom rpc for disk sync.
3998
3999     This uses our own step-based rpc call.
4000
4001     """
4002     self.feedback_fn("* wait until resync is done")
4003     all_done = False
4004     while not all_done:
4005       all_done = True
4006       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
4007                                             self.nodes_ip,
4008                                             self.instance.disks)
4009       min_percent = 100
4010       for node, nres in result.items():
4011         nres.Raise("Cannot resync disks on node %s" % node)
4012         node_done, node_percent = nres.payload
4013         all_done = all_done and node_done
4014         if node_percent is not None:
4015           min_percent = min(min_percent, node_percent)
4016       if not all_done:
4017         if min_percent < 100:
4018           self.feedback_fn("   - progress: %.1f%%" % min_percent)
4019         time.sleep(2)
4020
4021   def _EnsureSecondary(self, node):
4022     """Demote a node to secondary.
4023
4024     """
4025     self.feedback_fn("* switching node %s to secondary mode" % node)
4026
4027     for dev in self.instance.disks:
4028       self.cfg.SetDiskID(dev, node)
4029
4030     result = self.rpc.call_blockdev_close(node, self.instance.name,
4031                                           self.instance.disks)
4032     result.Raise("Cannot change disk to secondary on node %s" % node)
4033
4034   def _GoStandalone(self):
4035     """Disconnect from the network.
4036
4037     """
4038     self.feedback_fn("* changing into standalone mode")
4039     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
4040                                                self.instance.disks)
4041     for node, nres in result.items():
4042       nres.Raise("Cannot disconnect disks node %s" % node)
4043
4044   def _GoReconnect(self, multimaster):
4045     """Reconnect to the network.
4046
4047     """
4048     if multimaster:
4049       msg = "dual-master"
4050     else:
4051       msg = "single-master"
4052     self.feedback_fn("* changing disks into %s mode" % msg)
4053     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
4054                                            self.instance.disks,
4055                                            self.instance.name, multimaster)
4056     for node, nres in result.items():
4057       nres.Raise("Cannot change disks config on node %s" % node)
4058
4059   def _ExecCleanup(self):
4060     """Try to cleanup after a failed migration.
4061
4062     The cleanup is done by:
4063       - check that the instance is running only on one node
4064         (and update the config if needed)
4065       - change disks on its secondary node to secondary
4066       - wait until disks are fully synchronized
4067       - disconnect from the network
4068       - change disks into single-master mode
4069       - wait again until disks are fully synchronized
4070
4071     """
4072     instance = self.instance
4073     target_node = self.target_node
4074     source_node = self.source_node
4075
4076     # check running on only one node
4077     self.feedback_fn("* checking where the instance actually runs"
4078                      " (if this hangs, the hypervisor might be in"
4079                      " a bad state)")
4080     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
4081     for node, result in ins_l.items():
4082       result.Raise("Can't contact node %s" % node)
4083
4084     runningon_source = instance.name in ins_l[source_node].payload
4085     runningon_target = instance.name in ins_l[target_node].payload
4086
4087     if runningon_source and runningon_target:
4088       raise errors.OpExecError("Instance seems to be running on two nodes,"
4089                                " or the hypervisor is confused. You will have"
4090                                " to ensure manually that it runs only on one"
4091                                " and restart this operation.")
4092
4093     if not (runningon_source or runningon_target):
4094       raise errors.OpExecError("Instance does not seem to be running at all."
4095                                " In this case, it's safer to repair by"
4096                                " running 'gnt-instance stop' to ensure disk"
4097                                " shutdown, and then restarting it.")
4098
4099     if runningon_target:
4100       # the migration has actually succeeded, we need to update the config
4101       self.feedback_fn("* instance running on secondary node (%s),"
4102                        " updating config" % target_node)
4103       instance.primary_node = target_node
4104       self.cfg.Update(instance)
4105       demoted_node = source_node
4106     else:
4107       self.feedback_fn("* instance confirmed to be running on its"
4108                        " primary node (%s)" % source_node)
4109       demoted_node = target_node
4110
4111     self._EnsureSecondary(demoted_node)
4112     try:
4113       self._WaitUntilSync()
4114     except errors.OpExecError:
4115       # we ignore here errors, since if the device is standalone, it
4116       # won't be able to sync
4117       pass
4118     self._GoStandalone()
4119     self._GoReconnect(False)
4120     self._WaitUntilSync()
4121
4122     self.feedback_fn("* done")
4123
4124   def _RevertDiskStatus(self):
4125     """Try to revert the disk status after a failed migration.
4126
4127     """
4128     target_node = self.target_node
4129     try:
4130       self._EnsureSecondary(target_node)
4131       self._GoStandalone()
4132       self._GoReconnect(False)
4133       self._WaitUntilSync()
4134     except errors.OpExecError, err:
4135       self.lu.LogWarning("Migration failed and I can't reconnect the"
4136                          " drives: error '%s'\n"
4137                          "Please look and recover the instance status" %
4138                          str(err))
4139
4140   def _AbortMigration(self):
4141     """Call the hypervisor code to abort a started migration.
4142
4143     """
4144     instance = self.instance
4145     target_node = self.target_node
4146     migration_info = self.migration_info
4147
4148     abort_result = self.rpc.call_finalize_migration(target_node,
4149                                                     instance,
4150                                                     migration_info,
4151                                                     False)
4152     abort_msg = abort_result.fail_msg
4153     if abort_msg:
4154       logging.error("Aborting migration failed on target node %s: %s" %
4155                     (target_node, abort_msg))
4156       # Don't raise an exception here, as we stil have to try to revert the
4157       # disk status, even if this step failed.
4158
4159   def _ExecMigration(self):
4160     """Migrate an instance.
4161
4162     The migrate is done by:
4163       - change the disks into dual-master mode
4164       - wait until disks are fully synchronized again
4165       - migrate the instance
4166       - change disks on the new secondary node (the old primary) to secondary
4167       - wait until disks are fully synchronized
4168       - change disks into single-master mode
4169
4170     """
4171     instance = self.instance
4172     target_node = self.target_node
4173     source_node = self.source_node
4174
4175     self.feedback_fn("* checking disk consistency between source and target")
4176     for dev in instance.disks:
4177       if not _CheckDiskConsistency(self, dev, target_node, False):
4178         raise errors.OpExecError("Disk %s is degraded or not fully"
4179                                  " synchronized on target node,"
4180                                  " aborting migrate." % dev.iv_name)
4181
4182     # First get the migration information from the remote node
4183     result = self.rpc.call_migration_info(source_node, instance)
4184     msg = result.fail_msg
4185     if msg:
4186       log_err = ("Failed fetching source migration information from %s: %s" %
4187                  (source_node, msg))
4188       logging.error(log_err)
4189       raise errors.OpExecError(log_err)
4190
4191     self.migration_info = migration_info = result.payload
4192
4193     # Then switch the disks to master/master mode
4194     self._EnsureSecondary(target_node)
4195     self._GoStandalone()
4196     self._GoReconnect(True)
4197     self._WaitUntilSync()
4198
4199     self.feedback_fn("* preparing %s to accept the instance" % target_node)
4200     result = self.rpc.call_accept_instance(target_node,
4201                                            instance,
4202                                            migration_info,
4203                                            self.nodes_ip[target_node])
4204
4205     msg = result.fail_msg
4206     if msg:
4207       logging.error("Instance pre-migration failed, trying to revert"
4208                     " disk status: %s", msg)
4209       self._AbortMigration()
4210       self._RevertDiskStatus()
4211       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4212                                (instance.name, msg))
4213
4214     self.feedback_fn("* migrating instance to %s" % target_node)
4215     time.sleep(10)
4216     result = self.rpc.call_instance_migrate(source_node, instance,
4217                                             self.nodes_ip[target_node],
4218                                             self.live)
4219     msg = result.fail_msg
4220     if msg:
4221       logging.error("Instance migration failed, trying to revert"
4222                     " disk status: %s", msg)
4223       self._AbortMigration()
4224       self._RevertDiskStatus()
4225       raise errors.OpExecError("Could not migrate instance %s: %s" %
4226                                (instance.name, msg))
4227     time.sleep(10)
4228
4229     instance.primary_node = target_node
4230     # distribute new instance config to the other nodes
4231     self.cfg.Update(instance)
4232
4233     result = self.rpc.call_finalize_migration(target_node,
4234                                               instance,
4235                                               migration_info,
4236                                               True)
4237     msg = result.fail_msg
4238     if msg:
4239       logging.error("Instance migration succeeded, but finalization failed:"
4240                     " %s" % msg)
4241       raise errors.OpExecError("Could not finalize instance migration: %s" %
4242                                msg)
4243
4244     self._EnsureSecondary(source_node)
4245     self._WaitUntilSync()
4246     self._GoStandalone()
4247     self._GoReconnect(False)
4248     self._WaitUntilSync()
4249
4250     self.feedback_fn("* done")
4251
4252   def Exec(self, feedback_fn):
4253     """Perform the migration.
4254
4255     """
4256     self.feedback_fn = feedback_fn
4257
4258     self.source_node = self.instance.primary_node
4259     self.target_node = self.instance.secondary_nodes[0]
4260     self.all_nodes = [self.source_node, self.target_node]
4261     self.nodes_ip = {
4262       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4263       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4264       }
4265
4266     if self.cleanup:
4267       return self._ExecCleanup()
4268     else:
4269       return self._ExecMigration()
4270
4271
4272 def _CreateBlockDev(lu, node, instance, device, force_create,
4273                     info, force_open):
4274   """Create a tree of block devices on a given node.
4275
4276   If this device type has to be created on secondaries, create it and
4277   all its children.
4278
4279   If not, just recurse to children keeping the same 'force' value.
4280
4281   @param lu: the lu on whose behalf we execute
4282   @param node: the node on which to create the device
4283   @type instance: L{objects.Instance}
4284   @param instance: the instance which owns the device
4285   @type device: L{objects.Disk}
4286   @param device: the device to create
4287   @type force_create: boolean
4288   @param force_create: whether to force creation of this device; this
4289       will be change to True whenever we find a device which has
4290       CreateOnSecondary() attribute
4291   @param info: the extra 'metadata' we should attach to the device
4292       (this will be represented as a LVM tag)
4293   @type force_open: boolean
4294   @param force_open: this parameter will be passes to the
4295       L{backend.BlockdevCreate} function where it specifies
4296       whether we run on primary or not, and it affects both
4297       the child assembly and the device own Open() execution
4298
4299   """
4300   if device.CreateOnSecondary():
4301     force_create = True
4302
4303   if device.children:
4304     for child in device.children:
4305       _CreateBlockDev(lu, node, instance, child, force_create,
4306                       info, force_open)
4307
4308   if not force_create:
4309     return
4310
4311   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4312
4313
4314 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4315   """Create a single block device on a given node.
4316
4317   This will not recurse over children of the device, so they must be
4318   created in advance.
4319
4320   @param lu: the lu on whose behalf we execute
4321   @param node: the node on which to create the device
4322   @type instance: L{objects.Instance}
4323   @param instance: the instance which owns the device
4324   @type device: L{objects.Disk}
4325   @param device: the device to create
4326   @param info: the extra 'metadata' we should attach to the device
4327       (this will be represented as a LVM tag)
4328   @type force_open: boolean
4329   @param force_open: this parameter will be passes to the
4330       L{backend.BlockdevCreate} function where it specifies
4331       whether we run on primary or not, and it affects both
4332       the child assembly and the device own Open() execution
4333
4334   """
4335   lu.cfg.SetDiskID(device, node)
4336   result = lu.rpc.call_blockdev_create(node, device, device.size,
4337                                        instance.name, force_open, info)
4338   result.Raise("Can't create block device %s on"
4339                " node %s for instance %s" % (device, node, instance.name))
4340   if device.physical_id is None:
4341     device.physical_id = result.payload
4342
4343
4344 def _GenerateUniqueNames(lu, exts):
4345   """Generate a suitable LV name.
4346
4347   This will generate a logical volume name for the given instance.
4348
4349   """
4350   results = []
4351   for val in exts:
4352     new_id = lu.cfg.GenerateUniqueID()
4353     results.append("%s%s" % (new_id, val))
4354   return results
4355
4356
4357 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4358                          p_minor, s_minor):
4359   """Generate a drbd8 device complete with its children.
4360
4361   """
4362   port = lu.cfg.AllocatePort()
4363   vgname = lu.cfg.GetVGName()
4364   shared_secret = lu.cfg.GenerateDRBDSecret()
4365   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4366                           logical_id=(vgname, names[0]))
4367   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4368                           logical_id=(vgname, names[1]))
4369   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4370                           logical_id=(primary, secondary, port,
4371                                       p_minor, s_minor,
4372                                       shared_secret),
4373                           children=[dev_data, dev_meta],
4374                           iv_name=iv_name)
4375   return drbd_dev
4376
4377
4378 def _GenerateDiskTemplate(lu, template_name,
4379                           instance_name, primary_node,
4380                           secondary_nodes, disk_info,
4381                           file_storage_dir, file_driver,
4382                           base_index):
4383   """Generate the entire disk layout for a given template type.
4384
4385   """
4386   #TODO: compute space requirements
4387
4388   vgname = lu.cfg.GetVGName()
4389   disk_count = len(disk_info)
4390   disks = []
4391   if template_name == constants.DT_DISKLESS:
4392     pass
4393   elif template_name == constants.DT_PLAIN:
4394     if len(secondary_nodes) != 0:
4395       raise errors.ProgrammerError("Wrong template configuration")
4396
4397     names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
4398                                       for i in range(disk_count)])
4399     for idx, disk in enumerate(disk_info):
4400       disk_index = idx + base_index
4401       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4402                               logical_id=(vgname, names[idx]),
4403                               iv_name="disk/%d" % disk_index,
4404                               mode=disk["mode"])
4405       disks.append(disk_dev)
4406   elif template_name == constants.DT_DRBD8:
4407     if len(secondary_nodes) != 1:
4408       raise errors.ProgrammerError("Wrong template configuration")
4409     remote_node = secondary_nodes[0]
4410     minors = lu.cfg.AllocateDRBDMinor(
4411       [primary_node, remote_node] * len(disk_info), instance_name)
4412
4413     names = []
4414     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
4415                                                for i in range(disk_count)]):
4416       names.append(lv_prefix + "_data")
4417       names.append(lv_prefix + "_meta")
4418     for idx, disk in enumerate(disk_info):
4419       disk_index = idx + base_index
4420       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4421                                       disk["size"], names[idx*2:idx*2+2],
4422                                       "disk/%d" % disk_index,
4423                                       minors[idx*2], minors[idx*2+1])
4424       disk_dev.mode = disk["mode"]
4425       disks.append(disk_dev)
4426   elif template_name == constants.DT_FILE:
4427     if len(secondary_nodes) != 0:
4428       raise errors.ProgrammerError("Wrong template configuration")
4429
4430     for idx, disk in enumerate(disk_info):
4431       disk_index = idx + base_index
4432       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4433                               iv_name="disk/%d" % disk_index,
4434                               logical_id=(file_driver,
4435                                           "%s/disk%d" % (file_storage_dir,
4436                                                          disk_index)),
4437                               mode=disk["mode"])
4438       disks.append(disk_dev)
4439   else:
4440     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4441   return disks
4442
4443
4444 def _GetInstanceInfoText(instance):
4445   """Compute that text that should be added to the disk's metadata.
4446
4447   """
4448   return "originstname+%s" % instance.name
4449
4450
4451 def _CreateDisks(lu, instance):
4452   """Create all disks for an instance.
4453
4454   This abstracts away some work from AddInstance.
4455
4456   @type lu: L{LogicalUnit}
4457   @param lu: the logical unit on whose behalf we execute
4458   @type instance: L{objects.Instance}
4459   @param instance: the instance whose disks we should create
4460   @rtype: boolean
4461   @return: the success of the creation
4462
4463   """
4464   info = _GetInstanceInfoText(instance)
4465   pnode = instance.primary_node
4466
4467   if instance.disk_template == constants.DT_FILE:
4468     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4469     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4470
4471     result.Raise("Failed to create directory '%s' on"
4472                  " node %s: %s" % (file_storage_dir, pnode))
4473
4474   # Note: this needs to be kept in sync with adding of disks in
4475   # LUSetInstanceParams
4476   for device in instance.disks:
4477     logging.info("Creating volume %s for instance %s",
4478                  device.iv_name, instance.name)
4479     #HARDCODE
4480     for node in instance.all_nodes:
4481       f_create = node == pnode
4482       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4483
4484
4485 def _RemoveDisks(lu, instance):
4486   """Remove all disks for an instance.
4487
4488   This abstracts away some work from `AddInstance()` and
4489   `RemoveInstance()`. Note that in case some of the devices couldn't
4490   be removed, the removal will continue with the other ones (compare
4491   with `_CreateDisks()`).
4492
4493   @type lu: L{LogicalUnit}
4494   @param lu: the logical unit on whose behalf we execute
4495   @type instance: L{objects.Instance}
4496   @param instance: the instance whose disks we should remove
4497   @rtype: boolean
4498   @return: the success of the removal
4499
4500   """
4501   logging.info("Removing block devices for instance %s", instance.name)
4502
4503   all_result = True
4504   for device in instance.disks:
4505     for node, disk in device.ComputeNodeTree(instance.primary_node):
4506       lu.cfg.SetDiskID(disk, node)
4507       msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
4508       if msg:
4509         lu.LogWarning("Could not remove block device %s on node %s,"
4510                       " continuing anyway: %s", device.iv_name, node, msg)
4511         all_result = False
4512
4513   if instance.disk_template == constants.DT_FILE:
4514     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4515     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4516                                                  file_storage_dir)
4517     msg = result.fail_msg
4518     if msg:
4519       lu.LogWarning("Could not remove directory '%s' on node %s: %s",
4520                     file_storage_dir, instance.primary_node, msg)
4521       all_result = False
4522
4523   return all_result
4524
4525
4526 def _ComputeDiskSize(disk_template, disks):
4527   """Compute disk size requirements in the volume group
4528
4529   """
4530   # Required free disk space as a function of disk and swap space
4531   req_size_dict = {
4532     constants.DT_DISKLESS: None,
4533     constants.DT_PLAIN: sum(d["size"] for d in disks),
4534     # 128 MB are added for drbd metadata for each disk
4535     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4536     constants.DT_FILE: None,
4537   }
4538
4539   if disk_template not in req_size_dict:
4540     raise errors.ProgrammerError("Disk template '%s' size requirement"
4541                                  " is unknown" %  disk_template)
4542
4543   return req_size_dict[disk_template]
4544
4545
4546 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4547   """Hypervisor parameter validation.
4548
4549   This function abstract the hypervisor parameter validation to be
4550   used in both instance create and instance modify.
4551
4552   @type lu: L{LogicalUnit}
4553   @param lu: the logical unit for which we check
4554   @type nodenames: list
4555   @param nodenames: the list of nodes on which we should check
4556   @type hvname: string
4557   @param hvname: the name of the hypervisor we should use
4558   @type hvparams: dict
4559   @param hvparams: the parameters which we need to check
4560   @raise errors.OpPrereqError: if the parameters are not valid
4561
4562   """
4563   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4564                                                   hvname,
4565                                                   hvparams)
4566   for node in nodenames:
4567     info = hvinfo[node]
4568     if info.offline:
4569       continue
4570     info.Raise("Hypervisor parameter validation failed on node %s" % node)
4571
4572
4573 class LUCreateInstance(LogicalUnit):
4574   """Create an instance.
4575
4576   """
4577   HPATH = "instance-add"
4578   HTYPE = constants.HTYPE_INSTANCE
4579   _OP_REQP = ["instance_name", "disks", "disk_template",
4580               "mode", "start",
4581               "wait_for_sync", "ip_check", "nics",
4582               "hvparams", "beparams"]
4583   REQ_BGL = False
4584
4585   def _ExpandNode(self, node):
4586     """Expands and checks one node name.
4587
4588     """
4589     node_full = self.cfg.ExpandNodeName(node)
4590     if node_full is None:
4591       raise errors.OpPrereqError("Unknown node %s" % node)
4592     return node_full
4593
4594   def ExpandNames(self):
4595     """ExpandNames for CreateInstance.
4596
4597     Figure out the right locks for instance creation.
4598
4599     """
4600     self.needed_locks = {}
4601
4602     # set optional parameters to none if they don't exist
4603     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4604       if not hasattr(self.op, attr):
4605         setattr(self.op, attr, None)
4606
4607     # cheap checks, mostly valid constants given
4608
4609     # verify creation mode
4610     if self.op.mode not in (constants.INSTANCE_CREATE,
4611                             constants.INSTANCE_IMPORT):
4612       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4613                                  self.op.mode)
4614
4615     # disk template and mirror node verification
4616     if self.op.disk_template not in constants.DISK_TEMPLATES:
4617       raise errors.OpPrereqError("Invalid disk template name")
4618
4619     if self.op.hypervisor is None:
4620       self.op.hypervisor = self.cfg.GetHypervisorType()
4621
4622     cluster = self.cfg.GetClusterInfo()
4623     enabled_hvs = cluster.enabled_hypervisors
4624     if self.op.hypervisor not in enabled_hvs:
4625       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4626                                  " cluster (%s)" % (self.op.hypervisor,
4627                                   ",".join(enabled_hvs)))
4628
4629     # check hypervisor parameter syntax (locally)
4630     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4631     filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
4632                                   self.op.hvparams)
4633     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4634     hv_type.CheckParameterSyntax(filled_hvp)
4635     self.hv_full = filled_hvp
4636
4637     # fill and remember the beparams dict
4638     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4639     self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
4640                                     self.op.beparams)
4641
4642     #### instance parameters check
4643
4644     # instance name verification
4645     hostname1 = utils.HostInfo(self.op.instance_name)
4646     self.op.instance_name = instance_name = hostname1.name
4647
4648     # this is just a preventive check, but someone might still add this
4649     # instance in the meantime, and creation will fail at lock-add time
4650     if instance_name in self.cfg.GetInstanceList():
4651       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4652                                  instance_name)
4653
4654     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4655
4656     # NIC buildup
4657     self.nics = []
4658     for idx, nic in enumerate(self.op.nics):
4659       nic_mode_req = nic.get("mode", None)
4660       nic_mode = nic_mode_req
4661       if nic_mode is None:
4662         nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
4663
4664       # in routed mode, for the first nic, the default ip is 'auto'
4665       if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
4666         default_ip_mode = constants.VALUE_AUTO
4667       else:
4668         default_ip_mode = constants.VALUE_NONE
4669
4670       # ip validity checks
4671       ip = nic.get("ip", default_ip_mode)
4672       if ip is None or ip.lower() == constants.VALUE_NONE:
4673         nic_ip = None
4674       elif ip.lower() == constants.VALUE_AUTO:
4675         nic_ip = hostname1.ip
4676       else:
4677         if not utils.IsValidIP(ip):
4678           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4679                                      " like a valid IP" % ip)
4680         nic_ip = ip
4681
4682       # TODO: check the ip for uniqueness !!
4683       if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
4684         raise errors.OpPrereqError("Routed nic mode requires an ip address")
4685
4686       # MAC address verification
4687       mac = nic.get("mac", constants.VALUE_AUTO)
4688       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4689         if not utils.IsValidMac(mac.lower()):
4690           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4691                                      mac)
4692       # bridge verification
4693       bridge = nic.get("bridge", None)
4694       link = nic.get("link", None)
4695       if bridge and link:
4696         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
4697                                    " at the same time")
4698       elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
4699         raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
4700       elif bridge:
4701         link = bridge
4702
4703       nicparams = {}
4704       if nic_mode_req:
4705         nicparams[constants.NIC_MODE] = nic_mode_req
4706       if link:
4707         nicparams[constants.NIC_LINK] = link
4708
4709       check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4710                                       nicparams)
4711       objects.NIC.CheckParameterSyntax(check_params)
4712       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
4713
4714     # disk checks/pre-build
4715     self.disks = []
4716     for disk in self.op.disks:
4717       mode = disk.get("mode", constants.DISK_RDWR)
4718       if mode not in constants.DISK_ACCESS_SET:
4719         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4720                                    mode)
4721       size = disk.get("size", None)
4722       if size is None:
4723         raise errors.OpPrereqError("Missing disk size")
4724       try:
4725         size = int(size)
4726       except ValueError:
4727         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4728       self.disks.append({"size": size, "mode": mode})
4729
4730     # used in CheckPrereq for ip ping check
4731     self.check_ip = hostname1.ip
4732
4733     # file storage checks
4734     if (self.op.file_driver and
4735         not self.op.file_driver in constants.FILE_DRIVER):
4736       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4737                                  self.op.file_driver)
4738
4739     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4740       raise errors.OpPrereqError("File storage directory path not absolute")
4741
4742     ### Node/iallocator related checks
4743     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4744       raise errors.OpPrereqError("One and only one of iallocator and primary"
4745                                  " node must be given")
4746
4747     if self.op.iallocator:
4748       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4749     else:
4750       self.op.pnode = self._ExpandNode(self.op.pnode)
4751       nodelist = [self.op.pnode]
4752       if self.op.snode is not None:
4753         self.op.snode = self._ExpandNode(self.op.snode)
4754         nodelist.append(self.op.snode)
4755       self.needed_locks[locking.LEVEL_NODE] = nodelist
4756
4757     # in case of import lock the source node too
4758     if self.op.mode == constants.INSTANCE_IMPORT:
4759       src_node = getattr(self.op, "src_node", None)
4760       src_path = getattr(self.op, "src_path", None)
4761
4762       if src_path is None:
4763         self.op.src_path = src_path = self.op.instance_name
4764
4765       if src_node is None:
4766         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4767         self.op.src_node = None
4768         if os.path.isabs(src_path):
4769           raise errors.OpPrereqError("Importing an instance from an absolute"
4770                                      " path requires a source node option.")
4771       else:
4772         self.op.src_node = src_node = self._ExpandNode(src_node)
4773         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4774           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4775         if not os.path.isabs(src_path):
4776           self.op.src_path = src_path = \
4777             os.path.join(constants.EXPORT_DIR, src_path)
4778
4779     else: # INSTANCE_CREATE
4780       if getattr(self.op, "os_type", None) is None:
4781         raise errors.OpPrereqError("No guest OS specified")
4782
4783   def _RunAllocator(self):
4784     """Run the allocator based on input opcode.
4785
4786     """
4787     nics = [n.ToDict() for n in self.nics]
4788     ial = IAllocator(self.cfg, self.rpc,
4789                      mode=constants.IALLOCATOR_MODE_ALLOC,
4790                      name=self.op.instance_name,
4791                      disk_template=self.op.disk_template,
4792                      tags=[],
4793                      os=self.op.os_type,
4794                      vcpus=self.be_full[constants.BE_VCPUS],
4795                      mem_size=self.be_full[constants.BE_MEMORY],
4796                      disks=self.disks,
4797                      nics=nics,
4798                      hypervisor=self.op.hypervisor,
4799                      )
4800
4801     ial.Run(self.op.iallocator)
4802
4803     if not ial.success:
4804       raise errors.OpPrereqError("Can't compute nodes using"
4805                                  " iallocator '%s': %s" % (self.op.iallocator,
4806                                                            ial.info))
4807     if len(ial.nodes) != ial.required_nodes:
4808       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4809                                  " of nodes (%s), required %s" %
4810                                  (self.op.iallocator, len(ial.nodes),
4811                                   ial.required_nodes))
4812     self.op.pnode = ial.nodes[0]
4813     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4814                  self.op.instance_name, self.op.iallocator,
4815                  ", ".join(ial.nodes))
4816     if ial.required_nodes == 2:
4817       self.op.snode = ial.nodes[1]
4818
4819   def BuildHooksEnv(self):
4820     """Build hooks env.
4821
4822     This runs on master, primary and secondary nodes of the instance.
4823
4824     """
4825     env = {
4826       "ADD_MODE": self.op.mode,
4827       }
4828     if self.op.mode == constants.INSTANCE_IMPORT:
4829       env["SRC_NODE"] = self.op.src_node
4830       env["SRC_PATH"] = self.op.src_path
4831       env["SRC_IMAGES"] = self.src_images
4832
4833     env.update(_BuildInstanceHookEnv(
4834       name=self.op.instance_name,
4835       primary_node=self.op.pnode,
4836       secondary_nodes=self.secondaries,
4837       status=self.op.start,
4838       os_type=self.op.os_type,
4839       memory=self.be_full[constants.BE_MEMORY],
4840       vcpus=self.be_full[constants.BE_VCPUS],
4841       nics=_NICListToTuple(self, self.nics),
4842       disk_template=self.op.disk_template,
4843       disks=[(d["size"], d["mode"]) for d in self.disks],
4844       bep=self.be_full,
4845       hvp=self.hv_full,
4846       hypervisor_name=self.op.hypervisor,
4847     ))
4848
4849     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4850           self.secondaries)
4851     return env, nl, nl
4852
4853
4854   def CheckPrereq(self):
4855     """Check prerequisites.
4856
4857     """
4858     if (not self.cfg.GetVGName() and
4859         self.op.disk_template not in constants.DTS_NOT_LVM):
4860       raise errors.OpPrereqError("Cluster does not support lvm-based"
4861                                  " instances")
4862
4863     if self.op.mode == constants.INSTANCE_IMPORT:
4864       src_node = self.op.src_node
4865       src_path = self.op.src_path
4866
4867       if src_node is None:
4868         locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
4869         exp_list = self.rpc.call_export_list(locked_nodes)
4870         found = False
4871         for node in exp_list:
4872           if exp_list[node].fail_msg:
4873             continue
4874           if src_path in exp_list[node].payload:
4875             found = True
4876             self.op.src_node = src_node = node
4877             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4878                                                        src_path)
4879             break
4880         if not found:
4881           raise errors.OpPrereqError("No export found for relative path %s" %
4882                                       src_path)
4883
4884       _CheckNodeOnline(self, src_node)
4885       result = self.rpc.call_export_info(src_node, src_path)
4886       result.Raise("No export or invalid export found in dir %s" % src_path)
4887
4888       export_info = objects.SerializableConfigParser.Loads(str(result.payload))
4889       if not export_info.has_section(constants.INISECT_EXP):
4890         raise errors.ProgrammerError("Corrupted export config")
4891
4892       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4893       if (int(ei_version) != constants.EXPORT_VERSION):
4894         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4895                                    (ei_version, constants.EXPORT_VERSION))
4896
4897       # Check that the new instance doesn't have less disks than the export
4898       instance_disks = len(self.disks)
4899       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4900       if instance_disks < export_disks:
4901         raise errors.OpPrereqError("Not enough disks to import."
4902                                    " (instance: %d, export: %d)" %
4903                                    (instance_disks, export_disks))
4904
4905       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4906       disk_images = []
4907       for idx in range(export_disks):
4908         option = 'disk%d_dump' % idx
4909         if export_info.has_option(constants.INISECT_INS, option):
4910           # FIXME: are the old os-es, disk sizes, etc. useful?
4911           export_name = export_info.get(constants.INISECT_INS, option)
4912           image = os.path.join(src_path, export_name)
4913           disk_images.append(image)
4914         else:
4915           disk_images.append(False)
4916
4917       self.src_images = disk_images
4918
4919       old_name = export_info.get(constants.INISECT_INS, 'name')
4920       # FIXME: int() here could throw a ValueError on broken exports
4921       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4922       if self.op.instance_name == old_name:
4923         for idx, nic in enumerate(self.nics):
4924           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4925             nic_mac_ini = 'nic%d_mac' % idx
4926             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4927
4928     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4929     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4930     if self.op.start and not self.op.ip_check:
4931       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4932                                  " adding an instance in start mode")
4933
4934     if self.op.ip_check:
4935       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4936         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4937                                    (self.check_ip, self.op.instance_name))
4938
4939     #### mac address generation
4940     # By generating here the mac address both the allocator and the hooks get
4941     # the real final mac address rather than the 'auto' or 'generate' value.
4942     # There is a race condition between the generation and the instance object
4943     # creation, which means that we know the mac is valid now, but we're not
4944     # sure it will be when we actually add the instance. If things go bad
4945     # adding the instance will abort because of a duplicate mac, and the
4946     # creation job will fail.
4947     for nic in self.nics:
4948       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4949         nic.mac = self.cfg.GenerateMAC()
4950
4951     #### allocator run
4952
4953     if self.op.iallocator is not None:
4954       self._RunAllocator()
4955
4956     #### node related checks
4957
4958     # check primary node
4959     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4960     assert self.pnode is not None, \
4961       "Cannot retrieve locked node %s" % self.op.pnode
4962     if pnode.offline:
4963       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4964                                  pnode.name)
4965     if pnode.drained:
4966       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4967                                  pnode.name)
4968
4969     self.secondaries = []
4970
4971     # mirror node verification
4972     if self.op.disk_template in constants.DTS_NET_MIRROR:
4973       if self.op.snode is None:
4974         raise errors.OpPrereqError("The networked disk templates need"
4975                                    " a mirror node")
4976       if self.op.snode == pnode.name:
4977         raise errors.OpPrereqError("The secondary node cannot be"
4978                                    " the primary node.")
4979       _CheckNodeOnline(self, self.op.snode)
4980       _CheckNodeNotDrained(self, self.op.snode)
4981       self.secondaries.append(self.op.snode)
4982
4983     nodenames = [pnode.name] + self.secondaries
4984
4985     req_size = _ComputeDiskSize(self.op.disk_template,
4986                                 self.disks)
4987
4988     # Check lv size requirements
4989     if req_size is not None:
4990       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4991                                          self.op.hypervisor)
4992       for node in nodenames:
4993         info = nodeinfo[node]
4994         info.Raise("Cannot get current information from node %s" % node)
4995         info = info.payload
4996         vg_free = info.get('vg_free', None)
4997         if not isinstance(vg_free, int):
4998           raise errors.OpPrereqError("Can't compute free disk space on"
4999                                      " node %s" % node)
5000         if req_size > vg_free:
5001           raise errors.OpPrereqError("Not enough disk space on target node %s."
5002                                      " %d MB available, %d MB required" %
5003                                      (node, vg_free, req_size))
5004
5005     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
5006
5007     # os verification
5008     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
5009     result.Raise("OS '%s' not in supported os list for primary node %s" %
5010                  (self.op.os_type, pnode.name), prereq=True)
5011
5012     _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
5013
5014     # memory check on primary node
5015     if self.op.start:
5016       _CheckNodeFreeMemory(self, self.pnode.name,
5017                            "creating instance %s" % self.op.instance_name,
5018                            self.be_full[constants.BE_MEMORY],
5019                            self.op.hypervisor)
5020
5021     self.dry_run_result = list(nodenames)
5022
5023   def Exec(self, feedback_fn):
5024     """Create and add the instance to the cluster.
5025
5026     """
5027     instance = self.op.instance_name
5028     pnode_name = self.pnode.name
5029
5030     ht_kind = self.op.hypervisor
5031     if ht_kind in constants.HTS_REQ_PORT:
5032       network_port = self.cfg.AllocatePort()
5033     else:
5034       network_port = None
5035
5036     ##if self.op.vnc_bind_address is None:
5037     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
5038
5039     # this is needed because os.path.join does not accept None arguments
5040     if self.op.file_storage_dir is None:
5041       string_file_storage_dir = ""
5042     else:
5043       string_file_storage_dir = self.op.file_storage_dir
5044
5045     # build the full file storage dir path
5046     file_storage_dir = os.path.normpath(os.path.join(
5047                                         self.cfg.GetFileStorageDir(),
5048                                         string_file_storage_dir, instance))
5049
5050
5051     disks = _GenerateDiskTemplate(self,
5052                                   self.op.disk_template,
5053                                   instance, pnode_name,
5054                                   self.secondaries,
5055                                   self.disks,
5056                                   file_storage_dir,
5057                                   self.op.file_driver,
5058                                   0)
5059
5060     iobj = objects.Instance(name=instance, os=self.op.os_type,
5061                             primary_node=pnode_name,
5062                             nics=self.nics, disks=disks,
5063                             disk_template=self.op.disk_template,
5064                             admin_up=False,
5065                             network_port=network_port,
5066                             beparams=self.op.beparams,
5067                             hvparams=self.op.hvparams,
5068                             hypervisor=self.op.hypervisor,
5069                             )
5070
5071     feedback_fn("* creating instance disks...")
5072     try:
5073       _CreateDisks(self, iobj)
5074     except errors.OpExecError:
5075       self.LogWarning("Device creation failed, reverting...")
5076       try:
5077         _RemoveDisks(self, iobj)
5078       finally:
5079         self.cfg.ReleaseDRBDMinors(instance)
5080         raise
5081
5082     feedback_fn("adding instance %s to cluster config" % instance)
5083
5084     self.cfg.AddInstance(iobj)
5085     # Declare that we don't want to remove the instance lock anymore, as we've
5086     # added the instance to the config
5087     del self.remove_locks[locking.LEVEL_INSTANCE]
5088     # Unlock all the nodes
5089     if self.op.mode == constants.INSTANCE_IMPORT:
5090       nodes_keep = [self.op.src_node]
5091       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
5092                        if node != self.op.src_node]
5093       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
5094       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
5095     else:
5096       self.context.glm.release(locking.LEVEL_NODE)
5097       del self.acquired_locks[locking.LEVEL_NODE]
5098
5099     if self.op.wait_for_sync:
5100       disk_abort = not _WaitForSync(self, iobj)
5101     elif iobj.disk_template in constants.DTS_NET_MIRROR:
5102       # make sure the disks are not degraded (still sync-ing is ok)
5103       time.sleep(15)
5104       feedback_fn("* checking mirrors status")
5105       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
5106     else:
5107       disk_abort = False
5108
5109     if disk_abort:
5110       _RemoveDisks(self, iobj)
5111       self.cfg.RemoveInstance(iobj.name)
5112       # Make sure the instance lock gets removed
5113       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
5114       raise errors.OpExecError("There are some degraded disks for"
5115                                " this instance")
5116
5117     feedback_fn("creating os for instance %s on node %s" %
5118                 (instance, pnode_name))
5119
5120     if iobj.disk_template != constants.DT_DISKLESS:
5121       if self.op.mode == constants.INSTANCE_CREATE:
5122         feedback_fn("* running the instance OS create scripts...")
5123         result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
5124         result.Raise("Could not add os for instance %s"
5125                      " on node %s" % (instance, pnode_name))
5126
5127       elif self.op.mode == constants.INSTANCE_IMPORT:
5128         feedback_fn("* running the instance OS import scripts...")
5129         src_node = self.op.src_node
5130         src_images = self.src_images
5131         cluster_name = self.cfg.GetClusterName()
5132         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
5133                                                          src_node, src_images,
5134                                                          cluster_name)
5135         msg = import_result.fail_msg
5136         if msg:
5137           self.LogWarning("Error while importing the disk images for instance"
5138                           " %s on node %s: %s" % (instance, pnode_name, msg))
5139       else:
5140         # also checked in the prereq part
5141         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
5142                                      % self.op.mode)
5143
5144     if self.op.start:
5145       iobj.admin_up = True
5146       self.cfg.Update(iobj)
5147       logging.info("Starting instance %s on node %s", instance, pnode_name)
5148       feedback_fn("* starting instance...")
5149       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
5150       result.Raise("Could not start instance")
5151
5152     return list(iobj.all_nodes)
5153
5154
5155 class LUConnectConsole(NoHooksLU):
5156   """Connect to an instance's console.
5157
5158   This is somewhat special in that it returns the command line that
5159   you need to run on the master node in order to connect to the
5160   console.
5161
5162   """
5163   _OP_REQP = ["instance_name"]
5164   REQ_BGL = False
5165
5166   def ExpandNames(self):
5167     self._ExpandAndLockInstance()
5168
5169   def CheckPrereq(self):
5170     """Check prerequisites.
5171
5172     This checks that the instance is in the cluster.
5173
5174     """
5175     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5176     assert self.instance is not None, \
5177       "Cannot retrieve locked instance %s" % self.op.instance_name
5178     _CheckNodeOnline(self, self.instance.primary_node)
5179
5180   def Exec(self, feedback_fn):
5181     """Connect to the console of an instance
5182
5183     """
5184     instance = self.instance
5185     node = instance.primary_node
5186
5187     node_insts = self.rpc.call_instance_list([node],
5188                                              [instance.hypervisor])[node]
5189     node_insts.Raise("Can't get node information from %s" % node)
5190
5191     if instance.name not in node_insts.payload:
5192       raise errors.OpExecError("Instance %s is not running." % instance.name)
5193
5194     logging.debug("Connecting to console of %s on %s", instance.name, node)
5195
5196     hyper = hypervisor.GetHypervisor(instance.hypervisor)
5197     cluster = self.cfg.GetClusterInfo()
5198     # beparams and hvparams are passed separately, to avoid editing the
5199     # instance and then saving the defaults in the instance itself.
5200     hvparams = cluster.FillHV(instance)
5201     beparams = cluster.FillBE(instance)
5202     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5203
5204     # build ssh cmdline
5205     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
5206
5207
5208 class LUReplaceDisks(LogicalUnit):
5209   """Replace the disks of an instance.
5210
5211   """
5212   HPATH = "mirrors-replace"
5213   HTYPE = constants.HTYPE_INSTANCE
5214   _OP_REQP = ["instance_name", "mode", "disks"]
5215   REQ_BGL = False
5216
5217   def CheckArguments(self):
5218     if not hasattr(self.op, "remote_node"):
5219       self.op.remote_node = None
5220     if not hasattr(self.op, "iallocator"):
5221       self.op.iallocator = None
5222
5223     TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node,
5224                                   self.op.iallocator)
5225
5226   def ExpandNames(self):
5227     self._ExpandAndLockInstance()
5228
5229     if self.op.iallocator is not None:
5230       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5231
5232     elif self.op.remote_node is not None:
5233       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5234       if remote_node is None:
5235         raise errors.OpPrereqError("Node '%s' not known" %
5236                                    self.op.remote_node)
5237
5238       self.op.remote_node = remote_node
5239
5240       # Warning: do not remove the locking of the new secondary here
5241       # unless DRBD8.AddChildren is changed to work in parallel;
5242       # currently it doesn't since parallel invocations of
5243       # FindUnusedMinor will conflict
5244       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5245       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5246
5247     else:
5248       self.needed_locks[locking.LEVEL_NODE] = []
5249       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5250
5251     self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
5252                                    self.op.iallocator, self.op.remote_node,
5253                                    self.op.disks)
5254
5255     self.tasklets = [self.replacer]
5256
5257   def DeclareLocks(self, level):
5258     # If we're not already locking all nodes in the set we have to declare the
5259     # instance's primary/secondary nodes.
5260     if (level == locking.LEVEL_NODE and
5261         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5262       self._LockInstancesNodes()
5263
5264   def BuildHooksEnv(self):
5265     """Build hooks env.
5266
5267     This runs on the master, the primary and all the secondaries.
5268
5269     """
5270     instance = self.replacer.instance
5271     env = {
5272       "MODE": self.op.mode,
5273       "NEW_SECONDARY": self.op.remote_node,
5274       "OLD_SECONDARY": instance.secondary_nodes[0],
5275       }
5276     env.update(_BuildInstanceHookEnvByObject(self, instance))
5277     nl = [
5278       self.cfg.GetMasterNode(),
5279       instance.primary_node,
5280       ]
5281     if self.op.remote_node is not None:
5282       nl.append(self.op.remote_node)
5283     return env, nl, nl
5284
5285
5286 class LUEvacuateNode(LogicalUnit):
5287   """Relocate the secondary instances from a node.
5288
5289   """
5290   HPATH = "node-evacuate"
5291   HTYPE = constants.HTYPE_NODE
5292   _OP_REQP = ["node_name"]
5293   REQ_BGL = False
5294
5295   def CheckArguments(self):
5296     if not hasattr(self.op, "remote_node"):
5297       self.op.remote_node = None
5298     if not hasattr(self.op, "iallocator"):
5299       self.op.iallocator = None
5300
5301     TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG,
5302                                   self.op.remote_node,
5303                                   self.op.iallocator)
5304
5305   def ExpandNames(self):
5306     self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
5307     if self.op.node_name is None:
5308       raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)
5309
5310     self.needed_locks = {}
5311
5312     # Declare node locks
5313     if self.op.iallocator is not None:
5314       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5315
5316     elif self.op.remote_node is not None:
5317       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5318       if remote_node is None:
5319         raise errors.OpPrereqError("Node '%s' not known" %
5320                                    self.op.remote_node)
5321
5322       self.op.remote_node = remote_node
5323
5324       # Warning: do not remove the locking of the new secondary here
5325       # unless DRBD8.AddChildren is changed to work in parallel;
5326       # currently it doesn't since parallel invocations of
5327       # FindUnusedMinor will conflict
5328       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5329       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5330
5331     else:
5332       raise errors.OpPrereqError("Invalid parameters")
5333
5334     # Create tasklets for replacing disks for all secondary instances on this
5335     # node
5336     names = []
5337     tasklets = []
5338
5339     for inst in _GetNodeSecondaryInstances(self.cfg, self.op.node_name):
5340       logging.debug("Replacing disks for instance %s", inst.name)
5341       names.append(inst.name)
5342
5343       replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG,
5344                                 self.op.iallocator, self.op.remote_node, [])
5345       tasklets.append(replacer)
5346
5347     self.tasklets = tasklets
5348     self.instance_names = names
5349
5350     # Declare instance locks
5351     self.needed_locks[locking.LEVEL_INSTANCE] = self.instance_names
5352
5353   def DeclareLocks(self, level):
5354     # If we're not already locking all nodes in the set we have to declare the
5355     # instance's primary/secondary nodes.
5356     if (level == locking.LEVEL_NODE and
5357         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5358       self._LockInstancesNodes()
5359
5360   def BuildHooksEnv(self):
5361     """Build hooks env.
5362
5363     This runs on the master, the primary and all the secondaries.
5364
5365     """
5366     env = {
5367       "NODE_NAME": self.op.node_name,
5368       }
5369
5370     nl = [self.cfg.GetMasterNode()]
5371
5372     if self.op.remote_node is not None:
5373       env["NEW_SECONDARY"] = self.op.remote_node
5374       nl.append(self.op.remote_node)
5375
5376     return (env, nl, nl)
5377
5378
5379 class TLReplaceDisks(Tasklet):
5380   """Replaces disks for an instance.
5381
5382   Note: Locking is not within the scope of this class.
5383
5384   """
5385   def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
5386                disks):
5387     """Initializes this class.
5388
5389     """
5390     Tasklet.__init__(self, lu)
5391
5392     # Parameters
5393     self.instance_name = instance_name
5394     self.mode = mode
5395     self.iallocator_name = iallocator_name
5396     self.remote_node = remote_node
5397     self.disks = disks
5398
5399     # Runtime data
5400     self.instance = None
5401     self.new_node = None
5402     self.target_node = None
5403     self.other_node = None
5404     self.remote_node_info = None
5405     self.node_secondary_ip = None
5406
5407   @staticmethod
5408   def CheckArguments(mode, remote_node, iallocator):
5409     """Helper function for users of this class.
5410
5411     """
5412     # check for valid parameter combination
5413     cnt = [remote_node, iallocator].count(None)
5414     if mode == constants.REPLACE_DISK_CHG:
5415       if cnt == 2:
5416         raise errors.OpPrereqError("When changing the secondary either an"
5417                                    " iallocator script must be used or the"
5418                                    " new node given")
5419       elif cnt == 0:
5420         raise errors.OpPrereqError("Give either the iallocator or the new"
5421                                    " secondary, not both")
5422     else: # not replacing the secondary
5423       if cnt != 2:
5424         raise errors.OpPrereqError("The iallocator and new node options can"
5425                                    " be used only when changing the"
5426                                    " secondary node")
5427
5428   @staticmethod
5429   def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
5430     """Compute a new secondary node using an IAllocator.
5431
5432     """
5433     ial = IAllocator(lu.cfg, lu.rpc,
5434                      mode=constants.IALLOCATOR_MODE_RELOC,
5435                      name=instance_name,
5436                      relocate_from=relocate_from)
5437
5438     ial.Run(iallocator_name)
5439
5440     if not ial.success:
5441       raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
5442                                  " %s" % (iallocator_name, ial.info))
5443
5444     if len(ial.nodes) != ial.required_nodes:
5445       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5446                                  " of nodes (%s), required %s" %
5447                                  (len(ial.nodes), ial.required_nodes))
5448
5449     remote_node_name = ial.nodes[0]
5450
5451     lu.LogInfo("Selected new secondary for instance '%s': %s",
5452                instance_name, remote_node_name)
5453
5454     return remote_node_name
5455
5456   def CheckPrereq(self):
5457     """Check prerequisites.
5458
5459     This checks that the instance is in the cluster.
5460
5461     """
5462     self.instance = self.cfg.GetInstanceInfo(self.instance_name)
5463     assert self.instance is not None, \
5464       "Cannot retrieve locked instance %s" % self.instance_name
5465
5466     if self.instance.disk_template != constants.DT_DRBD8:
5467       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5468                                  " instances")
5469
5470     if len(self.instance.secondary_nodes) != 1:
5471       raise errors.OpPrereqError("The instance has a strange layout,"
5472                                  " expected one secondary but found %d" %
5473                                  len(self.instance.secondary_nodes))
5474
5475     secondary_node = self.instance.secondary_nodes[0]
5476
5477     if self.iallocator_name is None:
5478       remote_node = self.remote_node
5479     else:
5480       remote_node = self._RunAllocator(self.lu, self.iallocator_name,
5481                                        self.instance.name, secondary_node)
5482
5483     if remote_node is not None:
5484       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5485       assert self.remote_node_info is not None, \
5486         "Cannot retrieve locked node %s" % remote_node
5487     else:
5488       self.remote_node_info = None
5489
5490     if remote_node == self.instance.primary_node:
5491       raise errors.OpPrereqError("The specified node is the primary node of"
5492                                  " the instance.")
5493
5494     if remote_node == secondary_node:
5495       raise errors.OpPrereqError("The specified node is already the"
5496                                  " secondary node of the instance.")
5497
5498     if self.mode == constants.REPLACE_DISK_PRI:
5499       self.target_node = self.instance.primary_node
5500       self.other_node = secondary_node
5501       check_nodes = [self.target_node, self.other_node]
5502
5503     elif self.mode == constants.REPLACE_DISK_SEC:
5504       self.target_node = secondary_node
5505       self.other_node = self.instance.primary_node
5506       check_nodes = [self.target_node, self.other_node]
5507
5508     elif self.mode == constants.REPLACE_DISK_CHG:
5509       self.new_node = remote_node
5510       self.other_node = self.instance.primary_node
5511       self.target_node = secondary_node
5512       check_nodes = [self.new_node, self.other_node]
5513
5514       _CheckNodeNotDrained(self.lu, remote_node)
5515
5516     else:
5517       raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
5518                                    self.mode)
5519
5520     for node in check_nodes:
5521       _CheckNodeOnline(self.lu, node)
5522
5523     # If not specified all disks should be replaced
5524     if not self.disks:
5525       self.disks = range(len(self.instance.disks))
5526
5527     # Check whether disks are valid
5528     for disk_idx in self.disks:
5529       self.instance.FindDisk(disk_idx)
5530
5531     # Get secondary node IP addresses
5532     node_2nd_ip = {}
5533
5534     for node_name in [self.target_node, self.other_node, self.new_node]:
5535       if node_name is not None:
5536         node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip
5537
5538     self.node_secondary_ip = node_2nd_ip
5539
5540   def Exec(self, feedback_fn):
5541     """Execute disk replacement.
5542
5543     This dispatches the disk replacement to the appropriate handler.
5544
5545     """
5546     feedback_fn("Replacing disks for %s" % self.instance.name)
5547
5548     activate_disks = (not self.instance.admin_up)
5549
5550     # Activate the instance disks if we're replacing them on a down instance
5551     if activate_disks:
5552       _StartInstanceDisks(self.lu, self.instance, True)
5553
5554     try:
5555       if self.mode == constants.REPLACE_DISK_CHG:
5556         return self._ExecDrbd8Secondary()
5557       else:
5558         return self._ExecDrbd8DiskOnly()
5559
5560     finally:
5561       # Deactivate the instance disks if we're replacing them on a down instance
5562       if activate_disks:
5563         _SafeShutdownInstanceDisks(self.lu, self.instance)
5564
5565   def _CheckVolumeGroup(self, nodes):
5566     self.lu.LogInfo("Checking volume groups")
5567
5568     vgname = self.cfg.GetVGName()
5569
5570     # Make sure volume group exists on all involved nodes
5571     results = self.rpc.call_vg_list(nodes)
5572     if not results:
5573       raise errors.OpExecError("Can't list volume groups on the nodes")
5574
5575     for node in nodes:
5576       res = results[node]
5577       res.Raise("Error checking node %s" % node)
5578       if vgname not in res.payload:
5579         raise errors.OpExecError("Volume group '%s' not found on node %s" %
5580                                  (vgname, node))
5581
5582   def _CheckDisksExistence(self, nodes):
5583     # Check disk existence
5584     for idx, dev in enumerate(self.instance.disks):
5585       if idx not in self.disks:
5586         continue
5587
5588       for node in nodes:
5589         self.lu.LogInfo("Checking disk/%d on %s" % (idx, node))
5590         self.cfg.SetDiskID(dev, node)
5591
5592         result = self.rpc.call_blockdev_find(node, dev)
5593
5594         msg = result.fail_msg
5595         if msg or not result.payload:
5596           if not msg:
5597             msg = "disk not found"
5598           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5599                                    (idx, node, msg))
5600
5601   def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
5602     for idx, dev in enumerate(self.instance.disks):
5603       if idx not in self.disks:
5604         continue
5605
5606       self.lu.LogInfo("Checking disk/%d consistency on node %s" %
5607                       (idx, node_name))
5608
5609       if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary,
5610                                    ldisk=ldisk):
5611         raise errors.OpExecError("Node %s has degraded storage, unsafe to"
5612                                  " replace disks for instance %s" %
5613                                  (node_name, self.instance.name))
5614
5615   def _CreateNewStorage(self, node_name):
5616     vgname = self.cfg.GetVGName()
5617     iv_names = {}
5618
5619     for idx, dev in enumerate(self.instance.disks):
5620       if idx not in self.disks:
5621         continue
5622
5623       self.lu.LogInfo("Adding storage on %s for disk/%d" % (node_name, idx))
5624
5625       self.cfg.SetDiskID(dev, node_name)
5626
5627       lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
5628       names = _GenerateUniqueNames(self.lu, lv_names)
5629
5630       lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
5631                              logical_id=(vgname, names[0]))
5632       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5633                              logical_id=(vgname, names[1]))
5634
5635       new_lvs = [lv_data, lv_meta]
5636       old_lvs = dev.children
5637       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5638
5639       # we pass force_create=True to force the LVM creation
5640       for new_lv in new_lvs:
5641         _CreateBlockDev(self.lu, node_name, self.instance, new_lv, True,
5642                         _GetInstanceInfoText(self.instance), False)
5643
5644     return iv_names
5645
5646   def _CheckDevices(self, node_name, iv_names):
5647     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5648       self.cfg.SetDiskID(dev, node_name)
5649
5650       result = self.rpc.call_blockdev_find(node_name, dev)
5651
5652       msg = result.fail_msg
5653       if msg or not result.payload:
5654         if not msg:
5655           msg = "disk not found"
5656         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5657                                  (name, msg))
5658
5659       if result.payload[5]:
5660         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5661
5662   def _RemoveOldStorage(self, node_name, iv_names):
5663     for name, (dev, old_lvs, _) in iv_names.iteritems():
5664       self.lu.LogInfo("Remove logical volumes for %s" % name)
5665
5666       for lv in old_lvs:
5667         self.cfg.SetDiskID(lv, node_name)
5668
5669         msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
5670         if msg:
5671           self.lu.LogWarning("Can't remove old LV: %s" % msg,
5672                              hint="remove unused LVs manually")
5673
5674   def _ExecDrbd8DiskOnly(self):
5675     """Replace a disk on the primary or secondary for DRBD 8.
5676
5677     The algorithm for replace is quite complicated:
5678
5679       1. for each disk to be replaced:
5680
5681         1. create new LVs on the target node with unique names
5682         1. detach old LVs from the drbd device
5683         1. rename old LVs to name_replaced.<time_t>
5684         1. rename new LVs to old LVs
5685         1. attach the new LVs (with the old names now) to the drbd device
5686
5687       1. wait for sync across all devices
5688
5689       1. for each modified disk:
5690
5691         1. remove old LVs (which have the name name_replaces.<time_t>)
5692
5693     Failures are not very well handled.
5694
5695     """
5696     steps_total = 6
5697
5698     # Step: check device activation
5699     self.lu.LogStep(1, steps_total, "Check device existence")
5700     self._CheckDisksExistence([self.other_node, self.target_node])
5701     self._CheckVolumeGroup([self.target_node, self.other_node])
5702
5703     # Step: check other node consistency
5704     self.lu.LogStep(2, steps_total, "Check peer consistency")
5705     self._CheckDisksConsistency(self.other_node,
5706                                 self.other_node == self.instance.primary_node,
5707                                 False)
5708
5709     # Step: create new storage
5710     self.lu.LogStep(3, steps_total, "Allocate new storage")
5711     iv_names = self._CreateNewStorage(self.target_node)
5712
5713     # Step: for each lv, detach+rename*2+attach
5714     self.lu.LogStep(4, steps_total, "Changing drbd configuration")
5715     for dev, old_lvs, new_lvs in iv_names.itervalues():
5716       self.lu.LogInfo("Detaching %s drbd from local storage" % dev.iv_name)
5717
5718       result = self.rpc.call_blockdev_removechildren(self.target_node, dev, old_lvs)
5719       result.Raise("Can't detach drbd from local storage on node"
5720                    " %s for device %s" % (self.target_node, dev.iv_name))
5721       #dev.children = []
5722       #cfg.Update(instance)
5723
5724       # ok, we created the new LVs, so now we know we have the needed
5725       # storage; as such, we proceed on the target node to rename
5726       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5727       # using the assumption that logical_id == physical_id (which in
5728       # turn is the unique_id on that node)
5729
5730       # FIXME(iustin): use a better name for the replaced LVs
5731       temp_suffix = int(time.time())
5732       ren_fn = lambda d, suff: (d.physical_id[0],
5733                                 d.physical_id[1] + "_replaced-%s" % suff)
5734
5735       # Build the rename list based on what LVs exist on the node
5736       rename_old_to_new = []
5737       for to_ren in old_lvs:
5738         result = self.rpc.call_blockdev_find(self.target_node, to_ren)
5739         if not result.fail_msg and result.payload:
5740           # device exists
5741           rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
5742
5743       self.lu.LogInfo("Renaming the old LVs on the target node")
5744       result = self.rpc.call_blockdev_rename(self.target_node, rename_old_to_new)
5745       result.Raise("Can't rename old LVs on node %s" % self.target_node)
5746
5747       # Now we rename the new LVs to the old LVs
5748       self.lu.LogInfo("Renaming the new LVs on the target node")
5749       rename_new_to_old = [(new, old.physical_id)
5750                            for old, new in zip(old_lvs, new_lvs)]
5751       result = self.rpc.call_blockdev_rename(self.target_node, rename_new_to_old)
5752       result.Raise("Can't rename new LVs on node %s" % self.target_node)
5753
5754       for old, new in zip(old_lvs, new_lvs):
5755         new.logical_id = old.logical_id
5756         self.cfg.SetDiskID(new, self.target_node)
5757
5758       for disk in old_lvs:
5759         disk.logical_id = ren_fn(disk, temp_suffix)
5760         self.cfg.SetDiskID(disk, self.target_node)
5761
5762       # Now that the new lvs have the old name, we can add them to the device
5763       self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
5764       result = self.rpc.call_blockdev_addchildren(self.target_node, dev, new_lvs)
5765       msg = result.fail_msg
5766       if msg:
5767         for new_lv in new_lvs:
5768           msg2 = self.rpc.call_blockdev_remove(self.target_node, new_lv).fail_msg
5769           if msg2:
5770             self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
5771                                hint=("cleanup manually the unused logical"
5772                                      "volumes"))
5773         raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
5774
5775       dev.children = new_lvs
5776
5777       self.cfg.Update(self.instance)
5778
5779     # Wait for sync
5780     # This can fail as the old devices are degraded and _WaitForSync
5781     # does a combined result over all disks, so we don't check its return value
5782     self.lu.LogStep(5, steps_total, "Sync devices")
5783     _WaitForSync(self.lu, self.instance, unlock=True)
5784
5785     # Check all devices manually
5786     self._CheckDevices(self.instance.primary_node, iv_names)
5787
5788     # Step: remove old storage
5789     self.lu.LogStep(6, steps_total, "Removing old storage")
5790     self._RemoveOldStorage(self.target_node, iv_names)
5791
5792   def _ExecDrbd8Secondary(self):
5793     """Replace the secondary node for DRBD 8.
5794
5795     The algorithm for replace is quite complicated:
5796       - for all disks of the instance:
5797         - create new LVs on the new node with same names
5798         - shutdown the drbd device on the old secondary
5799         - disconnect the drbd network on the primary
5800         - create the drbd device on the new secondary
5801         - network attach the drbd on the primary, using an artifice:
5802           the drbd code for Attach() will connect to the network if it
5803           finds a device which is connected to the good local disks but
5804           not network enabled
5805       - wait for sync across all devices
5806       - remove all disks from the old secondary
5807
5808     Failures are not very well handled.
5809
5810     """
5811     steps_total = 6
5812
5813     # Step: check device activation
5814     self.lu.LogStep(1, steps_total, "Check device existence")
5815     self._CheckDisksExistence([self.instance.primary_node])
5816     self._CheckVolumeGroup([self.instance.primary_node])
5817
5818     # Step: check other node consistency
5819     self.lu.LogStep(2, steps_total, "Check peer consistency")
5820     self._CheckDisksConsistency(self.instance.primary_node, True, True)
5821
5822     # Step: create new storage
5823     self.lu.LogStep(3, steps_total, "Allocate new storage")
5824     for idx, dev in enumerate(self.instance.disks):
5825       self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
5826                       (self.new_node, idx))
5827       # we pass force_create=True to force LVM creation
5828       for new_lv in dev.children:
5829         _CreateBlockDev(self.lu, self.new_node, self.instance, new_lv, True,
5830                         _GetInstanceInfoText(self.instance), False)
5831
5832     # Step 4: dbrd minors and drbd setups changes
5833     # after this, we must manually remove the drbd minors on both the
5834     # error and the success paths
5835     self.lu.LogStep(4, steps_total, "Changing drbd configuration")
5836     minors = self.cfg.AllocateDRBDMinor([self.new_node for dev in self.instance.disks],
5837                                         self.instance.name)
5838     logging.debug("Allocated minors %r" % (minors,))
5839
5840     iv_names = {}
5841     for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
5842       self.lu.LogInfo("activating a new drbd on %s for disk/%d" % (self.new_node, idx))
5843       # create new devices on new_node; note that we create two IDs:
5844       # one without port, so the drbd will be activated without
5845       # networking information on the new node at this stage, and one
5846       # with network, for the latter activation in step 4
5847       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5848       if self.instance.primary_node == o_node1:
5849         p_minor = o_minor1
5850       else:
5851         p_minor = o_minor2
5852
5853       new_alone_id = (self.instance.primary_node, self.new_node, None, p_minor, new_minor, o_secret)
5854       new_net_id = (self.instance.primary_node, self.new_node, o_port, p_minor, new_minor, o_secret)
5855
5856       iv_names[idx] = (dev, dev.children, new_net_id)
5857       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5858                     new_net_id)
5859       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5860                               logical_id=new_alone_id,
5861                               children=dev.children,
5862                               size=dev.size)
5863       try:
5864         _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
5865                               _GetInstanceInfoText(self.instance), False)
5866       except errors.GenericError:
5867         self.cfg.ReleaseDRBDMinors(self.instance.name)
5868         raise
5869
5870     # We have new devices, shutdown the drbd on the old secondary
5871     for idx, dev in enumerate(self.instance.disks):
5872       self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx)
5873       self.cfg.SetDiskID(dev, self.target_node)
5874       msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg
5875       if msg:
5876         self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
5877                            "node: %s" % (idx, msg),
5878                            hint=("Please cleanup this device manually as"
5879                                  " soon as possible"))
5880
5881     self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
5882     result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node], self.node_secondary_ip,
5883                                                self.instance.disks)[self.instance.primary_node]
5884
5885     msg = result.fail_msg
5886     if msg:
5887       # detaches didn't succeed (unlikely)
5888       self.cfg.ReleaseDRBDMinors(self.instance.name)
5889       raise errors.OpExecError("Can't detach the disks from the network on"
5890                                " old node: %s" % (msg,))
5891
5892     # if we managed to detach at least one, we update all the disks of
5893     # the instance to point to the new secondary
5894     self.lu.LogInfo("Updating instance configuration")
5895     for dev, _, new_logical_id in iv_names.itervalues():
5896       dev.logical_id = new_logical_id
5897       self.cfg.SetDiskID(dev, self.instance.primary_node)
5898
5899     self.cfg.Update(self.instance)
5900
5901     # and now perform the drbd attach
5902     self.lu.LogInfo("Attaching primary drbds to new secondary"
5903                     " (standalone => connected)")
5904     result = self.rpc.call_drbd_attach_net([self.instance.primary_node, self.new_node], self.node_secondary_ip,
5905                                            self.instance.disks, self.instance.name,
5906                                            False)
5907     for to_node, to_result in result.items():
5908       msg = to_result.fail_msg
5909       if msg:
5910         self.lu.LogWarning("Can't attach drbd disks on node %s: %s", to_node, msg,
5911                            hint=("please do a gnt-instance info to see the"
5912                                  " status of disks"))
5913
5914     # Wait for sync
5915     # This can fail as the old devices are degraded and _WaitForSync
5916     # does a combined result over all disks, so we don't check its return value
5917     self.lu.LogStep(5, steps_total, "Sync devices")
5918     _WaitForSync(self.lu, self.instance, unlock=True)
5919
5920     # Check all devices manually
5921     self._CheckDevices(self.instance.primary_node, iv_names)
5922
5923     # Step: remove old storage
5924     self.lu.LogStep(6, steps_total, "Removing old storage")
5925     self._RemoveOldStorage(self.target_node, iv_names)
5926
5927
5928 class LUGrowDisk(LogicalUnit):
5929   """Grow a disk of an instance.
5930
5931   """
5932   HPATH = "disk-grow"
5933   HTYPE = constants.HTYPE_INSTANCE
5934   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5935   REQ_BGL = False
5936
5937   def ExpandNames(self):
5938     self._ExpandAndLockInstance()
5939     self.needed_locks[locking.LEVEL_NODE] = []
5940     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5941
5942   def DeclareLocks(self, level):
5943     if level == locking.LEVEL_NODE:
5944       self._LockInstancesNodes()
5945
5946   def BuildHooksEnv(self):
5947     """Build hooks env.
5948
5949     This runs on the master, the primary and all the secondaries.
5950
5951     """
5952     env = {
5953       "DISK": self.op.disk,
5954       "AMOUNT": self.op.amount,
5955       }
5956     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5957     nl = [
5958       self.cfg.GetMasterNode(),
5959       self.instance.primary_node,
5960       ]
5961     return env, nl, nl
5962
5963   def CheckPrereq(self):
5964     """Check prerequisites.
5965
5966     This checks that the instance is in the cluster.
5967
5968     """
5969     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5970     assert instance is not None, \
5971       "Cannot retrieve locked instance %s" % self.op.instance_name
5972     nodenames = list(instance.all_nodes)
5973     for node in nodenames:
5974       _CheckNodeOnline(self, node)
5975
5976
5977     self.instance = instance
5978
5979     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5980       raise errors.OpPrereqError("Instance's disk layout does not support"
5981                                  " growing.")
5982
5983     self.disk = instance.FindDisk(self.op.disk)
5984
5985     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5986                                        instance.hypervisor)
5987     for node in nodenames:
5988       info = nodeinfo[node]
5989       info.Raise("Cannot get current information from node %s" % node)
5990       vg_free = info.payload.get('vg_free', None)
5991       if not isinstance(vg_free, int):
5992         raise errors.OpPrereqError("Can't compute free disk space on"
5993                                    " node %s" % node)
5994       if self.op.amount > vg_free:
5995         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5996                                    " %d MiB available, %d MiB required" %
5997                                    (node, vg_free, self.op.amount))
5998
5999   def Exec(self, feedback_fn):
6000     """Execute disk grow.
6001
6002     """
6003     instance = self.instance
6004     disk = self.disk
6005     for node in instance.all_nodes:
6006       self.cfg.SetDiskID(disk, node)
6007       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
6008       result.Raise("Grow request failed to node %s" % node)
6009     disk.RecordGrow(self.op.amount)
6010     self.cfg.Update(instance)
6011     if self.op.wait_for_sync:
6012       disk_abort = not _WaitForSync(self, instance)
6013       if disk_abort:
6014         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
6015                              " status.\nPlease check the instance.")
6016
6017
6018 class LUQueryInstanceData(NoHooksLU):
6019   """Query runtime instance data.
6020
6021   """
6022   _OP_REQP = ["instances", "static"]
6023   REQ_BGL = False
6024
6025   def ExpandNames(self):
6026     self.needed_locks = {}
6027     self.share_locks = dict.fromkeys(locking.LEVELS, 1)
6028
6029     if not isinstance(self.op.instances, list):
6030       raise errors.OpPrereqError("Invalid argument type 'instances'")
6031
6032     if self.op.instances:
6033       self.wanted_names = []
6034       for name in self.op.instances:
6035         full_name = self.cfg.ExpandInstanceName(name)
6036         if full_name is None:
6037           raise errors.OpPrereqError("Instance '%s' not known" % name)
6038         self.wanted_names.append(full_name)
6039       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
6040     else:
6041       self.wanted_names = None
6042       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
6043
6044     self.needed_locks[locking.LEVEL_NODE] = []
6045     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6046
6047   def DeclareLocks(self, level):
6048     if level == locking.LEVEL_NODE:
6049       self._LockInstancesNodes()
6050
6051   def CheckPrereq(self):
6052     """Check prerequisites.
6053
6054     This only checks the optional instance list against the existing names.
6055
6056     """
6057     if self.wanted_names is None:
6058       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
6059
6060     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
6061                              in self.wanted_names]
6062     return
6063
6064   def _ComputeDiskStatus(self, instance, snode, dev):
6065     """Compute block device status.
6066
6067     """
6068     static = self.op.static
6069     if not static:
6070       self.cfg.SetDiskID(dev, instance.primary_node)
6071       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
6072       if dev_pstatus.offline:
6073         dev_pstatus = None
6074       else:
6075         dev_pstatus.Raise("Can't compute disk status for %s" % instance.name)
6076         dev_pstatus = dev_pstatus.payload
6077     else:
6078       dev_pstatus = None
6079
6080     if dev.dev_type in constants.LDS_DRBD:
6081       # we change the snode then (otherwise we use the one passed in)
6082       if dev.logical_id[0] == instance.primary_node:
6083         snode = dev.logical_id[1]
6084       else:
6085         snode = dev.logical_id[0]
6086
6087     if snode and not static:
6088       self.cfg.SetDiskID(dev, snode)
6089       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
6090       if dev_sstatus.offline:
6091         dev_sstatus = None
6092       else:
6093         dev_sstatus.Raise("Can't compute disk status for %s" % instance.name)
6094         dev_sstatus = dev_sstatus.payload
6095     else:
6096       dev_sstatus = None
6097
6098     if dev.children:
6099       dev_children = [self._ComputeDiskStatus(instance, snode, child)
6100                       for child in dev.children]
6101     else:
6102       dev_children = []
6103
6104     data = {
6105       "iv_name": dev.iv_name,
6106       "dev_type": dev.dev_type,
6107       "logical_id": dev.logical_id,
6108       "physical_id": dev.physical_id,
6109       "pstatus": dev_pstatus,
6110       "sstatus": dev_sstatus,
6111       "children": dev_children,
6112       "mode": dev.mode,
6113       "size": dev.size,
6114       }
6115
6116     return data
6117
6118   def Exec(self, feedback_fn):
6119     """Gather and return data"""
6120     result = {}
6121
6122     cluster = self.cfg.GetClusterInfo()
6123
6124     for instance in self.wanted_instances:
6125       if not self.op.static:
6126         remote_info = self.rpc.call_instance_info(instance.primary_node,
6127                                                   instance.name,
6128                                                   instance.hypervisor)
6129         remote_info.Raise("Error checking node %s" % instance.primary_node)
6130         remote_info = remote_info.payload
6131         if remote_info and "state" in remote_info:
6132           remote_state = "up"
6133         else:
6134           remote_state = "down"
6135       else:
6136         remote_state = None
6137       if instance.admin_up:
6138         config_state = "up"
6139       else:
6140         config_state = "down"
6141
6142       disks = [self._ComputeDiskStatus(instance, None, device)
6143                for device in instance.disks]
6144
6145       idict = {
6146         "name": instance.name,
6147         "config_state": config_state,
6148         "run_state": remote_state,
6149         "pnode": instance.primary_node,
6150         "snodes": instance.secondary_nodes,
6151         "os": instance.os,
6152         # this happens to be the same format used for hooks
6153         "nics": _NICListToTuple(self, instance.nics),
6154         "disks": disks,
6155         "hypervisor": instance.hypervisor,
6156         "network_port": instance.network_port,
6157         "hv_instance": instance.hvparams,
6158         "hv_actual": cluster.FillHV(instance),
6159         "be_instance": instance.beparams,
6160         "be_actual": cluster.FillBE(instance),
6161         }
6162
6163       result[instance.name] = idict
6164
6165     return result
6166
6167
6168 class LUSetInstanceParams(LogicalUnit):
6169   """Modifies an instances's parameters.
6170
6171   """
6172   HPATH = "instance-modify"
6173   HTYPE = constants.HTYPE_INSTANCE
6174   _OP_REQP = ["instance_name"]
6175   REQ_BGL = False
6176
6177   def CheckArguments(self):
6178     if not hasattr(self.op, 'nics'):
6179       self.op.nics = []
6180     if not hasattr(self.op, 'disks'):
6181       self.op.disks = []
6182     if not hasattr(self.op, 'beparams'):
6183       self.op.beparams = {}
6184     if not hasattr(self.op, 'hvparams'):
6185       self.op.hvparams = {}
6186     self.op.force = getattr(self.op, "force", False)
6187     if not (self.op.nics or self.op.disks or
6188             self.op.hvparams or self.op.beparams):
6189       raise errors.OpPrereqError("No changes submitted")
6190
6191     # Disk validation
6192     disk_addremove = 0
6193     for disk_op, disk_dict in self.op.disks:
6194       if disk_op == constants.DDM_REMOVE:
6195         disk_addremove += 1
6196         continue
6197       elif disk_op == constants.DDM_ADD:
6198         disk_addremove += 1
6199       else:
6200         if not isinstance(disk_op, int):
6201           raise errors.OpPrereqError("Invalid disk index")
6202         if not isinstance(disk_dict, dict):
6203           msg = "Invalid disk value: expected dict, got '%s'" % disk_dict
6204           raise errors.OpPrereqError(msg)
6205
6206       if disk_op == constants.DDM_ADD:
6207         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
6208         if mode not in constants.DISK_ACCESS_SET:
6209           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
6210         size = disk_dict.get('size', None)
6211         if size is None:
6212           raise errors.OpPrereqError("Required disk parameter size missing")
6213         try:
6214           size = int(size)
6215         except ValueError, err:
6216           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
6217                                      str(err))
6218         disk_dict['size'] = size
6219       else:
6220         # modification of disk
6221         if 'size' in disk_dict:
6222           raise errors.OpPrereqError("Disk size change not possible, use"
6223                                      " grow-disk")
6224
6225     if disk_addremove > 1:
6226       raise errors.OpPrereqError("Only one disk add or remove operation"
6227                                  " supported at a time")
6228
6229     # NIC validation
6230     nic_addremove = 0
6231     for nic_op, nic_dict in self.op.nics:
6232       if nic_op == constants.DDM_REMOVE:
6233         nic_addremove += 1
6234         continue
6235       elif nic_op == constants.DDM_ADD:
6236         nic_addremove += 1
6237       else:
6238         if not isinstance(nic_op, int):
6239           raise errors.OpPrereqError("Invalid nic index")
6240         if not isinstance(nic_dict, dict):
6241           msg = "Invalid nic value: expected dict, got '%s'" % nic_dict
6242           raise errors.OpPrereqError(msg)
6243
6244       # nic_dict should be a dict
6245       nic_ip = nic_dict.get('ip', None)
6246       if nic_ip is not None:
6247         if nic_ip.lower() == constants.VALUE_NONE:
6248           nic_dict['ip'] = None
6249         else:
6250           if not utils.IsValidIP(nic_ip):
6251             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
6252
6253       nic_bridge = nic_dict.get('bridge', None)
6254       nic_link = nic_dict.get('link', None)
6255       if nic_bridge and nic_link:
6256         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
6257                                    " at the same time")
6258       elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
6259         nic_dict['bridge'] = None
6260       elif nic_link and nic_link.lower() == constants.VALUE_NONE:
6261         nic_dict['link'] = None
6262
6263       if nic_op == constants.DDM_ADD:
6264         nic_mac = nic_dict.get('mac', None)
6265         if nic_mac is None:
6266           nic_dict['mac'] = constants.VALUE_AUTO
6267
6268       if 'mac' in nic_dict:
6269         nic_mac = nic_dict['mac']
6270         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6271           if not utils.IsValidMac(nic_mac):
6272             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
6273         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
6274           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
6275                                      " modifying an existing nic")
6276
6277     if nic_addremove > 1:
6278       raise errors.OpPrereqError("Only one NIC add or remove operation"
6279                                  " supported at a time")
6280
6281   def ExpandNames(self):
6282     self._ExpandAndLockInstance()
6283     self.needed_locks[locking.LEVEL_NODE] = []
6284     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6285
6286   def DeclareLocks(self, level):
6287     if level == locking.LEVEL_NODE:
6288       self._LockInstancesNodes()
6289
6290   def BuildHooksEnv(self):
6291     """Build hooks env.
6292
6293     This runs on the master, primary and secondaries.
6294
6295     """
6296     args = dict()
6297     if constants.BE_MEMORY in self.be_new:
6298       args['memory'] = self.be_new[constants.BE_MEMORY]
6299     if constants.BE_VCPUS in self.be_new:
6300       args['vcpus'] = self.be_new[constants.BE_VCPUS]
6301     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
6302     # information at all.
6303     if self.op.nics:
6304       args['nics'] = []
6305       nic_override = dict(self.op.nics)
6306       c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
6307       for idx, nic in enumerate(self.instance.nics):
6308         if idx in nic_override:
6309           this_nic_override = nic_override[idx]
6310         else:
6311           this_nic_override = {}
6312         if 'ip' in this_nic_override:
6313           ip = this_nic_override['ip']
6314         else:
6315           ip = nic.ip
6316         if 'mac' in this_nic_override:
6317           mac = this_nic_override['mac']
6318         else:
6319           mac = nic.mac
6320         if idx in self.nic_pnew:
6321           nicparams = self.nic_pnew[idx]
6322         else:
6323           nicparams = objects.FillDict(c_nicparams, nic.nicparams)
6324         mode = nicparams[constants.NIC_MODE]
6325         link = nicparams[constants.NIC_LINK]
6326         args['nics'].append((ip, mac, mode, link))
6327       if constants.DDM_ADD in nic_override:
6328         ip = nic_override[constants.DDM_ADD].get('ip', None)
6329         mac = nic_override[constants.DDM_ADD]['mac']
6330         nicparams = self.nic_pnew[constants.DDM_ADD]
6331         mode = nicparams[constants.NIC_MODE]
6332         link = nicparams[constants.NIC_LINK]
6333         args['nics'].append((ip, mac, mode, link))
6334       elif constants.DDM_REMOVE in nic_override:
6335         del args['nics'][-1]
6336
6337     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
6338     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
6339     return env, nl, nl
6340
6341   def _GetUpdatedParams(self, old_params, update_dict,
6342                         default_values, parameter_types):
6343     """Return the new params dict for the given params.
6344
6345     @type old_params: dict
6346     @param old_params: old parameters
6347     @type update_dict: dict
6348     @param update_dict: dict containing new parameter values,
6349                         or constants.VALUE_DEFAULT to reset the
6350                         parameter to its default value
6351     @type default_values: dict
6352     @param default_values: default values for the filled parameters
6353     @type parameter_types: dict
6354     @param parameter_types: dict mapping target dict keys to types
6355                             in constants.ENFORCEABLE_TYPES
6356     @rtype: (dict, dict)
6357     @return: (new_parameters, filled_parameters)
6358
6359     """
6360     params_copy = copy.deepcopy(old_params)
6361     for key, val in update_dict.iteritems():
6362       if val == constants.VALUE_DEFAULT:
6363         try:
6364           del params_copy[key]
6365         except KeyError:
6366           pass
6367       else:
6368         params_copy[key] = val
6369     utils.ForceDictType(params_copy, parameter_types)
6370     params_filled = objects.FillDict(default_values, params_copy)
6371     return (params_copy, params_filled)
6372
6373   def CheckPrereq(self):
6374     """Check prerequisites.
6375
6376     This only checks the instance list against the existing names.
6377
6378     """
6379     self.force = self.op.force
6380
6381     # checking the new params on the primary/secondary nodes
6382
6383     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6384     cluster = self.cluster = self.cfg.GetClusterInfo()
6385     assert self.instance is not None, \
6386       "Cannot retrieve locked instance %s" % self.op.instance_name
6387     pnode = instance.primary_node
6388     nodelist = list(instance.all_nodes)
6389
6390     # hvparams processing
6391     if self.op.hvparams:
6392       i_hvdict, hv_new = self._GetUpdatedParams(
6393                              instance.hvparams, self.op.hvparams,
6394                              cluster.hvparams[instance.hypervisor],
6395                              constants.HVS_PARAMETER_TYPES)
6396       # local check
6397       hypervisor.GetHypervisor(
6398         instance.hypervisor).CheckParameterSyntax(hv_new)
6399       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6400       self.hv_new = hv_new # the new actual values
6401       self.hv_inst = i_hvdict # the new dict (without defaults)
6402     else:
6403       self.hv_new = self.hv_inst = {}
6404
6405     # beparams processing
6406     if self.op.beparams:
6407       i_bedict, be_new = self._GetUpdatedParams(
6408                              instance.beparams, self.op.beparams,
6409                              cluster.beparams[constants.PP_DEFAULT],
6410                              constants.BES_PARAMETER_TYPES)
6411       self.be_new = be_new # the new actual values
6412       self.be_inst = i_bedict # the new dict (without defaults)
6413     else:
6414       self.be_new = self.be_inst = {}
6415
6416     self.warn = []
6417
6418     if constants.BE_MEMORY in self.op.beparams and not self.force:
6419       mem_check_list = [pnode]
6420       if be_new[constants.BE_AUTO_BALANCE]:
6421         # either we changed auto_balance to yes or it was from before
6422         mem_check_list.extend(instance.secondary_nodes)
6423       instance_info = self.rpc.call_instance_info(pnode, instance.name,
6424                                                   instance.hypervisor)
6425       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6426                                          instance.hypervisor)
6427       pninfo = nodeinfo[pnode]
6428       msg = pninfo.fail_msg
6429       if msg:
6430         # Assume the primary node is unreachable and go ahead
6431         self.warn.append("Can't get info from primary node %s: %s" %
6432                          (pnode,  msg))
6433       elif not isinstance(pninfo.payload.get('memory_free', None), int):
6434         self.warn.append("Node data from primary node %s doesn't contain"
6435                          " free memory information" % pnode)
6436       elif instance_info.fail_msg:
6437         self.warn.append("Can't get instance runtime information: %s" %
6438                         instance_info.fail_msg)
6439       else:
6440         if instance_info.payload:
6441           current_mem = int(instance_info.payload['memory'])
6442         else:
6443           # Assume instance not running
6444           # (there is a slight race condition here, but it's not very probable,
6445           # and we have no other way to check)
6446           current_mem = 0
6447         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6448                     pninfo.payload['memory_free'])
6449         if miss_mem > 0:
6450           raise errors.OpPrereqError("This change will prevent the instance"
6451                                      " from starting, due to %d MB of memory"
6452                                      " missing on its primary node" % miss_mem)
6453
6454       if be_new[constants.BE_AUTO_BALANCE]:
6455         for node, nres in nodeinfo.items():
6456           if node not in instance.secondary_nodes:
6457             continue
6458           msg = nres.fail_msg
6459           if msg:
6460             self.warn.append("Can't get info from secondary node %s: %s" %
6461                              (node, msg))
6462           elif not isinstance(nres.payload.get('memory_free', None), int):
6463             self.warn.append("Secondary node %s didn't return free"
6464                              " memory information" % node)
6465           elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
6466             self.warn.append("Not enough memory to failover instance to"
6467                              " secondary node %s" % node)
6468
6469     # NIC processing
6470     self.nic_pnew = {}
6471     self.nic_pinst = {}
6472     for nic_op, nic_dict in self.op.nics:
6473       if nic_op == constants.DDM_REMOVE:
6474         if not instance.nics:
6475           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6476         continue
6477       if nic_op != constants.DDM_ADD:
6478         # an existing nic
6479         if nic_op < 0 or nic_op >= len(instance.nics):
6480           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6481                                      " are 0 to %d" %
6482                                      (nic_op, len(instance.nics)))
6483         old_nic_params = instance.nics[nic_op].nicparams
6484         old_nic_ip = instance.nics[nic_op].ip
6485       else:
6486         old_nic_params = {}
6487         old_nic_ip = None
6488
6489       update_params_dict = dict([(key, nic_dict[key])
6490                                  for key in constants.NICS_PARAMETERS
6491                                  if key in nic_dict])
6492
6493       if 'bridge' in nic_dict:
6494         update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
6495
6496       new_nic_params, new_filled_nic_params = \
6497           self._GetUpdatedParams(old_nic_params, update_params_dict,
6498                                  cluster.nicparams[constants.PP_DEFAULT],
6499                                  constants.NICS_PARAMETER_TYPES)
6500       objects.NIC.CheckParameterSyntax(new_filled_nic_params)
6501       self.nic_pinst[nic_op] = new_nic_params
6502       self.nic_pnew[nic_op] = new_filled_nic_params
6503       new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
6504
6505       if new_nic_mode == constants.NIC_MODE_BRIDGED:
6506         nic_bridge = new_filled_nic_params[constants.NIC_LINK]
6507         msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
6508         if msg:
6509           msg = "Error checking bridges on node %s: %s" % (pnode, msg)
6510           if self.force:
6511             self.warn.append(msg)
6512           else:
6513             raise errors.OpPrereqError(msg)
6514       if new_nic_mode == constants.NIC_MODE_ROUTED:
6515         if 'ip' in nic_dict:
6516           nic_ip = nic_dict['ip']
6517         else:
6518           nic_ip = old_nic_ip
6519         if nic_ip is None:
6520           raise errors.OpPrereqError('Cannot set the nic ip to None'
6521                                      ' on a routed nic')
6522       if 'mac' in nic_dict:
6523         nic_mac = nic_dict['mac']
6524         if nic_mac is None:
6525           raise errors.OpPrereqError('Cannot set the nic mac to None')
6526         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6527           # otherwise generate the mac
6528           nic_dict['mac'] = self.cfg.GenerateMAC()
6529         else:
6530           # or validate/reserve the current one
6531           if self.cfg.IsMacInUse(nic_mac):
6532             raise errors.OpPrereqError("MAC address %s already in use"
6533                                        " in cluster" % nic_mac)
6534
6535     # DISK processing
6536     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6537       raise errors.OpPrereqError("Disk operations not supported for"
6538                                  " diskless instances")
6539     for disk_op, disk_dict in self.op.disks:
6540       if disk_op == constants.DDM_REMOVE:
6541         if len(instance.disks) == 1:
6542           raise errors.OpPrereqError("Cannot remove the last disk of"
6543                                      " an instance")
6544         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6545         ins_l = ins_l[pnode]
6546         msg = ins_l.fail_msg
6547         if msg:
6548           raise errors.OpPrereqError("Can't contact node %s: %s" %
6549                                      (pnode, msg))
6550         if instance.name in ins_l.payload:
6551           raise errors.OpPrereqError("Instance is running, can't remove"
6552                                      " disks.")
6553
6554       if (disk_op == constants.DDM_ADD and
6555           len(instance.nics) >= constants.MAX_DISKS):
6556         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6557                                    " add more" % constants.MAX_DISKS)
6558       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6559         # an existing disk
6560         if disk_op < 0 or disk_op >= len(instance.disks):
6561           raise errors.OpPrereqError("Invalid disk index %s, valid values"
6562                                      " are 0 to %d" %
6563                                      (disk_op, len(instance.disks)))
6564
6565     return
6566
6567   def Exec(self, feedback_fn):
6568     """Modifies an instance.
6569
6570     All parameters take effect only at the next restart of the instance.
6571
6572     """
6573     # Process here the warnings from CheckPrereq, as we don't have a
6574     # feedback_fn there.
6575     for warn in self.warn:
6576       feedback_fn("WARNING: %s" % warn)
6577
6578     result = []
6579     instance = self.instance
6580     cluster = self.cluster
6581     # disk changes
6582     for disk_op, disk_dict in self.op.disks:
6583       if disk_op == constants.DDM_REMOVE:
6584         # remove the last disk
6585         device = instance.disks.pop()
6586         device_idx = len(instance.disks)
6587         for node, disk in device.ComputeNodeTree(instance.primary_node):
6588           self.cfg.SetDiskID(disk, node)
6589           msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
6590           if msg:
6591             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6592                             " continuing anyway", device_idx, node, msg)
6593         result.append(("disk/%d" % device_idx, "remove"))
6594       elif disk_op == constants.DDM_ADD:
6595         # add a new disk
6596         if instance.disk_template == constants.DT_FILE:
6597           file_driver, file_path = instance.disks[0].logical_id
6598           file_path = os.path.dirname(file_path)
6599         else:
6600           file_driver = file_path = None
6601         disk_idx_base = len(instance.disks)
6602         new_disk = _GenerateDiskTemplate(self,
6603                                          instance.disk_template,
6604                                          instance.name, instance.primary_node,
6605                                          instance.secondary_nodes,
6606                                          [disk_dict],
6607                                          file_path,
6608                                          file_driver,
6609                                          disk_idx_base)[0]
6610         instance.disks.append(new_disk)
6611         info = _GetInstanceInfoText(instance)
6612
6613         logging.info("Creating volume %s for instance %s",
6614                      new_disk.iv_name, instance.name)
6615         # Note: this needs to be kept in sync with _CreateDisks
6616         #HARDCODE
6617         for node in instance.all_nodes:
6618           f_create = node == instance.primary_node
6619           try:
6620             _CreateBlockDev(self, node, instance, new_disk,
6621                             f_create, info, f_create)
6622           except errors.OpExecError, err:
6623             self.LogWarning("Failed to create volume %s (%s) on"
6624                             " node %s: %s",
6625                             new_disk.iv_name, new_disk, node, err)
6626         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6627                        (new_disk.size, new_disk.mode)))
6628       else:
6629         # change a given disk
6630         instance.disks[disk_op].mode = disk_dict['mode']
6631         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6632     # NIC changes
6633     for nic_op, nic_dict in self.op.nics:
6634       if nic_op == constants.DDM_REMOVE:
6635         # remove the last nic
6636         del instance.nics[-1]
6637         result.append(("nic.%d" % len(instance.nics), "remove"))
6638       elif nic_op == constants.DDM_ADD:
6639         # mac and bridge should be set, by now
6640         mac = nic_dict['mac']
6641         ip = nic_dict.get('ip', None)
6642         nicparams = self.nic_pinst[constants.DDM_ADD]
6643         new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
6644         instance.nics.append(new_nic)
6645         result.append(("nic.%d" % (len(instance.nics) - 1),
6646                        "add:mac=%s,ip=%s,mode=%s,link=%s" %
6647                        (new_nic.mac, new_nic.ip,
6648                         self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
6649                         self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
6650                        )))
6651       else:
6652         for key in 'mac', 'ip':
6653           if key in nic_dict:
6654             setattr(instance.nics[nic_op], key, nic_dict[key])
6655         if nic_op in self.nic_pnew:
6656           instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
6657         for key, val in nic_dict.iteritems():
6658           result.append(("nic.%s/%d" % (key, nic_op), val))
6659
6660     # hvparams changes
6661     if self.op.hvparams:
6662       instance.hvparams = self.hv_inst
6663       for key, val in self.op.hvparams.iteritems():
6664         result.append(("hv/%s" % key, val))
6665
6666     # beparams changes
6667     if self.op.beparams:
6668       instance.beparams = self.be_inst
6669       for key, val in self.op.beparams.iteritems():
6670         result.append(("be/%s" % key, val))
6671
6672     self.cfg.Update(instance)
6673
6674     return result
6675
6676
6677 class LUQueryExports(NoHooksLU):
6678   """Query the exports list
6679
6680   """
6681   _OP_REQP = ['nodes']
6682   REQ_BGL = False
6683
6684   def ExpandNames(self):
6685     self.needed_locks = {}
6686     self.share_locks[locking.LEVEL_NODE] = 1
6687     if not self.op.nodes:
6688       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6689     else:
6690       self.needed_locks[locking.LEVEL_NODE] = \
6691         _GetWantedNodes(self, self.op.nodes)
6692
6693   def CheckPrereq(self):
6694     """Check prerequisites.
6695
6696     """
6697     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6698
6699   def Exec(self, feedback_fn):
6700     """Compute the list of all the exported system images.
6701
6702     @rtype: dict
6703     @return: a dictionary with the structure node->(export-list)
6704         where export-list is a list of the instances exported on
6705         that node.
6706
6707     """
6708     rpcresult = self.rpc.call_export_list(self.nodes)
6709     result = {}
6710     for node in rpcresult:
6711       if rpcresult[node].fail_msg:
6712         result[node] = False
6713       else:
6714         result[node] = rpcresult[node].payload
6715
6716     return result
6717
6718
6719 class LUExportInstance(LogicalUnit):
6720   """Export an instance to an image in the cluster.
6721
6722   """
6723   HPATH = "instance-export"
6724   HTYPE = constants.HTYPE_INSTANCE
6725   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6726   REQ_BGL = False
6727
6728   def ExpandNames(self):
6729     self._ExpandAndLockInstance()
6730     # FIXME: lock only instance primary and destination node
6731     #
6732     # Sad but true, for now we have do lock all nodes, as we don't know where
6733     # the previous export might be, and and in this LU we search for it and
6734     # remove it from its current node. In the future we could fix this by:
6735     #  - making a tasklet to search (share-lock all), then create the new one,
6736     #    then one to remove, after
6737     #  - removing the removal operation altogether
6738     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6739
6740   def DeclareLocks(self, level):
6741     """Last minute lock declaration."""
6742     # All nodes are locked anyway, so nothing to do here.
6743
6744   def BuildHooksEnv(self):
6745     """Build hooks env.
6746
6747     This will run on the master, primary node and target node.
6748
6749     """
6750     env = {
6751       "EXPORT_NODE": self.op.target_node,
6752       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6753       }
6754     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6755     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6756           self.op.target_node]
6757     return env, nl, nl
6758
6759   def CheckPrereq(self):
6760     """Check prerequisites.
6761
6762     This checks that the instance and node names are valid.
6763
6764     """
6765     instance_name = self.op.instance_name
6766     self.instance = self.cfg.GetInstanceInfo(instance_name)
6767     assert self.instance is not None, \
6768           "Cannot retrieve locked instance %s" % self.op.instance_name
6769     _CheckNodeOnline(self, self.instance.primary_node)
6770
6771     self.dst_node = self.cfg.GetNodeInfo(
6772       self.cfg.ExpandNodeName(self.op.target_node))
6773
6774     if self.dst_node is None:
6775       # This is wrong node name, not a non-locked node
6776       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6777     _CheckNodeOnline(self, self.dst_node.name)
6778     _CheckNodeNotDrained(self, self.dst_node.name)
6779
6780     # instance disk type verification
6781     for disk in self.instance.disks:
6782       if disk.dev_type == constants.LD_FILE:
6783         raise errors.OpPrereqError("Export not supported for instances with"
6784                                    " file-based disks")
6785
6786   def Exec(self, feedback_fn):
6787     """Export an instance to an image in the cluster.
6788
6789     """
6790     instance = self.instance
6791     dst_node = self.dst_node
6792     src_node = instance.primary_node
6793     if self.op.shutdown:
6794       # shutdown the instance, but not the disks
6795       result = self.rpc.call_instance_shutdown(src_node, instance)
6796       result.Raise("Could not shutdown instance %s on"
6797                    " node %s" % (instance.name, src_node))
6798
6799     vgname = self.cfg.GetVGName()
6800
6801     snap_disks = []
6802
6803     # set the disks ID correctly since call_instance_start needs the
6804     # correct drbd minor to create the symlinks
6805     for disk in instance.disks:
6806       self.cfg.SetDiskID(disk, src_node)
6807
6808     try:
6809       for idx, disk in enumerate(instance.disks):
6810         # result.payload will be a snapshot of an lvm leaf of the one we passed
6811         result = self.rpc.call_blockdev_snapshot(src_node, disk)
6812         msg = result.fail_msg
6813         if msg:
6814           self.LogWarning("Could not snapshot disk/%s on node %s: %s",
6815                           idx, src_node, msg)
6816           snap_disks.append(False)
6817         else:
6818           disk_id = (vgname, result.payload)
6819           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6820                                  logical_id=disk_id, physical_id=disk_id,
6821                                  iv_name=disk.iv_name)
6822           snap_disks.append(new_dev)
6823
6824     finally:
6825       if self.op.shutdown and instance.admin_up:
6826         result = self.rpc.call_instance_start(src_node, instance, None, None)
6827         msg = result.fail_msg
6828         if msg:
6829           _ShutdownInstanceDisks(self, instance)
6830           raise errors.OpExecError("Could not start instance: %s" % msg)
6831
6832     # TODO: check for size
6833
6834     cluster_name = self.cfg.GetClusterName()
6835     for idx, dev in enumerate(snap_disks):
6836       if dev:
6837         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6838                                                instance, cluster_name, idx)
6839         msg = result.fail_msg
6840         if msg:
6841           self.LogWarning("Could not export disk/%s from node %s to"
6842                           " node %s: %s", idx, src_node, dst_node.name, msg)
6843         msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
6844         if msg:
6845           self.LogWarning("Could not remove snapshot for disk/%d from node"
6846                           " %s: %s", idx, src_node, msg)
6847
6848     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6849     msg = result.fail_msg
6850     if msg:
6851       self.LogWarning("Could not finalize export for instance %s"
6852                       " on node %s: %s", instance.name, dst_node.name, msg)
6853
6854     nodelist = self.cfg.GetNodeList()
6855     nodelist.remove(dst_node.name)
6856
6857     # on one-node clusters nodelist will be empty after the removal
6858     # if we proceed the backup would be removed because OpQueryExports
6859     # substitutes an empty list with the full cluster node list.
6860     iname = instance.name
6861     if nodelist:
6862       exportlist = self.rpc.call_export_list(nodelist)
6863       for node in exportlist:
6864         if exportlist[node].fail_msg:
6865           continue
6866         if iname in exportlist[node].payload:
6867           msg = self.rpc.call_export_remove(node, iname).fail_msg
6868           if msg:
6869             self.LogWarning("Could not remove older export for instance %s"
6870                             " on node %s: %s", iname, node, msg)
6871
6872
6873 class LURemoveExport(NoHooksLU):
6874   """Remove exports related to the named instance.
6875
6876   """
6877   _OP_REQP = ["instance_name"]
6878   REQ_BGL = False
6879
6880   def ExpandNames(self):
6881     self.needed_locks = {}
6882     # We need all nodes to be locked in order for RemoveExport to work, but we
6883     # don't need to lock the instance itself, as nothing will happen to it (and
6884     # we can remove exports also for a removed instance)
6885     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6886
6887   def CheckPrereq(self):
6888     """Check prerequisites.
6889     """
6890     pass
6891
6892   def Exec(self, feedback_fn):
6893     """Remove any export.
6894
6895     """
6896     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6897     # If the instance was not found we'll try with the name that was passed in.
6898     # This will only work if it was an FQDN, though.
6899     fqdn_warn = False
6900     if not instance_name:
6901       fqdn_warn = True
6902       instance_name = self.op.instance_name
6903
6904     locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
6905     exportlist = self.rpc.call_export_list(locked_nodes)
6906     found = False
6907     for node in exportlist:
6908       msg = exportlist[node].fail_msg
6909       if msg:
6910         self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
6911         continue
6912       if instance_name in exportlist[node].payload:
6913         found = True
6914         result = self.rpc.call_export_remove(node, instance_name)
6915         msg = result.fail_msg
6916         if msg:
6917           logging.error("Could not remove export for instance %s"
6918                         " on node %s: %s", instance_name, node, msg)
6919
6920     if fqdn_warn and not found:
6921       feedback_fn("Export not found. If trying to remove an export belonging"
6922                   " to a deleted instance please use its Fully Qualified"
6923                   " Domain Name.")
6924
6925
6926 class TagsLU(NoHooksLU):
6927   """Generic tags LU.
6928
6929   This is an abstract class which is the parent of all the other tags LUs.
6930
6931   """
6932
6933   def ExpandNames(self):
6934     self.needed_locks = {}
6935     if self.op.kind == constants.TAG_NODE:
6936       name = self.cfg.ExpandNodeName(self.op.name)
6937       if name is None:
6938         raise errors.OpPrereqError("Invalid node name (%s)" %
6939                                    (self.op.name,))
6940       self.op.name = name
6941       self.needed_locks[locking.LEVEL_NODE] = name
6942     elif self.op.kind == constants.TAG_INSTANCE:
6943       name = self.cfg.ExpandInstanceName(self.op.name)
6944       if name is None:
6945         raise errors.OpPrereqError("Invalid instance name (%s)" %
6946                                    (self.op.name,))
6947       self.op.name = name
6948       self.needed_locks[locking.LEVEL_INSTANCE] = name
6949
6950   def CheckPrereq(self):
6951     """Check prerequisites.
6952
6953     """
6954     if self.op.kind == constants.TAG_CLUSTER:
6955       self.target = self.cfg.GetClusterInfo()
6956     elif self.op.kind == constants.TAG_NODE:
6957       self.target = self.cfg.GetNodeInfo(self.op.name)
6958     elif self.op.kind == constants.TAG_INSTANCE:
6959       self.target = self.cfg.GetInstanceInfo(self.op.name)
6960     else:
6961       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6962                                  str(self.op.kind))
6963
6964
6965 class LUGetTags(TagsLU):
6966   """Returns the tags of a given object.
6967
6968   """
6969   _OP_REQP = ["kind", "name"]
6970   REQ_BGL = False
6971
6972   def Exec(self, feedback_fn):
6973     """Returns the tag list.
6974
6975     """
6976     return list(self.target.GetTags())
6977
6978
6979 class LUSearchTags(NoHooksLU):
6980   """Searches the tags for a given pattern.
6981
6982   """
6983   _OP_REQP = ["pattern"]
6984   REQ_BGL = False
6985
6986   def ExpandNames(self):
6987     self.needed_locks = {}
6988
6989   def CheckPrereq(self):
6990     """Check prerequisites.
6991
6992     This checks the pattern passed for validity by compiling it.
6993
6994     """
6995     try:
6996       self.re = re.compile(self.op.pattern)
6997     except re.error, err:
6998       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6999                                  (self.op.pattern, err))
7000
7001   def Exec(self, feedback_fn):
7002     """Returns the tag list.
7003
7004     """
7005     cfg = self.cfg
7006     tgts = [("/cluster", cfg.GetClusterInfo())]
7007     ilist = cfg.GetAllInstancesInfo().values()
7008     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
7009     nlist = cfg.GetAllNodesInfo().values()
7010     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
7011     results = []
7012     for path, target in tgts:
7013       for tag in target.GetTags():
7014         if self.re.search(tag):
7015           results.append((path, tag))
7016     return results
7017
7018
7019 class LUAddTags(TagsLU):
7020   """Sets a tag on a given object.
7021
7022   """
7023   _OP_REQP = ["kind", "name", "tags"]
7024   REQ_BGL = False
7025
7026   def CheckPrereq(self):
7027     """Check prerequisites.
7028
7029     This checks the type and length of the tag name and value.
7030
7031     """
7032     TagsLU.CheckPrereq(self)
7033     for tag in self.op.tags:
7034       objects.TaggableObject.ValidateTag(tag)
7035
7036   def Exec(self, feedback_fn):
7037     """Sets the tag.
7038
7039     """
7040     try:
7041       for tag in self.op.tags:
7042         self.target.AddTag(tag)
7043     except errors.TagError, err:
7044       raise errors.OpExecError("Error while setting tag: %s" % str(err))
7045     try:
7046       self.cfg.Update(self.target)
7047     except errors.ConfigurationError:
7048       raise errors.OpRetryError("There has been a modification to the"
7049                                 " config file and the operation has been"
7050                                 " aborted. Please retry.")
7051
7052
7053 class LUDelTags(TagsLU):
7054   """Delete a list of tags from a given object.
7055
7056   """
7057   _OP_REQP = ["kind", "name", "tags"]
7058   REQ_BGL = False
7059
7060   def CheckPrereq(self):
7061     """Check prerequisites.
7062
7063     This checks that we have the given tag.
7064
7065     """
7066     TagsLU.CheckPrereq(self)
7067     for tag in self.op.tags:
7068       objects.TaggableObject.ValidateTag(tag)
7069     del_tags = frozenset(self.op.tags)
7070     cur_tags = self.target.GetTags()
7071     if not del_tags <= cur_tags:
7072       diff_tags = del_tags - cur_tags
7073       diff_names = ["'%s'" % tag for tag in diff_tags]
7074       diff_names.sort()
7075       raise errors.OpPrereqError("Tag(s) %s not found" %
7076                                  (",".join(diff_names)))
7077
7078   def Exec(self, feedback_fn):
7079     """Remove the tag from the object.
7080
7081     """
7082     for tag in self.op.tags:
7083       self.target.RemoveTag(tag)
7084     try:
7085       self.cfg.Update(self.target)
7086     except errors.ConfigurationError:
7087       raise errors.OpRetryError("There has been a modification to the"
7088                                 " config file and the operation has been"
7089                                 " aborted. Please retry.")
7090
7091
7092 class LUTestDelay(NoHooksLU):
7093   """Sleep for a specified amount of time.
7094
7095   This LU sleeps on the master and/or nodes for a specified amount of
7096   time.
7097
7098   """
7099   _OP_REQP = ["duration", "on_master", "on_nodes"]
7100   REQ_BGL = False
7101
7102   def ExpandNames(self):
7103     """Expand names and set required locks.
7104
7105     This expands the node list, if any.
7106
7107     """
7108     self.needed_locks = {}
7109     if self.op.on_nodes:
7110       # _GetWantedNodes can be used here, but is not always appropriate to use
7111       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
7112       # more information.
7113       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
7114       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
7115
7116   def CheckPrereq(self):
7117     """Check prerequisites.
7118
7119     """
7120
7121   def Exec(self, feedback_fn):
7122     """Do the actual sleep.
7123
7124     """
7125     if self.op.on_master:
7126       if not utils.TestDelay(self.op.duration):
7127         raise errors.OpExecError("Error during master delay test")
7128     if self.op.on_nodes:
7129       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
7130       for node, node_result in result.items():
7131         node_result.Raise("Failure during rpc call to node %s" % node)
7132
7133
7134 class IAllocator(object):
7135   """IAllocator framework.
7136
7137   An IAllocator instance has three sets of attributes:
7138     - cfg that is needed to query the cluster
7139     - input data (all members of the _KEYS class attribute are required)
7140     - four buffer attributes (in|out_data|text), that represent the
7141       input (to the external script) in text and data structure format,
7142       and the output from it, again in two formats
7143     - the result variables from the script (success, info, nodes) for
7144       easy usage
7145
7146   """
7147   _ALLO_KEYS = [
7148     "mem_size", "disks", "disk_template",
7149     "os", "tags", "nics", "vcpus", "hypervisor",
7150     ]
7151   _RELO_KEYS = [
7152     "relocate_from",
7153     ]
7154
7155   def __init__(self, cfg, rpc, mode, name, **kwargs):
7156     self.cfg = cfg
7157     self.rpc = rpc
7158     # init buffer variables
7159     self.in_text = self.out_text = self.in_data = self.out_data = None
7160     # init all input fields so that pylint is happy
7161     self.mode = mode
7162     self.name = name
7163     self.mem_size = self.disks = self.disk_template = None
7164     self.os = self.tags = self.nics = self.vcpus = None
7165     self.hypervisor = None
7166     self.relocate_from = None
7167     # computed fields
7168     self.required_nodes = None
7169     # init result fields
7170     self.success = self.info = self.nodes = None
7171     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7172       keyset = self._ALLO_KEYS
7173     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
7174       keyset = self._RELO_KEYS
7175     else:
7176       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
7177                                    " IAllocator" % self.mode)
7178     for key in kwargs:
7179       if key not in keyset:
7180         raise errors.ProgrammerError("Invalid input parameter '%s' to"
7181                                      " IAllocator" % key)
7182       setattr(self, key, kwargs[key])
7183     for key in keyset:
7184       if key not in kwargs:
7185         raise errors.ProgrammerError("Missing input parameter '%s' to"
7186                                      " IAllocator" % key)
7187     self._BuildInputData()
7188
7189   def _ComputeClusterData(self):
7190     """Compute the generic allocator input data.
7191
7192     This is the data that is independent of the actual operation.
7193
7194     """
7195     cfg = self.cfg
7196     cluster_info = cfg.GetClusterInfo()
7197     # cluster data
7198     data = {
7199       "version": constants.IALLOCATOR_VERSION,
7200       "cluster_name": cfg.GetClusterName(),
7201       "cluster_tags": list(cluster_info.GetTags()),
7202       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
7203       # we don't have job IDs
7204       }
7205     iinfo = cfg.GetAllInstancesInfo().values()
7206     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
7207
7208     # node data
7209     node_results = {}
7210     node_list = cfg.GetNodeList()
7211
7212     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7213       hypervisor_name = self.hypervisor
7214     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
7215       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
7216
7217     node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
7218                                         hypervisor_name)
7219     node_iinfo = \
7220       self.rpc.call_all_instances_info(node_list,
7221                                        cluster_info.enabled_hypervisors)
7222     for nname, nresult in node_data.items():
7223       # first fill in static (config-based) values
7224       ninfo = cfg.GetNodeInfo(nname)
7225       pnr = {
7226         "tags": list(ninfo.GetTags()),
7227         "primary_ip": ninfo.primary_ip,
7228         "secondary_ip": ninfo.secondary_ip,
7229         "offline": ninfo.offline,
7230         "drained": ninfo.drained,
7231         "master_candidate": ninfo.master_candidate,
7232         }
7233
7234       if not ninfo.offline:
7235         nresult.Raise("Can't get data for node %s" % nname)
7236         node_iinfo[nname].Raise("Can't get node instance info from node %s" %
7237                                 nname)
7238         remote_info = nresult.payload
7239         for attr in ['memory_total', 'memory_free', 'memory_dom0',
7240                      'vg_size', 'vg_free', 'cpu_total']:
7241           if attr not in remote_info:
7242             raise errors.OpExecError("Node '%s' didn't return attribute"
7243                                      " '%s'" % (nname, attr))
7244           if not isinstance(remote_info[attr], int):
7245             raise errors.OpExecError("Node '%s' returned invalid value"
7246                                      " for '%s': %s" %
7247                                      (nname, attr, remote_info[attr]))
7248         # compute memory used by primary instances
7249         i_p_mem = i_p_up_mem = 0
7250         for iinfo, beinfo in i_list:
7251           if iinfo.primary_node == nname:
7252             i_p_mem += beinfo[constants.BE_MEMORY]
7253             if iinfo.name not in node_iinfo[nname].payload:
7254               i_used_mem = 0
7255             else:
7256               i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
7257             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
7258             remote_info['memory_free'] -= max(0, i_mem_diff)
7259
7260             if iinfo.admin_up:
7261               i_p_up_mem += beinfo[constants.BE_MEMORY]
7262
7263         # compute memory used by instances
7264         pnr_dyn = {
7265           "total_memory": remote_info['memory_total'],
7266           "reserved_memory": remote_info['memory_dom0'],
7267           "free_memory": remote_info['memory_free'],
7268           "total_disk": remote_info['vg_size'],
7269           "free_disk": remote_info['vg_free'],
7270           "total_cpus": remote_info['cpu_total'],
7271           "i_pri_memory": i_p_mem,
7272           "i_pri_up_memory": i_p_up_mem,
7273           }
7274         pnr.update(pnr_dyn)
7275
7276       node_results[nname] = pnr
7277     data["nodes"] = node_results
7278
7279     # instance data
7280     instance_data = {}
7281     for iinfo, beinfo in i_list:
7282       nic_data = []
7283       for nic in iinfo.nics:
7284         filled_params = objects.FillDict(
7285             cluster_info.nicparams[constants.PP_DEFAULT],
7286             nic.nicparams)
7287         nic_dict = {"mac": nic.mac,
7288                     "ip": nic.ip,
7289                     "mode": filled_params[constants.NIC_MODE],
7290                     "link": filled_params[constants.NIC_LINK],
7291                    }
7292         if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
7293           nic_dict["bridge"] = filled_params[constants.NIC_LINK]
7294         nic_data.append(nic_dict)
7295       pir = {
7296         "tags": list(iinfo.GetTags()),
7297         "admin_up": iinfo.admin_up,
7298         "vcpus": beinfo[constants.BE_VCPUS],
7299         "memory": beinfo[constants.BE_MEMORY],
7300         "os": iinfo.os,
7301         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
7302         "nics": nic_data,
7303         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
7304         "disk_template": iinfo.disk_template,
7305         "hypervisor": iinfo.hypervisor,
7306         }
7307       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
7308                                                  pir["disks"])
7309       instance_data[iinfo.name] = pir
7310
7311     data["instances"] = instance_data
7312
7313     self.in_data = data
7314
7315   def _AddNewInstance(self):
7316     """Add new instance data to allocator structure.
7317
7318     This in combination with _AllocatorGetClusterData will create the
7319     correct structure needed as input for the allocator.
7320
7321     The checks for the completeness of the opcode must have already been
7322     done.
7323
7324     """
7325     data = self.in_data
7326
7327     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
7328
7329     if self.disk_template in constants.DTS_NET_MIRROR:
7330       self.required_nodes = 2
7331     else:
7332       self.required_nodes = 1
7333     request = {
7334       "type": "allocate",
7335       "name": self.name,
7336       "disk_template": self.disk_template,
7337       "tags": self.tags,
7338       "os": self.os,
7339       "vcpus": self.vcpus,
7340       "memory": self.mem_size,
7341       "disks": self.disks,
7342       "disk_space_total": disk_space,
7343       "nics": self.nics,
7344       "required_nodes": self.required_nodes,
7345       }
7346     data["request"] = request
7347
7348   def _AddRelocateInstance(self):
7349     """Add relocate instance data to allocator structure.
7350
7351     This in combination with _IAllocatorGetClusterData will create the
7352     correct structure needed as input for the allocator.
7353
7354     The checks for the completeness of the opcode must have already been
7355     done.
7356
7357     """
7358     instance = self.cfg.GetInstanceInfo(self.name)
7359     if instance is None:
7360       raise errors.ProgrammerError("Unknown instance '%s' passed to"
7361                                    " IAllocator" % self.name)
7362
7363     if instance.disk_template not in constants.DTS_NET_MIRROR:
7364       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
7365
7366     if len(instance.secondary_nodes) != 1:
7367       raise errors.OpPrereqError("Instance has not exactly one secondary node")
7368
7369     self.required_nodes = 1
7370     disk_sizes = [{'size': disk.size} for disk in instance.disks]
7371     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
7372
7373     request = {
7374       "type": "relocate",
7375       "name": self.name,
7376       "disk_space_total": disk_space,
7377       "required_nodes": self.required_nodes,
7378       "relocate_from": self.relocate_from,
7379       }
7380     self.in_data["request"] = request
7381
7382   def _BuildInputData(self):
7383     """Build input data structures.
7384
7385     """
7386     self._ComputeClusterData()
7387
7388     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7389       self._AddNewInstance()
7390     else:
7391       self._AddRelocateInstance()
7392
7393     self.in_text = serializer.Dump(self.in_data)
7394
7395   def Run(self, name, validate=True, call_fn=None):
7396     """Run an instance allocator and return the results.
7397
7398     """
7399     if call_fn is None:
7400       call_fn = self.rpc.call_iallocator_runner
7401
7402     result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
7403     result.Raise("Failure while running the iallocator script")
7404
7405     self.out_text = result.payload
7406     if validate:
7407       self._ValidateResult()
7408
7409   def _ValidateResult(self):
7410     """Process the allocator results.
7411
7412     This will process and if successful save the result in
7413     self.out_data and the other parameters.
7414
7415     """
7416     try:
7417       rdict = serializer.Load(self.out_text)
7418     except Exception, err:
7419       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
7420
7421     if not isinstance(rdict, dict):
7422       raise errors.OpExecError("Can't parse iallocator results: not a dict")
7423
7424     for key in "success", "info", "nodes":
7425       if key not in rdict:
7426         raise errors.OpExecError("Can't parse iallocator results:"
7427                                  " missing key '%s'" % key)
7428       setattr(self, key, rdict[key])
7429
7430     if not isinstance(rdict["nodes"], list):
7431       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
7432                                " is not a list")
7433     self.out_data = rdict
7434
7435
7436 class LUTestAllocator(NoHooksLU):
7437   """Run allocator tests.
7438
7439   This LU runs the allocator tests
7440
7441   """
7442   _OP_REQP = ["direction", "mode", "name"]
7443
7444   def CheckPrereq(self):
7445     """Check prerequisites.
7446
7447     This checks the opcode parameters depending on the director and mode test.
7448
7449     """
7450     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7451       for attr in ["name", "mem_size", "disks", "disk_template",
7452                    "os", "tags", "nics", "vcpus"]:
7453         if not hasattr(self.op, attr):
7454           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7455                                      attr)
7456       iname = self.cfg.ExpandInstanceName(self.op.name)
7457       if iname is not None:
7458         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7459                                    iname)
7460       if not isinstance(self.op.nics, list):
7461         raise errors.OpPrereqError("Invalid parameter 'nics'")
7462       for row in self.op.nics:
7463         if (not isinstance(row, dict) or
7464             "mac" not in row or
7465             "ip" not in row or
7466             "bridge" not in row):
7467           raise errors.OpPrereqError("Invalid contents of the"
7468                                      " 'nics' parameter")
7469       if not isinstance(self.op.disks, list):
7470         raise errors.OpPrereqError("Invalid parameter 'disks'")
7471       for row in self.op.disks:
7472         if (not isinstance(row, dict) or
7473             "size" not in row or
7474             not isinstance(row["size"], int) or
7475             "mode" not in row or
7476             row["mode"] not in ['r', 'w']):
7477           raise errors.OpPrereqError("Invalid contents of the"
7478                                      " 'disks' parameter")
7479       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7480         self.op.hypervisor = self.cfg.GetHypervisorType()
7481     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7482       if not hasattr(self.op, "name"):
7483         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7484       fname = self.cfg.ExpandInstanceName(self.op.name)
7485       if fname is None:
7486         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7487                                    self.op.name)
7488       self.op.name = fname
7489       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7490     else:
7491       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7492                                  self.op.mode)
7493
7494     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7495       if not hasattr(self.op, "allocator") or self.op.allocator is None:
7496         raise errors.OpPrereqError("Missing allocator name")
7497     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7498       raise errors.OpPrereqError("Wrong allocator test '%s'" %
7499                                  self.op.direction)
7500
7501   def Exec(self, feedback_fn):
7502     """Run the allocator test.
7503
7504     """
7505     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7506       ial = IAllocator(self.cfg, self.rpc,
7507                        mode=self.op.mode,
7508                        name=self.op.name,
7509                        mem_size=self.op.mem_size,
7510                        disks=self.op.disks,
7511                        disk_template=self.op.disk_template,
7512                        os=self.op.os,
7513                        tags=self.op.tags,
7514                        nics=self.op.nics,
7515                        vcpus=self.op.vcpus,
7516                        hypervisor=self.op.hypervisor,
7517                        )
7518     else:
7519       ial = IAllocator(self.cfg, self.rpc,
7520                        mode=self.op.mode,
7521                        name=self.op.name,
7522                        relocate_from=list(self.relocate_from),
7523                        )
7524
7525     if self.op.direction == constants.IALLOCATOR_DIR_IN:
7526       result = ial.in_text
7527     else:
7528       ial.Run(self.op.allocator, validate=False)
7529       result = ial.out_text
7530     return result