cmdlib: Fix parameters for storage.FileStorage
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import re
30 import platform
31 import logging
32 import copy
33
34 from ganeti import ssh
35 from ganeti import utils
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import serializer
42 from ganeti import ssconf
43
44
45 class LogicalUnit(object):
46   """Logical Unit base class.
47
48   Subclasses must follow these rules:
49     - implement ExpandNames
50     - implement CheckPrereq (except when tasklets are used)
51     - implement Exec (except when tasklets are used)
52     - implement BuildHooksEnv
53     - redefine HPATH and HTYPE
54     - optionally redefine their run requirements:
55         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
56
57   Note that all commands require root permissions.
58
59   @ivar dry_run_result: the value (if any) that will be returned to the caller
60       in dry-run mode (signalled by opcode dry_run parameter)
61
62   """
63   HPATH = None
64   HTYPE = None
65   _OP_REQP = []
66   REQ_BGL = True
67
68   def __init__(self, processor, op, context, rpc):
69     """Constructor for LogicalUnit.
70
71     This needs to be overridden in derived classes in order to check op
72     validity.
73
74     """
75     self.proc = processor
76     self.op = op
77     self.cfg = context.cfg
78     self.context = context
79     self.rpc = rpc
80     # Dicts used to declare locking needs to mcpu
81     self.needed_locks = None
82     self.acquired_locks = {}
83     self.share_locks = dict.fromkeys(locking.LEVELS, 0)
84     self.add_locks = {}
85     self.remove_locks = {}
86     # Used to force good behavior when calling helper functions
87     self.recalculate_locks = {}
88     self.__ssh = None
89     # logging
90     self.LogWarning = processor.LogWarning
91     self.LogInfo = processor.LogInfo
92     self.LogStep = processor.LogStep
93     # support for dry-run
94     self.dry_run_result = None
95
96     # Tasklets
97     self.tasklets = None
98
99     for attr_name in self._OP_REQP:
100       attr_val = getattr(op, attr_name, None)
101       if attr_val is None:
102         raise errors.OpPrereqError("Required parameter '%s' missing" %
103                                    attr_name)
104
105     self.CheckArguments()
106
107   def __GetSSH(self):
108     """Returns the SshRunner object
109
110     """
111     if not self.__ssh:
112       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
113     return self.__ssh
114
115   ssh = property(fget=__GetSSH)
116
117   def CheckArguments(self):
118     """Check syntactic validity for the opcode arguments.
119
120     This method is for doing a simple syntactic check and ensure
121     validity of opcode parameters, without any cluster-related
122     checks. While the same can be accomplished in ExpandNames and/or
123     CheckPrereq, doing these separate is better because:
124
125       - ExpandNames is left as as purely a lock-related function
126       - CheckPrereq is run after we have acquired locks (and possible
127         waited for them)
128
129     The function is allowed to change the self.op attribute so that
130     later methods can no longer worry about missing parameters.
131
132     """
133     pass
134
135   def ExpandNames(self):
136     """Expand names for this LU.
137
138     This method is called before starting to execute the opcode, and it should
139     update all the parameters of the opcode to their canonical form (e.g. a
140     short node name must be fully expanded after this method has successfully
141     completed). This way locking, hooks, logging, ecc. can work correctly.
142
143     LUs which implement this method must also populate the self.needed_locks
144     member, as a dict with lock levels as keys, and a list of needed lock names
145     as values. Rules:
146
147       - use an empty dict if you don't need any lock
148       - if you don't need any lock at a particular level omit that level
149       - don't put anything for the BGL level
150       - if you want all locks at a level use locking.ALL_SET as a value
151
152     If you need to share locks (rather than acquire them exclusively) at one
153     level you can modify self.share_locks, setting a true value (usually 1) for
154     that level. By default locks are not shared.
155
156     This function can also define a list of tasklets, which then will be
157     executed in order instead of the usual LU-level CheckPrereq and Exec
158     functions, if those are not defined by the LU.
159
160     Examples::
161
162       # Acquire all nodes and one instance
163       self.needed_locks = {
164         locking.LEVEL_NODE: locking.ALL_SET,
165         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
166       }
167       # Acquire just two nodes
168       self.needed_locks = {
169         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
170       }
171       # Acquire no locks
172       self.needed_locks = {} # No, you can't leave it to the default value None
173
174     """
175     # The implementation of this method is mandatory only if the new LU is
176     # concurrent, so that old LUs don't need to be changed all at the same
177     # time.
178     if self.REQ_BGL:
179       self.needed_locks = {} # Exclusive LUs don't need locks.
180     else:
181       raise NotImplementedError
182
183   def DeclareLocks(self, level):
184     """Declare LU locking needs for a level
185
186     While most LUs can just declare their locking needs at ExpandNames time,
187     sometimes there's the need to calculate some locks after having acquired
188     the ones before. This function is called just before acquiring locks at a
189     particular level, but after acquiring the ones at lower levels, and permits
190     such calculations. It can be used to modify self.needed_locks, and by
191     default it does nothing.
192
193     This function is only called if you have something already set in
194     self.needed_locks for the level.
195
196     @param level: Locking level which is going to be locked
197     @type level: member of ganeti.locking.LEVELS
198
199     """
200
201   def CheckPrereq(self):
202     """Check prerequisites for this LU.
203
204     This method should check that the prerequisites for the execution
205     of this LU are fulfilled. It can do internode communication, but
206     it should be idempotent - no cluster or system changes are
207     allowed.
208
209     The method should raise errors.OpPrereqError in case something is
210     not fulfilled. Its return value is ignored.
211
212     This method should also update all the parameters of the opcode to
213     their canonical form if it hasn't been done by ExpandNames before.
214
215     """
216     if self.tasklets is not None:
217       for (idx, tl) in enumerate(self.tasklets):
218         logging.debug("Checking prerequisites for tasklet %s/%s",
219                       idx + 1, len(self.tasklets))
220         tl.CheckPrereq()
221     else:
222       raise NotImplementedError
223
224   def Exec(self, feedback_fn):
225     """Execute the LU.
226
227     This method should implement the actual work. It should raise
228     errors.OpExecError for failures that are somewhat dealt with in
229     code, or expected.
230
231     """
232     if self.tasklets is not None:
233       for (idx, tl) in enumerate(self.tasklets):
234         logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
235         tl.Exec(feedback_fn)
236     else:
237       raise NotImplementedError
238
239   def BuildHooksEnv(self):
240     """Build hooks environment for this LU.
241
242     This method should return a three-node tuple consisting of: a dict
243     containing the environment that will be used for running the
244     specific hook for this LU, a list of node names on which the hook
245     should run before the execution, and a list of node names on which
246     the hook should run after the execution.
247
248     The keys of the dict must not have 'GANETI_' prefixed as this will
249     be handled in the hooks runner. Also note additional keys will be
250     added by the hooks runner. If the LU doesn't define any
251     environment, an empty dict (and not None) should be returned.
252
253     No nodes should be returned as an empty list (and not None).
254
255     Note that if the HPATH for a LU class is None, this function will
256     not be called.
257
258     """
259     raise NotImplementedError
260
261   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
262     """Notify the LU about the results of its hooks.
263
264     This method is called every time a hooks phase is executed, and notifies
265     the Logical Unit about the hooks' result. The LU can then use it to alter
266     its result based on the hooks.  By default the method does nothing and the
267     previous result is passed back unchanged but any LU can define it if it
268     wants to use the local cluster hook-scripts somehow.
269
270     @param phase: one of L{constants.HOOKS_PHASE_POST} or
271         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
272     @param hook_results: the results of the multi-node hooks rpc call
273     @param feedback_fn: function used send feedback back to the caller
274     @param lu_result: the previous Exec result this LU had, or None
275         in the PRE phase
276     @return: the new Exec result, based on the previous result
277         and hook results
278
279     """
280     return lu_result
281
282   def _ExpandAndLockInstance(self):
283     """Helper function to expand and lock an instance.
284
285     Many LUs that work on an instance take its name in self.op.instance_name
286     and need to expand it and then declare the expanded name for locking. This
287     function does it, and then updates self.op.instance_name to the expanded
288     name. It also initializes needed_locks as a dict, if this hasn't been done
289     before.
290
291     """
292     if self.needed_locks is None:
293       self.needed_locks = {}
294     else:
295       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
296         "_ExpandAndLockInstance called with instance-level locks set"
297     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
298     if expanded_name is None:
299       raise errors.OpPrereqError("Instance '%s' not known" %
300                                   self.op.instance_name)
301     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
302     self.op.instance_name = expanded_name
303
304   def _LockInstancesNodes(self, primary_only=False):
305     """Helper function to declare instances' nodes for locking.
306
307     This function should be called after locking one or more instances to lock
308     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
309     with all primary or secondary nodes for instances already locked and
310     present in self.needed_locks[locking.LEVEL_INSTANCE].
311
312     It should be called from DeclareLocks, and for safety only works if
313     self.recalculate_locks[locking.LEVEL_NODE] is set.
314
315     In the future it may grow parameters to just lock some instance's nodes, or
316     to just lock primaries or secondary nodes, if needed.
317
318     If should be called in DeclareLocks in a way similar to::
319
320       if level == locking.LEVEL_NODE:
321         self._LockInstancesNodes()
322
323     @type primary_only: boolean
324     @param primary_only: only lock primary nodes of locked instances
325
326     """
327     assert locking.LEVEL_NODE in self.recalculate_locks, \
328       "_LockInstancesNodes helper function called with no nodes to recalculate"
329
330     # TODO: check if we're really been called with the instance locks held
331
332     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
333     # future we might want to have different behaviors depending on the value
334     # of self.recalculate_locks[locking.LEVEL_NODE]
335     wanted_nodes = []
336     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
337       instance = self.context.cfg.GetInstanceInfo(instance_name)
338       wanted_nodes.append(instance.primary_node)
339       if not primary_only:
340         wanted_nodes.extend(instance.secondary_nodes)
341
342     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
343       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
344     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
345       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
346
347     del self.recalculate_locks[locking.LEVEL_NODE]
348
349
350 class NoHooksLU(LogicalUnit):
351   """Simple LU which runs no hooks.
352
353   This LU is intended as a parent for other LogicalUnits which will
354   run no hooks, in order to reduce duplicate code.
355
356   """
357   HPATH = None
358   HTYPE = None
359
360
361 class Tasklet:
362   """Tasklet base class.
363
364   Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
365   they can mix legacy code with tasklets. Locking needs to be done in the LU,
366   tasklets know nothing about locks.
367
368   Subclasses must follow these rules:
369     - Implement CheckPrereq
370     - Implement Exec
371
372   """
373   def __init__(self, lu):
374     self.lu = lu
375
376     # Shortcuts
377     self.cfg = lu.cfg
378     self.rpc = lu.rpc
379
380   def CheckPrereq(self):
381     """Check prerequisites for this tasklets.
382
383     This method should check whether the prerequisites for the execution of
384     this tasklet are fulfilled. It can do internode communication, but it
385     should be idempotent - no cluster or system changes are allowed.
386
387     The method should raise errors.OpPrereqError in case something is not
388     fulfilled. Its return value is ignored.
389
390     This method should also update all parameters to their canonical form if it
391     hasn't been done before.
392
393     """
394     raise NotImplementedError
395
396   def Exec(self, feedback_fn):
397     """Execute the tasklet.
398
399     This method should implement the actual work. It should raise
400     errors.OpExecError for failures that are somewhat dealt with in code, or
401     expected.
402
403     """
404     raise NotImplementedError
405
406
407 def _GetWantedNodes(lu, nodes):
408   """Returns list of checked and expanded node names.
409
410   @type lu: L{LogicalUnit}
411   @param lu: the logical unit on whose behalf we execute
412   @type nodes: list
413   @param nodes: list of node names or None for all nodes
414   @rtype: list
415   @return: the list of nodes, sorted
416   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
417
418   """
419   if not isinstance(nodes, list):
420     raise errors.OpPrereqError("Invalid argument type 'nodes'")
421
422   if not nodes:
423     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
424       " non-empty list of nodes whose name is to be expanded.")
425
426   wanted = []
427   for name in nodes:
428     node = lu.cfg.ExpandNodeName(name)
429     if node is None:
430       raise errors.OpPrereqError("No such node name '%s'" % name)
431     wanted.append(node)
432
433   return utils.NiceSort(wanted)
434
435
436 def _GetWantedInstances(lu, instances):
437   """Returns list of checked and expanded instance names.
438
439   @type lu: L{LogicalUnit}
440   @param lu: the logical unit on whose behalf we execute
441   @type instances: list
442   @param instances: list of instance names or None for all instances
443   @rtype: list
444   @return: the list of instances, sorted
445   @raise errors.OpPrereqError: if the instances parameter is wrong type
446   @raise errors.OpPrereqError: if any of the passed instances is not found
447
448   """
449   if not isinstance(instances, list):
450     raise errors.OpPrereqError("Invalid argument type 'instances'")
451
452   if instances:
453     wanted = []
454
455     for name in instances:
456       instance = lu.cfg.ExpandInstanceName(name)
457       if instance is None:
458         raise errors.OpPrereqError("No such instance name '%s'" % name)
459       wanted.append(instance)
460
461   else:
462     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
463   return wanted
464
465
466 def _CheckOutputFields(static, dynamic, selected):
467   """Checks whether all selected fields are valid.
468
469   @type static: L{utils.FieldSet}
470   @param static: static fields set
471   @type dynamic: L{utils.FieldSet}
472   @param dynamic: dynamic fields set
473
474   """
475   f = utils.FieldSet()
476   f.Extend(static)
477   f.Extend(dynamic)
478
479   delta = f.NonMatching(selected)
480   if delta:
481     raise errors.OpPrereqError("Unknown output fields selected: %s"
482                                % ",".join(delta))
483
484
485 def _CheckBooleanOpField(op, name):
486   """Validates boolean opcode parameters.
487
488   This will ensure that an opcode parameter is either a boolean value,
489   or None (but that it always exists).
490
491   """
492   val = getattr(op, name, None)
493   if not (val is None or isinstance(val, bool)):
494     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
495                                (name, str(val)))
496   setattr(op, name, val)
497
498
499 def _CheckNodeOnline(lu, node):
500   """Ensure that a given node is online.
501
502   @param lu: the LU on behalf of which we make the check
503   @param node: the node to check
504   @raise errors.OpPrereqError: if the node is offline
505
506   """
507   if lu.cfg.GetNodeInfo(node).offline:
508     raise errors.OpPrereqError("Can't use offline node %s" % node)
509
510
511 def _CheckNodeNotDrained(lu, node):
512   """Ensure that a given node is not drained.
513
514   @param lu: the LU on behalf of which we make the check
515   @param node: the node to check
516   @raise errors.OpPrereqError: if the node is drained
517
518   """
519   if lu.cfg.GetNodeInfo(node).drained:
520     raise errors.OpPrereqError("Can't use drained node %s" % node)
521
522
523 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
524                           memory, vcpus, nics, disk_template, disks,
525                           bep, hvp, hypervisor_name):
526   """Builds instance related env variables for hooks
527
528   This builds the hook environment from individual variables.
529
530   @type name: string
531   @param name: the name of the instance
532   @type primary_node: string
533   @param primary_node: the name of the instance's primary node
534   @type secondary_nodes: list
535   @param secondary_nodes: list of secondary nodes as strings
536   @type os_type: string
537   @param os_type: the name of the instance's OS
538   @type status: boolean
539   @param status: the should_run status of the instance
540   @type memory: string
541   @param memory: the memory size of the instance
542   @type vcpus: string
543   @param vcpus: the count of VCPUs the instance has
544   @type nics: list
545   @param nics: list of tuples (ip, mac, mode, link) representing
546       the NICs the instance has
547   @type disk_template: string
548   @param disk_template: the disk template of the instance
549   @type disks: list
550   @param disks: the list of (size, mode) pairs
551   @type bep: dict
552   @param bep: the backend parameters for the instance
553   @type hvp: dict
554   @param hvp: the hypervisor parameters for the instance
555   @type hypervisor_name: string
556   @param hypervisor_name: the hypervisor for the instance
557   @rtype: dict
558   @return: the hook environment for this instance
559
560   """
561   if status:
562     str_status = "up"
563   else:
564     str_status = "down"
565   env = {
566     "OP_TARGET": name,
567     "INSTANCE_NAME": name,
568     "INSTANCE_PRIMARY": primary_node,
569     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
570     "INSTANCE_OS_TYPE": os_type,
571     "INSTANCE_STATUS": str_status,
572     "INSTANCE_MEMORY": memory,
573     "INSTANCE_VCPUS": vcpus,
574     "INSTANCE_DISK_TEMPLATE": disk_template,
575     "INSTANCE_HYPERVISOR": hypervisor_name,
576   }
577
578   if nics:
579     nic_count = len(nics)
580     for idx, (ip, mac, mode, link) in enumerate(nics):
581       if ip is None:
582         ip = ""
583       env["INSTANCE_NIC%d_IP" % idx] = ip
584       env["INSTANCE_NIC%d_MAC" % idx] = mac
585       env["INSTANCE_NIC%d_MODE" % idx] = mode
586       env["INSTANCE_NIC%d_LINK" % idx] = link
587       if mode == constants.NIC_MODE_BRIDGED:
588         env["INSTANCE_NIC%d_BRIDGE" % idx] = link
589   else:
590     nic_count = 0
591
592   env["INSTANCE_NIC_COUNT"] = nic_count
593
594   if disks:
595     disk_count = len(disks)
596     for idx, (size, mode) in enumerate(disks):
597       env["INSTANCE_DISK%d_SIZE" % idx] = size
598       env["INSTANCE_DISK%d_MODE" % idx] = mode
599   else:
600     disk_count = 0
601
602   env["INSTANCE_DISK_COUNT"] = disk_count
603
604   for source, kind in [(bep, "BE"), (hvp, "HV")]:
605     for key, value in source.items():
606       env["INSTANCE_%s_%s" % (kind, key)] = value
607
608   return env
609
610 def _NICListToTuple(lu, nics):
611   """Build a list of nic information tuples.
612
613   This list is suitable to be passed to _BuildInstanceHookEnv or as a return
614   value in LUQueryInstanceData.
615
616   @type lu:  L{LogicalUnit}
617   @param lu: the logical unit on whose behalf we execute
618   @type nics: list of L{objects.NIC}
619   @param nics: list of nics to convert to hooks tuples
620
621   """
622   hooks_nics = []
623   c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
624   for nic in nics:
625     ip = nic.ip
626     mac = nic.mac
627     filled_params = objects.FillDict(c_nicparams, nic.nicparams)
628     mode = filled_params[constants.NIC_MODE]
629     link = filled_params[constants.NIC_LINK]
630     hooks_nics.append((ip, mac, mode, link))
631   return hooks_nics
632
633 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
634   """Builds instance related env variables for hooks from an object.
635
636   @type lu: L{LogicalUnit}
637   @param lu: the logical unit on whose behalf we execute
638   @type instance: L{objects.Instance}
639   @param instance: the instance for which we should build the
640       environment
641   @type override: dict
642   @param override: dictionary with key/values that will override
643       our values
644   @rtype: dict
645   @return: the hook environment dictionary
646
647   """
648   cluster = lu.cfg.GetClusterInfo()
649   bep = cluster.FillBE(instance)
650   hvp = cluster.FillHV(instance)
651   args = {
652     'name': instance.name,
653     'primary_node': instance.primary_node,
654     'secondary_nodes': instance.secondary_nodes,
655     'os_type': instance.os,
656     'status': instance.admin_up,
657     'memory': bep[constants.BE_MEMORY],
658     'vcpus': bep[constants.BE_VCPUS],
659     'nics': _NICListToTuple(lu, instance.nics),
660     'disk_template': instance.disk_template,
661     'disks': [(disk.size, disk.mode) for disk in instance.disks],
662     'bep': bep,
663     'hvp': hvp,
664     'hypervisor_name': instance.hypervisor,
665   }
666   if override:
667     args.update(override)
668   return _BuildInstanceHookEnv(**args)
669
670
671 def _AdjustCandidatePool(lu):
672   """Adjust the candidate pool after node operations.
673
674   """
675   mod_list = lu.cfg.MaintainCandidatePool()
676   if mod_list:
677     lu.LogInfo("Promoted nodes to master candidate role: %s",
678                ", ".join(node.name for node in mod_list))
679     for name in mod_list:
680       lu.context.ReaddNode(name)
681   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
682   if mc_now > mc_max:
683     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
684                (mc_now, mc_max))
685
686
687 def _CheckNicsBridgesExist(lu, target_nics, target_node,
688                                profile=constants.PP_DEFAULT):
689   """Check that the brigdes needed by a list of nics exist.
690
691   """
692   c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
693   paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
694                 for nic in target_nics]
695   brlist = [params[constants.NIC_LINK] for params in paramslist
696             if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
697   if brlist:
698     result = lu.rpc.call_bridges_exist(target_node, brlist)
699     result.Raise("Error checking bridges on destination node '%s'" %
700                  target_node, prereq=True)
701
702
703 def _CheckInstanceBridgesExist(lu, instance, node=None):
704   """Check that the brigdes needed by an instance exist.
705
706   """
707   if node is None:
708     node = instance.primary_node
709   _CheckNicsBridgesExist(lu, instance.nics, node)
710
711
712 def _GetNodePrimaryInstances(cfg, node_name):
713   """Returns primary instances on a node.
714
715   """
716   instances = []
717
718   for (_, inst) in cfg.GetAllInstancesInfo().iteritems():
719     if node_name == inst.primary_node:
720       instances.append(inst)
721
722   return instances
723
724
725 def _GetNodeSecondaryInstances(cfg, node_name):
726   """Returns secondary instances on a node.
727
728   """
729   instances = []
730
731   for (_, inst) in cfg.GetAllInstancesInfo().iteritems():
732     if node_name in inst.secondary_nodes:
733       instances.append(inst)
734
735   return instances
736
737
738 def _GetStorageTypeArgs(cfg, storage_type):
739   """Returns the arguments for a storage type.
740
741   """
742   # Special case for file storage
743   if storage_type == constants.ST_FILE:
744     # storage.FileStorage wants a list of storage directories
745     return [[cfg.GetFileStorageDir()]]
746
747   return []
748
749
750 class LUDestroyCluster(NoHooksLU):
751   """Logical unit for destroying the cluster.
752
753   """
754   _OP_REQP = []
755
756   def CheckPrereq(self):
757     """Check prerequisites.
758
759     This checks whether the cluster is empty.
760
761     Any errors are signaled by raising errors.OpPrereqError.
762
763     """
764     master = self.cfg.GetMasterNode()
765
766     nodelist = self.cfg.GetNodeList()
767     if len(nodelist) != 1 or nodelist[0] != master:
768       raise errors.OpPrereqError("There are still %d node(s) in"
769                                  " this cluster." % (len(nodelist) - 1))
770     instancelist = self.cfg.GetInstanceList()
771     if instancelist:
772       raise errors.OpPrereqError("There are still %d instance(s) in"
773                                  " this cluster." % len(instancelist))
774
775   def Exec(self, feedback_fn):
776     """Destroys the cluster.
777
778     """
779     master = self.cfg.GetMasterNode()
780     result = self.rpc.call_node_stop_master(master, False)
781     result.Raise("Could not disable the master role")
782     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
783     utils.CreateBackup(priv_key)
784     utils.CreateBackup(pub_key)
785     return master
786
787
788 class LUVerifyCluster(LogicalUnit):
789   """Verifies the cluster status.
790
791   """
792   HPATH = "cluster-verify"
793   HTYPE = constants.HTYPE_CLUSTER
794   _OP_REQP = ["skip_checks"]
795   REQ_BGL = False
796
797   def ExpandNames(self):
798     self.needed_locks = {
799       locking.LEVEL_NODE: locking.ALL_SET,
800       locking.LEVEL_INSTANCE: locking.ALL_SET,
801     }
802     self.share_locks = dict.fromkeys(locking.LEVELS, 1)
803
804   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
805                   node_result, feedback_fn, master_files,
806                   drbd_map, vg_name):
807     """Run multiple tests against a node.
808
809     Test list:
810
811       - compares ganeti version
812       - checks vg existence and size > 20G
813       - checks config file checksum
814       - checks ssh to other nodes
815
816     @type nodeinfo: L{objects.Node}
817     @param nodeinfo: the node to check
818     @param file_list: required list of files
819     @param local_cksum: dictionary of local files and their checksums
820     @param node_result: the results from the node
821     @param feedback_fn: function used to accumulate results
822     @param master_files: list of files that only masters should have
823     @param drbd_map: the useddrbd minors for this node, in
824         form of minor: (instance, must_exist) which correspond to instances
825         and their running status
826     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
827
828     """
829     node = nodeinfo.name
830
831     # main result, node_result should be a non-empty dict
832     if not node_result or not isinstance(node_result, dict):
833       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
834       return True
835
836     # compares ganeti version
837     local_version = constants.PROTOCOL_VERSION
838     remote_version = node_result.get('version', None)
839     if not (remote_version and isinstance(remote_version, (list, tuple)) and
840             len(remote_version) == 2):
841       feedback_fn("  - ERROR: connection to %s failed" % (node))
842       return True
843
844     if local_version != remote_version[0]:
845       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
846                   " node %s %s" % (local_version, node, remote_version[0]))
847       return True
848
849     # node seems compatible, we can actually try to look into its results
850
851     bad = False
852
853     # full package version
854     if constants.RELEASE_VERSION != remote_version[1]:
855       feedback_fn("  - WARNING: software version mismatch: master %s,"
856                   " node %s %s" %
857                   (constants.RELEASE_VERSION, node, remote_version[1]))
858
859     # checks vg existence and size > 20G
860     if vg_name is not None:
861       vglist = node_result.get(constants.NV_VGLIST, None)
862       if not vglist:
863         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
864                         (node,))
865         bad = True
866       else:
867         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
868                                               constants.MIN_VG_SIZE)
869         if vgstatus:
870           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
871           bad = True
872
873     # checks config file checksum
874
875     remote_cksum = node_result.get(constants.NV_FILELIST, None)
876     if not isinstance(remote_cksum, dict):
877       bad = True
878       feedback_fn("  - ERROR: node hasn't returned file checksum data")
879     else:
880       for file_name in file_list:
881         node_is_mc = nodeinfo.master_candidate
882         must_have_file = file_name not in master_files
883         if file_name not in remote_cksum:
884           if node_is_mc or must_have_file:
885             bad = True
886             feedback_fn("  - ERROR: file '%s' missing" % file_name)
887         elif remote_cksum[file_name] != local_cksum[file_name]:
888           if node_is_mc or must_have_file:
889             bad = True
890             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
891           else:
892             # not candidate and this is not a must-have file
893             bad = True
894             feedback_fn("  - ERROR: file '%s' should not exist on non master"
895                         " candidates (and the file is outdated)" % file_name)
896         else:
897           # all good, except non-master/non-must have combination
898           if not node_is_mc and not must_have_file:
899             feedback_fn("  - ERROR: file '%s' should not exist on non master"
900                         " candidates" % file_name)
901
902     # checks ssh to any
903
904     if constants.NV_NODELIST not in node_result:
905       bad = True
906       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
907     else:
908       if node_result[constants.NV_NODELIST]:
909         bad = True
910         for node in node_result[constants.NV_NODELIST]:
911           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
912                           (node, node_result[constants.NV_NODELIST][node]))
913
914     if constants.NV_NODENETTEST not in node_result:
915       bad = True
916       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
917     else:
918       if node_result[constants.NV_NODENETTEST]:
919         bad = True
920         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
921         for node in nlist:
922           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
923                           (node, node_result[constants.NV_NODENETTEST][node]))
924
925     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
926     if isinstance(hyp_result, dict):
927       for hv_name, hv_result in hyp_result.iteritems():
928         if hv_result is not None:
929           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
930                       (hv_name, hv_result))
931
932     # check used drbd list
933     if vg_name is not None:
934       used_minors = node_result.get(constants.NV_DRBDLIST, [])
935       if not isinstance(used_minors, (tuple, list)):
936         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
937                     str(used_minors))
938       else:
939         for minor, (iname, must_exist) in drbd_map.items():
940           if minor not in used_minors and must_exist:
941             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
942                         " not active" % (minor, iname))
943             bad = True
944         for minor in used_minors:
945           if minor not in drbd_map:
946             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
947                         minor)
948             bad = True
949
950     return bad
951
952   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
953                       node_instance, feedback_fn, n_offline):
954     """Verify an instance.
955
956     This function checks to see if the required block devices are
957     available on the instance's node.
958
959     """
960     bad = False
961
962     node_current = instanceconfig.primary_node
963
964     node_vol_should = {}
965     instanceconfig.MapLVsByNode(node_vol_should)
966
967     for node in node_vol_should:
968       if node in n_offline:
969         # ignore missing volumes on offline nodes
970         continue
971       for volume in node_vol_should[node]:
972         if node not in node_vol_is or volume not in node_vol_is[node]:
973           feedback_fn("  - ERROR: volume %s missing on node %s" %
974                           (volume, node))
975           bad = True
976
977     if instanceconfig.admin_up:
978       if ((node_current not in node_instance or
979           not instance in node_instance[node_current]) and
980           node_current not in n_offline):
981         feedback_fn("  - ERROR: instance %s not running on node %s" %
982                         (instance, node_current))
983         bad = True
984
985     for node in node_instance:
986       if (not node == node_current):
987         if instance in node_instance[node]:
988           feedback_fn("  - ERROR: instance %s should not run on node %s" %
989                           (instance, node))
990           bad = True
991
992     return bad
993
994   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
995     """Verify if there are any unknown volumes in the cluster.
996
997     The .os, .swap and backup volumes are ignored. All other volumes are
998     reported as unknown.
999
1000     """
1001     bad = False
1002
1003     for node in node_vol_is:
1004       for volume in node_vol_is[node]:
1005         if node not in node_vol_should or volume not in node_vol_should[node]:
1006           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
1007                       (volume, node))
1008           bad = True
1009     return bad
1010
1011   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
1012     """Verify the list of running instances.
1013
1014     This checks what instances are running but unknown to the cluster.
1015
1016     """
1017     bad = False
1018     for node in node_instance:
1019       for runninginstance in node_instance[node]:
1020         if runninginstance not in instancelist:
1021           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
1022                           (runninginstance, node))
1023           bad = True
1024     return bad
1025
1026   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
1027     """Verify N+1 Memory Resilience.
1028
1029     Check that if one single node dies we can still start all the instances it
1030     was primary for.
1031
1032     """
1033     bad = False
1034
1035     for node, nodeinfo in node_info.iteritems():
1036       # This code checks that every node which is now listed as secondary has
1037       # enough memory to host all instances it is supposed to should a single
1038       # other node in the cluster fail.
1039       # FIXME: not ready for failover to an arbitrary node
1040       # FIXME: does not support file-backed instances
1041       # WARNING: we currently take into account down instances as well as up
1042       # ones, considering that even if they're down someone might want to start
1043       # them even in the event of a node failure.
1044       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
1045         needed_mem = 0
1046         for instance in instances:
1047           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
1048           if bep[constants.BE_AUTO_BALANCE]:
1049             needed_mem += bep[constants.BE_MEMORY]
1050         if nodeinfo['mfree'] < needed_mem:
1051           feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
1052                       " failovers should node %s fail" % (node, prinode))
1053           bad = True
1054     return bad
1055
1056   def CheckPrereq(self):
1057     """Check prerequisites.
1058
1059     Transform the list of checks we're going to skip into a set and check that
1060     all its members are valid.
1061
1062     """
1063     self.skip_set = frozenset(self.op.skip_checks)
1064     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
1065       raise errors.OpPrereqError("Invalid checks to be skipped specified")
1066
1067   def BuildHooksEnv(self):
1068     """Build hooks env.
1069
1070     Cluster-Verify hooks just ran in the post phase and their failure makes
1071     the output be logged in the verify output and the verification to fail.
1072
1073     """
1074     all_nodes = self.cfg.GetNodeList()
1075     env = {
1076       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
1077       }
1078     for node in self.cfg.GetAllNodesInfo().values():
1079       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
1080
1081     return env, [], all_nodes
1082
1083   def Exec(self, feedback_fn):
1084     """Verify integrity of cluster, performing various test on nodes.
1085
1086     """
1087     bad = False
1088     feedback_fn("* Verifying global settings")
1089     for msg in self.cfg.VerifyConfig():
1090       feedback_fn("  - ERROR: %s" % msg)
1091
1092     vg_name = self.cfg.GetVGName()
1093     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
1094     nodelist = utils.NiceSort(self.cfg.GetNodeList())
1095     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
1096     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
1097     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
1098                         for iname in instancelist)
1099     i_non_redundant = [] # Non redundant instances
1100     i_non_a_balanced = [] # Non auto-balanced instances
1101     n_offline = [] # List of offline nodes
1102     n_drained = [] # List of nodes being drained
1103     node_volume = {}
1104     node_instance = {}
1105     node_info = {}
1106     instance_cfg = {}
1107
1108     # FIXME: verify OS list
1109     # do local checksums
1110     master_files = [constants.CLUSTER_CONF_FILE]
1111
1112     file_names = ssconf.SimpleStore().GetFileList()
1113     file_names.append(constants.SSL_CERT_FILE)
1114     file_names.append(constants.RAPI_CERT_FILE)
1115     file_names.extend(master_files)
1116
1117     local_checksums = utils.FingerprintFiles(file_names)
1118
1119     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1120     node_verify_param = {
1121       constants.NV_FILELIST: file_names,
1122       constants.NV_NODELIST: [node.name for node in nodeinfo
1123                               if not node.offline],
1124       constants.NV_HYPERVISOR: hypervisors,
1125       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1126                                   node.secondary_ip) for node in nodeinfo
1127                                  if not node.offline],
1128       constants.NV_INSTANCELIST: hypervisors,
1129       constants.NV_VERSION: None,
1130       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1131       }
1132     if vg_name is not None:
1133       node_verify_param[constants.NV_VGLIST] = None
1134       node_verify_param[constants.NV_LVLIST] = vg_name
1135       node_verify_param[constants.NV_DRBDLIST] = None
1136     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1137                                            self.cfg.GetClusterName())
1138
1139     cluster = self.cfg.GetClusterInfo()
1140     master_node = self.cfg.GetMasterNode()
1141     all_drbd_map = self.cfg.ComputeDRBDMap()
1142
1143     for node_i in nodeinfo:
1144       node = node_i.name
1145
1146       if node_i.offline:
1147         feedback_fn("* Skipping offline node %s" % (node,))
1148         n_offline.append(node)
1149         continue
1150
1151       if node == master_node:
1152         ntype = "master"
1153       elif node_i.master_candidate:
1154         ntype = "master candidate"
1155       elif node_i.drained:
1156         ntype = "drained"
1157         n_drained.append(node)
1158       else:
1159         ntype = "regular"
1160       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1161
1162       msg = all_nvinfo[node].fail_msg
1163       if msg:
1164         feedback_fn("  - ERROR: while contacting node %s: %s" % (node, msg))
1165         bad = True
1166         continue
1167
1168       nresult = all_nvinfo[node].payload
1169       node_drbd = {}
1170       for minor, instance in all_drbd_map[node].items():
1171         if instance not in instanceinfo:
1172           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1173                       instance)
1174           # ghost instance should not be running, but otherwise we
1175           # don't give double warnings (both ghost instance and
1176           # unallocated minor in use)
1177           node_drbd[minor] = (instance, False)
1178         else:
1179           instance = instanceinfo[instance]
1180           node_drbd[minor] = (instance.name, instance.admin_up)
1181       result = self._VerifyNode(node_i, file_names, local_checksums,
1182                                 nresult, feedback_fn, master_files,
1183                                 node_drbd, vg_name)
1184       bad = bad or result
1185
1186       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1187       if vg_name is None:
1188         node_volume[node] = {}
1189       elif isinstance(lvdata, basestring):
1190         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1191                     (node, utils.SafeEncode(lvdata)))
1192         bad = True
1193         node_volume[node] = {}
1194       elif not isinstance(lvdata, dict):
1195         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1196         bad = True
1197         continue
1198       else:
1199         node_volume[node] = lvdata
1200
1201       # node_instance
1202       idata = nresult.get(constants.NV_INSTANCELIST, None)
1203       if not isinstance(idata, list):
1204         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1205                     (node,))
1206         bad = True
1207         continue
1208
1209       node_instance[node] = idata
1210
1211       # node_info
1212       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1213       if not isinstance(nodeinfo, dict):
1214         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1215         bad = True
1216         continue
1217
1218       try:
1219         node_info[node] = {
1220           "mfree": int(nodeinfo['memory_free']),
1221           "pinst": [],
1222           "sinst": [],
1223           # dictionary holding all instances this node is secondary for,
1224           # grouped by their primary node. Each key is a cluster node, and each
1225           # value is a list of instances which have the key as primary and the
1226           # current node as secondary.  this is handy to calculate N+1 memory
1227           # availability if you can only failover from a primary to its
1228           # secondary.
1229           "sinst-by-pnode": {},
1230         }
1231         # FIXME: devise a free space model for file based instances as well
1232         if vg_name is not None:
1233           if (constants.NV_VGLIST not in nresult or
1234               vg_name not in nresult[constants.NV_VGLIST]):
1235             feedback_fn("  - ERROR: node %s didn't return data for the"
1236                         " volume group '%s' - it is either missing or broken" %
1237                         (node, vg_name))
1238             bad = True
1239             continue
1240           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1241       except (ValueError, KeyError):
1242         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1243                     " from node %s" % (node,))
1244         bad = True
1245         continue
1246
1247     node_vol_should = {}
1248
1249     for instance in instancelist:
1250       feedback_fn("* Verifying instance %s" % instance)
1251       inst_config = instanceinfo[instance]
1252       result =  self._VerifyInstance(instance, inst_config, node_volume,
1253                                      node_instance, feedback_fn, n_offline)
1254       bad = bad or result
1255       inst_nodes_offline = []
1256
1257       inst_config.MapLVsByNode(node_vol_should)
1258
1259       instance_cfg[instance] = inst_config
1260
1261       pnode = inst_config.primary_node
1262       if pnode in node_info:
1263         node_info[pnode]['pinst'].append(instance)
1264       elif pnode not in n_offline:
1265         feedback_fn("  - ERROR: instance %s, connection to primary node"
1266                     " %s failed" % (instance, pnode))
1267         bad = True
1268
1269       if pnode in n_offline:
1270         inst_nodes_offline.append(pnode)
1271
1272       # If the instance is non-redundant we cannot survive losing its primary
1273       # node, so we are not N+1 compliant. On the other hand we have no disk
1274       # templates with more than one secondary so that situation is not well
1275       # supported either.
1276       # FIXME: does not support file-backed instances
1277       if len(inst_config.secondary_nodes) == 0:
1278         i_non_redundant.append(instance)
1279       elif len(inst_config.secondary_nodes) > 1:
1280         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1281                     % instance)
1282
1283       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1284         i_non_a_balanced.append(instance)
1285
1286       for snode in inst_config.secondary_nodes:
1287         if snode in node_info:
1288           node_info[snode]['sinst'].append(instance)
1289           if pnode not in node_info[snode]['sinst-by-pnode']:
1290             node_info[snode]['sinst-by-pnode'][pnode] = []
1291           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1292         elif snode not in n_offline:
1293           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1294                       " %s failed" % (instance, snode))
1295           bad = True
1296         if snode in n_offline:
1297           inst_nodes_offline.append(snode)
1298
1299       if inst_nodes_offline:
1300         # warn that the instance lives on offline nodes, and set bad=True
1301         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1302                     ", ".join(inst_nodes_offline))
1303         bad = True
1304
1305     feedback_fn("* Verifying orphan volumes")
1306     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1307                                        feedback_fn)
1308     bad = bad or result
1309
1310     feedback_fn("* Verifying remaining instances")
1311     result = self._VerifyOrphanInstances(instancelist, node_instance,
1312                                          feedback_fn)
1313     bad = bad or result
1314
1315     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1316       feedback_fn("* Verifying N+1 Memory redundancy")
1317       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1318       bad = bad or result
1319
1320     feedback_fn("* Other Notes")
1321     if i_non_redundant:
1322       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1323                   % len(i_non_redundant))
1324
1325     if i_non_a_balanced:
1326       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1327                   % len(i_non_a_balanced))
1328
1329     if n_offline:
1330       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1331
1332     if n_drained:
1333       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1334
1335     return not bad
1336
1337   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1338     """Analyze the post-hooks' result
1339
1340     This method analyses the hook result, handles it, and sends some
1341     nicely-formatted feedback back to the user.
1342
1343     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1344         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1345     @param hooks_results: the results of the multi-node hooks rpc call
1346     @param feedback_fn: function used send feedback back to the caller
1347     @param lu_result: previous Exec result
1348     @return: the new Exec result, based on the previous result
1349         and hook results
1350
1351     """
1352     # We only really run POST phase hooks, and are only interested in
1353     # their results
1354     if phase == constants.HOOKS_PHASE_POST:
1355       # Used to change hooks' output to proper indentation
1356       indent_re = re.compile('^', re.M)
1357       feedback_fn("* Hooks Results")
1358       if not hooks_results:
1359         feedback_fn("  - ERROR: general communication failure")
1360         lu_result = 1
1361       else:
1362         for node_name in hooks_results:
1363           show_node_header = True
1364           res = hooks_results[node_name]
1365           msg = res.fail_msg
1366           if msg:
1367             if res.offline:
1368               # no need to warn or set fail return value
1369               continue
1370             feedback_fn("    Communication failure in hooks execution: %s" %
1371                         msg)
1372             lu_result = 1
1373             continue
1374           for script, hkr, output in res.payload:
1375             if hkr == constants.HKR_FAIL:
1376               # The node header is only shown once, if there are
1377               # failing hooks on that node
1378               if show_node_header:
1379                 feedback_fn("  Node %s:" % node_name)
1380                 show_node_header = False
1381               feedback_fn("    ERROR: Script %s failed, output:" % script)
1382               output = indent_re.sub('      ', output)
1383               feedback_fn("%s" % output)
1384               lu_result = 1
1385
1386       return lu_result
1387
1388
1389 class LUVerifyDisks(NoHooksLU):
1390   """Verifies the cluster disks status.
1391
1392   """
1393   _OP_REQP = []
1394   REQ_BGL = False
1395
1396   def ExpandNames(self):
1397     self.needed_locks = {
1398       locking.LEVEL_NODE: locking.ALL_SET,
1399       locking.LEVEL_INSTANCE: locking.ALL_SET,
1400     }
1401     self.share_locks = dict.fromkeys(locking.LEVELS, 1)
1402
1403   def CheckPrereq(self):
1404     """Check prerequisites.
1405
1406     This has no prerequisites.
1407
1408     """
1409     pass
1410
1411   def Exec(self, feedback_fn):
1412     """Verify integrity of cluster disks.
1413
1414     @rtype: tuple of three items
1415     @return: a tuple of (dict of node-to-node_error, list of instances
1416         which need activate-disks, dict of instance: (node, volume) for
1417         missing volumes
1418
1419     """
1420     result = res_nodes, res_instances, res_missing = {}, [], {}
1421
1422     vg_name = self.cfg.GetVGName()
1423     nodes = utils.NiceSort(self.cfg.GetNodeList())
1424     instances = [self.cfg.GetInstanceInfo(name)
1425                  for name in self.cfg.GetInstanceList()]
1426
1427     nv_dict = {}
1428     for inst in instances:
1429       inst_lvs = {}
1430       if (not inst.admin_up or
1431           inst.disk_template not in constants.DTS_NET_MIRROR):
1432         continue
1433       inst.MapLVsByNode(inst_lvs)
1434       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1435       for node, vol_list in inst_lvs.iteritems():
1436         for vol in vol_list:
1437           nv_dict[(node, vol)] = inst
1438
1439     if not nv_dict:
1440       return result
1441
1442     node_lvs = self.rpc.call_lv_list(nodes, vg_name)
1443
1444     for node in nodes:
1445       # node_volume
1446       node_res = node_lvs[node]
1447       if node_res.offline:
1448         continue
1449       msg = node_res.fail_msg
1450       if msg:
1451         logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1452         res_nodes[node] = msg
1453         continue
1454
1455       lvs = node_res.payload
1456       for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1457         inst = nv_dict.pop((node, lv_name), None)
1458         if (not lv_online and inst is not None
1459             and inst.name not in res_instances):
1460           res_instances.append(inst.name)
1461
1462     # any leftover items in nv_dict are missing LVs, let's arrange the
1463     # data better
1464     for key, inst in nv_dict.iteritems():
1465       if inst.name not in res_missing:
1466         res_missing[inst.name] = []
1467       res_missing[inst.name].append(key)
1468
1469     return result
1470
1471
1472 class LURenameCluster(LogicalUnit):
1473   """Rename the cluster.
1474
1475   """
1476   HPATH = "cluster-rename"
1477   HTYPE = constants.HTYPE_CLUSTER
1478   _OP_REQP = ["name"]
1479
1480   def BuildHooksEnv(self):
1481     """Build hooks env.
1482
1483     """
1484     env = {
1485       "OP_TARGET": self.cfg.GetClusterName(),
1486       "NEW_NAME": self.op.name,
1487       }
1488     mn = self.cfg.GetMasterNode()
1489     return env, [mn], [mn]
1490
1491   def CheckPrereq(self):
1492     """Verify that the passed name is a valid one.
1493
1494     """
1495     hostname = utils.HostInfo(self.op.name)
1496
1497     new_name = hostname.name
1498     self.ip = new_ip = hostname.ip
1499     old_name = self.cfg.GetClusterName()
1500     old_ip = self.cfg.GetMasterIP()
1501     if new_name == old_name and new_ip == old_ip:
1502       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1503                                  " cluster has changed")
1504     if new_ip != old_ip:
1505       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1506         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1507                                    " reachable on the network. Aborting." %
1508                                    new_ip)
1509
1510     self.op.name = new_name
1511
1512   def Exec(self, feedback_fn):
1513     """Rename the cluster.
1514
1515     """
1516     clustername = self.op.name
1517     ip = self.ip
1518
1519     # shutdown the master IP
1520     master = self.cfg.GetMasterNode()
1521     result = self.rpc.call_node_stop_master(master, False)
1522     result.Raise("Could not disable the master role")
1523
1524     try:
1525       cluster = self.cfg.GetClusterInfo()
1526       cluster.cluster_name = clustername
1527       cluster.master_ip = ip
1528       self.cfg.Update(cluster)
1529
1530       # update the known hosts file
1531       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1532       node_list = self.cfg.GetNodeList()
1533       try:
1534         node_list.remove(master)
1535       except ValueError:
1536         pass
1537       result = self.rpc.call_upload_file(node_list,
1538                                          constants.SSH_KNOWN_HOSTS_FILE)
1539       for to_node, to_result in result.iteritems():
1540         msg = to_result.fail_msg
1541         if msg:
1542           msg = ("Copy of file %s to node %s failed: %s" %
1543                  (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1544           self.proc.LogWarning(msg)
1545
1546     finally:
1547       result = self.rpc.call_node_start_master(master, False, False)
1548       msg = result.fail_msg
1549       if msg:
1550         self.LogWarning("Could not re-enable the master role on"
1551                         " the master, please restart manually: %s", msg)
1552
1553
1554 def _RecursiveCheckIfLVMBased(disk):
1555   """Check if the given disk or its children are lvm-based.
1556
1557   @type disk: L{objects.Disk}
1558   @param disk: the disk to check
1559   @rtype: boolean
1560   @return: boolean indicating whether a LD_LV dev_type was found or not
1561
1562   """
1563   if disk.children:
1564     for chdisk in disk.children:
1565       if _RecursiveCheckIfLVMBased(chdisk):
1566         return True
1567   return disk.dev_type == constants.LD_LV
1568
1569
1570 class LUSetClusterParams(LogicalUnit):
1571   """Change the parameters of the cluster.
1572
1573   """
1574   HPATH = "cluster-modify"
1575   HTYPE = constants.HTYPE_CLUSTER
1576   _OP_REQP = []
1577   REQ_BGL = False
1578
1579   def CheckArguments(self):
1580     """Check parameters
1581
1582     """
1583     if not hasattr(self.op, "candidate_pool_size"):
1584       self.op.candidate_pool_size = None
1585     if self.op.candidate_pool_size is not None:
1586       try:
1587         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1588       except (ValueError, TypeError), err:
1589         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1590                                    str(err))
1591       if self.op.candidate_pool_size < 1:
1592         raise errors.OpPrereqError("At least one master candidate needed")
1593
1594   def ExpandNames(self):
1595     # FIXME: in the future maybe other cluster params won't require checking on
1596     # all nodes to be modified.
1597     self.needed_locks = {
1598       locking.LEVEL_NODE: locking.ALL_SET,
1599     }
1600     self.share_locks[locking.LEVEL_NODE] = 1
1601
1602   def BuildHooksEnv(self):
1603     """Build hooks env.
1604
1605     """
1606     env = {
1607       "OP_TARGET": self.cfg.GetClusterName(),
1608       "NEW_VG_NAME": self.op.vg_name,
1609       }
1610     mn = self.cfg.GetMasterNode()
1611     return env, [mn], [mn]
1612
1613   def CheckPrereq(self):
1614     """Check prerequisites.
1615
1616     This checks whether the given params don't conflict and
1617     if the given volume group is valid.
1618
1619     """
1620     if self.op.vg_name is not None and not self.op.vg_name:
1621       instances = self.cfg.GetAllInstancesInfo().values()
1622       for inst in instances:
1623         for disk in inst.disks:
1624           if _RecursiveCheckIfLVMBased(disk):
1625             raise errors.OpPrereqError("Cannot disable lvm storage while"
1626                                        " lvm-based instances exist")
1627
1628     node_list = self.acquired_locks[locking.LEVEL_NODE]
1629
1630     # if vg_name not None, checks given volume group on all nodes
1631     if self.op.vg_name:
1632       vglist = self.rpc.call_vg_list(node_list)
1633       for node in node_list:
1634         msg = vglist[node].fail_msg
1635         if msg:
1636           # ignoring down node
1637           self.LogWarning("Error while gathering data on node %s"
1638                           " (ignoring node): %s", node, msg)
1639           continue
1640         vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1641                                               self.op.vg_name,
1642                                               constants.MIN_VG_SIZE)
1643         if vgstatus:
1644           raise errors.OpPrereqError("Error on node '%s': %s" %
1645                                      (node, vgstatus))
1646
1647     self.cluster = cluster = self.cfg.GetClusterInfo()
1648     # validate params changes
1649     if self.op.beparams:
1650       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1651       self.new_beparams = objects.FillDict(
1652         cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1653
1654     if self.op.nicparams:
1655       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1656       self.new_nicparams = objects.FillDict(
1657         cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1658       objects.NIC.CheckParameterSyntax(self.new_nicparams)
1659
1660     # hypervisor list/parameters
1661     self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1662     if self.op.hvparams:
1663       if not isinstance(self.op.hvparams, dict):
1664         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1665       for hv_name, hv_dict in self.op.hvparams.items():
1666         if hv_name not in self.new_hvparams:
1667           self.new_hvparams[hv_name] = hv_dict
1668         else:
1669           self.new_hvparams[hv_name].update(hv_dict)
1670
1671     if self.op.enabled_hypervisors is not None:
1672       self.hv_list = self.op.enabled_hypervisors
1673       if not self.hv_list:
1674         raise errors.OpPrereqError("Enabled hypervisors list must contain at"
1675                                    " least one member")
1676       invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
1677       if invalid_hvs:
1678         raise errors.OpPrereqError("Enabled hypervisors contains invalid"
1679                                    " entries: %s" % invalid_hvs)
1680     else:
1681       self.hv_list = cluster.enabled_hypervisors
1682
1683     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1684       # either the enabled list has changed, or the parameters have, validate
1685       for hv_name, hv_params in self.new_hvparams.items():
1686         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1687             (self.op.enabled_hypervisors and
1688              hv_name in self.op.enabled_hypervisors)):
1689           # either this is a new hypervisor, or its parameters have changed
1690           hv_class = hypervisor.GetHypervisor(hv_name)
1691           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1692           hv_class.CheckParameterSyntax(hv_params)
1693           _CheckHVParams(self, node_list, hv_name, hv_params)
1694
1695   def Exec(self, feedback_fn):
1696     """Change the parameters of the cluster.
1697
1698     """
1699     if self.op.vg_name is not None:
1700       new_volume = self.op.vg_name
1701       if not new_volume:
1702         new_volume = None
1703       if new_volume != self.cfg.GetVGName():
1704         self.cfg.SetVGName(new_volume)
1705       else:
1706         feedback_fn("Cluster LVM configuration already in desired"
1707                     " state, not changing")
1708     if self.op.hvparams:
1709       self.cluster.hvparams = self.new_hvparams
1710     if self.op.enabled_hypervisors is not None:
1711       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1712     if self.op.beparams:
1713       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1714     if self.op.nicparams:
1715       self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1716
1717     if self.op.candidate_pool_size is not None:
1718       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1719       # we need to update the pool size here, otherwise the save will fail
1720       _AdjustCandidatePool(self)
1721
1722     self.cfg.Update(self.cluster)
1723
1724
1725 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1726   """Distribute additional files which are part of the cluster configuration.
1727
1728   ConfigWriter takes care of distributing the config and ssconf files, but
1729   there are more files which should be distributed to all nodes. This function
1730   makes sure those are copied.
1731
1732   @param lu: calling logical unit
1733   @param additional_nodes: list of nodes not in the config to distribute to
1734
1735   """
1736   # 1. Gather target nodes
1737   myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1738   dist_nodes = lu.cfg.GetNodeList()
1739   if additional_nodes is not None:
1740     dist_nodes.extend(additional_nodes)
1741   if myself.name in dist_nodes:
1742     dist_nodes.remove(myself.name)
1743   # 2. Gather files to distribute
1744   dist_files = set([constants.ETC_HOSTS,
1745                     constants.SSH_KNOWN_HOSTS_FILE,
1746                     constants.RAPI_CERT_FILE,
1747                     constants.RAPI_USERS_FILE,
1748                     constants.HMAC_CLUSTER_KEY,
1749                    ])
1750
1751   enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1752   for hv_name in enabled_hypervisors:
1753     hv_class = hypervisor.GetHypervisor(hv_name)
1754     dist_files.update(hv_class.GetAncillaryFiles())
1755
1756   # 3. Perform the files upload
1757   for fname in dist_files:
1758     if os.path.exists(fname):
1759       result = lu.rpc.call_upload_file(dist_nodes, fname)
1760       for to_node, to_result in result.items():
1761         msg = to_result.fail_msg
1762         if msg:
1763           msg = ("Copy of file %s to node %s failed: %s" %
1764                  (fname, to_node, msg))
1765           lu.proc.LogWarning(msg)
1766
1767
1768 class LURedistributeConfig(NoHooksLU):
1769   """Force the redistribution of cluster configuration.
1770
1771   This is a very simple LU.
1772
1773   """
1774   _OP_REQP = []
1775   REQ_BGL = False
1776
1777   def ExpandNames(self):
1778     self.needed_locks = {
1779       locking.LEVEL_NODE: locking.ALL_SET,
1780     }
1781     self.share_locks[locking.LEVEL_NODE] = 1
1782
1783   def CheckPrereq(self):
1784     """Check prerequisites.
1785
1786     """
1787
1788   def Exec(self, feedback_fn):
1789     """Redistribute the configuration.
1790
1791     """
1792     self.cfg.Update(self.cfg.GetClusterInfo())
1793     _RedistributeAncillaryFiles(self)
1794
1795
1796 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1797   """Sleep and poll for an instance's disk to sync.
1798
1799   """
1800   if not instance.disks:
1801     return True
1802
1803   if not oneshot:
1804     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1805
1806   node = instance.primary_node
1807
1808   for dev in instance.disks:
1809     lu.cfg.SetDiskID(dev, node)
1810
1811   retries = 0
1812   degr_retries = 10 # in seconds, as we sleep 1 second each time
1813   while True:
1814     max_time = 0
1815     done = True
1816     cumul_degraded = False
1817     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1818     msg = rstats.fail_msg
1819     if msg:
1820       lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1821       retries += 1
1822       if retries >= 10:
1823         raise errors.RemoteError("Can't contact node %s for mirror data,"
1824                                  " aborting." % node)
1825       time.sleep(6)
1826       continue
1827     rstats = rstats.payload
1828     retries = 0
1829     for i, mstat in enumerate(rstats):
1830       if mstat is None:
1831         lu.LogWarning("Can't compute data for node %s/%s",
1832                            node, instance.disks[i].iv_name)
1833         continue
1834       # we ignore the ldisk parameter
1835       perc_done, est_time, is_degraded, _ = mstat
1836       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1837       if perc_done is not None:
1838         done = False
1839         if est_time is not None:
1840           rem_time = "%d estimated seconds remaining" % est_time
1841           max_time = est_time
1842         else:
1843           rem_time = "no time estimate"
1844         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1845                         (instance.disks[i].iv_name, perc_done, rem_time))
1846
1847     # if we're done but degraded, let's do a few small retries, to
1848     # make sure we see a stable and not transient situation; therefore
1849     # we force restart of the loop
1850     if (done or oneshot) and cumul_degraded and degr_retries > 0:
1851       logging.info("Degraded disks found, %d retries left", degr_retries)
1852       degr_retries -= 1
1853       time.sleep(1)
1854       continue
1855
1856     if done or oneshot:
1857       break
1858
1859     time.sleep(min(60, max_time))
1860
1861   if done:
1862     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1863   return not cumul_degraded
1864
1865
1866 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1867   """Check that mirrors are not degraded.
1868
1869   The ldisk parameter, if True, will change the test from the
1870   is_degraded attribute (which represents overall non-ok status for
1871   the device(s)) to the ldisk (representing the local storage status).
1872
1873   """
1874   lu.cfg.SetDiskID(dev, node)
1875   if ldisk:
1876     idx = 6
1877   else:
1878     idx = 5
1879
1880   result = True
1881   if on_primary or dev.AssembleOnSecondary():
1882     rstats = lu.rpc.call_blockdev_find(node, dev)
1883     msg = rstats.fail_msg
1884     if msg:
1885       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1886       result = False
1887     elif not rstats.payload:
1888       lu.LogWarning("Can't find disk on node %s", node)
1889       result = False
1890     else:
1891       result = result and (not rstats.payload[idx])
1892   if dev.children:
1893     for child in dev.children:
1894       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1895
1896   return result
1897
1898
1899 class LUDiagnoseOS(NoHooksLU):
1900   """Logical unit for OS diagnose/query.
1901
1902   """
1903   _OP_REQP = ["output_fields", "names"]
1904   REQ_BGL = False
1905   _FIELDS_STATIC = utils.FieldSet()
1906   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1907
1908   def ExpandNames(self):
1909     if self.op.names:
1910       raise errors.OpPrereqError("Selective OS query not supported")
1911
1912     _CheckOutputFields(static=self._FIELDS_STATIC,
1913                        dynamic=self._FIELDS_DYNAMIC,
1914                        selected=self.op.output_fields)
1915
1916     # Lock all nodes, in shared mode
1917     # Temporary removal of locks, should be reverted later
1918     # TODO: reintroduce locks when they are lighter-weight
1919     self.needed_locks = {}
1920     #self.share_locks[locking.LEVEL_NODE] = 1
1921     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1922
1923   def CheckPrereq(self):
1924     """Check prerequisites.
1925
1926     """
1927
1928   @staticmethod
1929   def _DiagnoseByOS(node_list, rlist):
1930     """Remaps a per-node return list into an a per-os per-node dictionary
1931
1932     @param node_list: a list with the names of all nodes
1933     @param rlist: a map with node names as keys and OS objects as values
1934
1935     @rtype: dict
1936     @return: a dictionary with osnames as keys and as value another map, with
1937         nodes as keys and tuples of (path, status, diagnose) as values, eg::
1938
1939           {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
1940                                      (/srv/..., False, "invalid api")],
1941                            "node2": [(/srv/..., True, "")]}
1942           }
1943
1944     """
1945     all_os = {}
1946     # we build here the list of nodes that didn't fail the RPC (at RPC
1947     # level), so that nodes with a non-responding node daemon don't
1948     # make all OSes invalid
1949     good_nodes = [node_name for node_name in rlist
1950                   if not rlist[node_name].fail_msg]
1951     for node_name, nr in rlist.items():
1952       if nr.fail_msg or not nr.payload:
1953         continue
1954       for name, path, status, diagnose in nr.payload:
1955         if name not in all_os:
1956           # build a list of nodes for this os containing empty lists
1957           # for each node in node_list
1958           all_os[name] = {}
1959           for nname in good_nodes:
1960             all_os[name][nname] = []
1961         all_os[name][node_name].append((path, status, diagnose))
1962     return all_os
1963
1964   def Exec(self, feedback_fn):
1965     """Compute the list of OSes.
1966
1967     """
1968     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1969     node_data = self.rpc.call_os_diagnose(valid_nodes)
1970     pol = self._DiagnoseByOS(valid_nodes, node_data)
1971     output = []
1972     for os_name, os_data in pol.items():
1973       row = []
1974       for field in self.op.output_fields:
1975         if field == "name":
1976           val = os_name
1977         elif field == "valid":
1978           val = utils.all([osl and osl[0][1] for osl in os_data.values()])
1979         elif field == "node_status":
1980           # this is just a copy of the dict
1981           val = {}
1982           for node_name, nos_list in os_data.items():
1983             val[node_name] = nos_list
1984         else:
1985           raise errors.ParameterError(field)
1986         row.append(val)
1987       output.append(row)
1988
1989     return output
1990
1991
1992 class LURemoveNode(LogicalUnit):
1993   """Logical unit for removing a node.
1994
1995   """
1996   HPATH = "node-remove"
1997   HTYPE = constants.HTYPE_NODE
1998   _OP_REQP = ["node_name"]
1999
2000   def BuildHooksEnv(self):
2001     """Build hooks env.
2002
2003     This doesn't run on the target node in the pre phase as a failed
2004     node would then be impossible to remove.
2005
2006     """
2007     env = {
2008       "OP_TARGET": self.op.node_name,
2009       "NODE_NAME": self.op.node_name,
2010       }
2011     all_nodes = self.cfg.GetNodeList()
2012     all_nodes.remove(self.op.node_name)
2013     return env, all_nodes, all_nodes
2014
2015   def CheckPrereq(self):
2016     """Check prerequisites.
2017
2018     This checks:
2019      - the node exists in the configuration
2020      - it does not have primary or secondary instances
2021      - it's not the master
2022
2023     Any errors are signaled by raising errors.OpPrereqError.
2024
2025     """
2026     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
2027     if node is None:
2028       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
2029
2030     instance_list = self.cfg.GetInstanceList()
2031
2032     masternode = self.cfg.GetMasterNode()
2033     if node.name == masternode:
2034       raise errors.OpPrereqError("Node is the master node,"
2035                                  " you need to failover first.")
2036
2037     for instance_name in instance_list:
2038       instance = self.cfg.GetInstanceInfo(instance_name)
2039       if node.name in instance.all_nodes:
2040         raise errors.OpPrereqError("Instance %s is still running on the node,"
2041                                    " please remove first." % instance_name)
2042     self.op.node_name = node.name
2043     self.node = node
2044
2045   def Exec(self, feedback_fn):
2046     """Removes the node from the cluster.
2047
2048     """
2049     node = self.node
2050     logging.info("Stopping the node daemon and removing configs from node %s",
2051                  node.name)
2052
2053     self.context.RemoveNode(node.name)
2054
2055     result = self.rpc.call_node_leave_cluster(node.name)
2056     msg = result.fail_msg
2057     if msg:
2058       self.LogWarning("Errors encountered on the remote node while leaving"
2059                       " the cluster: %s", msg)
2060
2061     # Promote nodes to master candidate as needed
2062     _AdjustCandidatePool(self)
2063
2064
2065 class LUQueryNodes(NoHooksLU):
2066   """Logical unit for querying nodes.
2067
2068   """
2069   _OP_REQP = ["output_fields", "names", "use_locking"]
2070   REQ_BGL = False
2071   _FIELDS_DYNAMIC = utils.FieldSet(
2072     "dtotal", "dfree",
2073     "mtotal", "mnode", "mfree",
2074     "bootid",
2075     "ctotal", "cnodes", "csockets",
2076     )
2077
2078   _FIELDS_STATIC = utils.FieldSet(
2079     "name", "pinst_cnt", "sinst_cnt",
2080     "pinst_list", "sinst_list",
2081     "pip", "sip", "tags",
2082     "serial_no",
2083     "master_candidate",
2084     "master",
2085     "offline",
2086     "drained",
2087     "role",
2088     )
2089
2090   def ExpandNames(self):
2091     _CheckOutputFields(static=self._FIELDS_STATIC,
2092                        dynamic=self._FIELDS_DYNAMIC,
2093                        selected=self.op.output_fields)
2094
2095     self.needed_locks = {}
2096     self.share_locks[locking.LEVEL_NODE] = 1
2097
2098     if self.op.names:
2099       self.wanted = _GetWantedNodes(self, self.op.names)
2100     else:
2101       self.wanted = locking.ALL_SET
2102
2103     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2104     self.do_locking = self.do_node_query and self.op.use_locking
2105     if self.do_locking:
2106       # if we don't request only static fields, we need to lock the nodes
2107       self.needed_locks[locking.LEVEL_NODE] = self.wanted
2108
2109
2110   def CheckPrereq(self):
2111     """Check prerequisites.
2112
2113     """
2114     # The validation of the node list is done in the _GetWantedNodes,
2115     # if non empty, and if empty, there's no validation to do
2116     pass
2117
2118   def Exec(self, feedback_fn):
2119     """Computes the list of nodes and their attributes.
2120
2121     """
2122     all_info = self.cfg.GetAllNodesInfo()
2123     if self.do_locking:
2124       nodenames = self.acquired_locks[locking.LEVEL_NODE]
2125     elif self.wanted != locking.ALL_SET:
2126       nodenames = self.wanted
2127       missing = set(nodenames).difference(all_info.keys())
2128       if missing:
2129         raise errors.OpExecError(
2130           "Some nodes were removed before retrieving their data: %s" % missing)
2131     else:
2132       nodenames = all_info.keys()
2133
2134     nodenames = utils.NiceSort(nodenames)
2135     nodelist = [all_info[name] for name in nodenames]
2136
2137     # begin data gathering
2138
2139     if self.do_node_query:
2140       live_data = {}
2141       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2142                                           self.cfg.GetHypervisorType())
2143       for name in nodenames:
2144         nodeinfo = node_data[name]
2145         if not nodeinfo.fail_msg and nodeinfo.payload:
2146           nodeinfo = nodeinfo.payload
2147           fn = utils.TryConvert
2148           live_data[name] = {
2149             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2150             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2151             "mfree": fn(int, nodeinfo.get('memory_free', None)),
2152             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2153             "dfree": fn(int, nodeinfo.get('vg_free', None)),
2154             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2155             "bootid": nodeinfo.get('bootid', None),
2156             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2157             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2158             }
2159         else:
2160           live_data[name] = {}
2161     else:
2162       live_data = dict.fromkeys(nodenames, {})
2163
2164     node_to_primary = dict([(name, set()) for name in nodenames])
2165     node_to_secondary = dict([(name, set()) for name in nodenames])
2166
2167     inst_fields = frozenset(("pinst_cnt", "pinst_list",
2168                              "sinst_cnt", "sinst_list"))
2169     if inst_fields & frozenset(self.op.output_fields):
2170       instancelist = self.cfg.GetInstanceList()
2171
2172       for instance_name in instancelist:
2173         inst = self.cfg.GetInstanceInfo(instance_name)
2174         if inst.primary_node in node_to_primary:
2175           node_to_primary[inst.primary_node].add(inst.name)
2176         for secnode in inst.secondary_nodes:
2177           if secnode in node_to_secondary:
2178             node_to_secondary[secnode].add(inst.name)
2179
2180     master_node = self.cfg.GetMasterNode()
2181
2182     # end data gathering
2183
2184     output = []
2185     for node in nodelist:
2186       node_output = []
2187       for field in self.op.output_fields:
2188         if field == "name":
2189           val = node.name
2190         elif field == "pinst_list":
2191           val = list(node_to_primary[node.name])
2192         elif field == "sinst_list":
2193           val = list(node_to_secondary[node.name])
2194         elif field == "pinst_cnt":
2195           val = len(node_to_primary[node.name])
2196         elif field == "sinst_cnt":
2197           val = len(node_to_secondary[node.name])
2198         elif field == "pip":
2199           val = node.primary_ip
2200         elif field == "sip":
2201           val = node.secondary_ip
2202         elif field == "tags":
2203           val = list(node.GetTags())
2204         elif field == "serial_no":
2205           val = node.serial_no
2206         elif field == "master_candidate":
2207           val = node.master_candidate
2208         elif field == "master":
2209           val = node.name == master_node
2210         elif field == "offline":
2211           val = node.offline
2212         elif field == "drained":
2213           val = node.drained
2214         elif self._FIELDS_DYNAMIC.Matches(field):
2215           val = live_data[node.name].get(field, None)
2216         elif field == "role":
2217           if node.name == master_node:
2218             val = "M"
2219           elif node.master_candidate:
2220             val = "C"
2221           elif node.drained:
2222             val = "D"
2223           elif node.offline:
2224             val = "O"
2225           else:
2226             val = "R"
2227         else:
2228           raise errors.ParameterError(field)
2229         node_output.append(val)
2230       output.append(node_output)
2231
2232     return output
2233
2234
2235 class LUQueryNodeVolumes(NoHooksLU):
2236   """Logical unit for getting volumes on node(s).
2237
2238   """
2239   _OP_REQP = ["nodes", "output_fields"]
2240   REQ_BGL = False
2241   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2242   _FIELDS_STATIC = utils.FieldSet("node")
2243
2244   def ExpandNames(self):
2245     _CheckOutputFields(static=self._FIELDS_STATIC,
2246                        dynamic=self._FIELDS_DYNAMIC,
2247                        selected=self.op.output_fields)
2248
2249     self.needed_locks = {}
2250     self.share_locks[locking.LEVEL_NODE] = 1
2251     if not self.op.nodes:
2252       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2253     else:
2254       self.needed_locks[locking.LEVEL_NODE] = \
2255         _GetWantedNodes(self, self.op.nodes)
2256
2257   def CheckPrereq(self):
2258     """Check prerequisites.
2259
2260     This checks that the fields required are valid output fields.
2261
2262     """
2263     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2264
2265   def Exec(self, feedback_fn):
2266     """Computes the list of nodes and their attributes.
2267
2268     """
2269     nodenames = self.nodes
2270     volumes = self.rpc.call_node_volumes(nodenames)
2271
2272     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2273              in self.cfg.GetInstanceList()]
2274
2275     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2276
2277     output = []
2278     for node in nodenames:
2279       nresult = volumes[node]
2280       if nresult.offline:
2281         continue
2282       msg = nresult.fail_msg
2283       if msg:
2284         self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2285         continue
2286
2287       node_vols = nresult.payload[:]
2288       node_vols.sort(key=lambda vol: vol['dev'])
2289
2290       for vol in node_vols:
2291         node_output = []
2292         for field in self.op.output_fields:
2293           if field == "node":
2294             val = node
2295           elif field == "phys":
2296             val = vol['dev']
2297           elif field == "vg":
2298             val = vol['vg']
2299           elif field == "name":
2300             val = vol['name']
2301           elif field == "size":
2302             val = int(float(vol['size']))
2303           elif field == "instance":
2304             for inst in ilist:
2305               if node not in lv_by_node[inst]:
2306                 continue
2307               if vol['name'] in lv_by_node[inst][node]:
2308                 val = inst.name
2309                 break
2310             else:
2311               val = '-'
2312           else:
2313             raise errors.ParameterError(field)
2314           node_output.append(str(val))
2315
2316         output.append(node_output)
2317
2318     return output
2319
2320
2321 class LUQueryNodeStorage(NoHooksLU):
2322   """Logical unit for getting information on storage units on node(s).
2323
2324   """
2325   _OP_REQP = ["nodes", "storage_type", "output_fields"]
2326   REQ_BGL = False
2327   _FIELDS_STATIC = utils.FieldSet("node")
2328
2329   def ExpandNames(self):
2330     storage_type = self.op.storage_type
2331
2332     if storage_type not in constants.VALID_STORAGE_FIELDS:
2333       raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)
2334
2335     dynamic_fields = constants.VALID_STORAGE_FIELDS[storage_type]
2336
2337     _CheckOutputFields(static=self._FIELDS_STATIC,
2338                        dynamic=utils.FieldSet(*dynamic_fields),
2339                        selected=self.op.output_fields)
2340
2341     self.needed_locks = {}
2342     self.share_locks[locking.LEVEL_NODE] = 1
2343
2344     if self.op.nodes:
2345       self.needed_locks[locking.LEVEL_NODE] = \
2346         _GetWantedNodes(self, self.op.nodes)
2347     else:
2348       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2349
2350   def CheckPrereq(self):
2351     """Check prerequisites.
2352
2353     This checks that the fields required are valid output fields.
2354
2355     """
2356     self.op.name = getattr(self.op, "name", None)
2357
2358     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2359
2360   def Exec(self, feedback_fn):
2361     """Computes the list of nodes and their attributes.
2362
2363     """
2364     # Always get name to sort by
2365     if constants.SF_NAME in self.op.output_fields:
2366       fields = self.op.output_fields[:]
2367     else:
2368       fields = [constants.SF_NAME] + self.op.output_fields
2369
2370     # Never ask for node as it's only known to the LU
2371     while "node" in fields:
2372       fields.remove("node")
2373
2374     field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
2375     name_idx = field_idx[constants.SF_NAME]
2376
2377     st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2378     data = self.rpc.call_storage_list(self.nodes,
2379                                       self.op.storage_type, st_args,
2380                                       self.op.name, fields)
2381
2382     result = []
2383
2384     for node in utils.NiceSort(self.nodes):
2385       nresult = data[node]
2386       if nresult.offline:
2387         continue
2388
2389       msg = nresult.fail_msg
2390       if msg:
2391         self.LogWarning("Can't get storage data from node %s: %s", node, msg)
2392         continue
2393
2394       rows = dict([(row[name_idx], row) for row in nresult.payload])
2395
2396       for name in utils.NiceSort(rows.keys()):
2397         row = rows[name]
2398
2399         out = []
2400
2401         for field in self.op.output_fields:
2402           if field == "node":
2403             val = node
2404           elif field in field_idx:
2405             val = row[field_idx[field]]
2406           else:
2407             raise errors.ParameterError(field)
2408
2409           out.append(val)
2410
2411         result.append(out)
2412
2413     return result
2414
2415
2416 class LUModifyNodeStorage(NoHooksLU):
2417   """Logical unit for modifying a storage volume on a node.
2418
2419   """
2420   _OP_REQP = ["node_name", "storage_type", "name", "changes"]
2421   REQ_BGL = False
2422
2423   def CheckArguments(self):
2424     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2425     if node_name is None:
2426       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2427
2428     self.op.node_name = node_name
2429
2430     storage_type = self.op.storage_type
2431     if storage_type not in constants.VALID_STORAGE_FIELDS:
2432       raise errors.OpPrereqError("Unknown storage type: %s" % storage_type)
2433
2434   def ExpandNames(self):
2435     self.needed_locks = {
2436       locking.LEVEL_NODE: self.op.node_name,
2437       }
2438
2439   def CheckPrereq(self):
2440     """Check prerequisites.
2441
2442     """
2443     storage_type = self.op.storage_type
2444
2445     try:
2446       modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
2447     except KeyError:
2448       raise errors.OpPrereqError("Storage units of type '%s' can not be"
2449                                  " modified" % storage_type)
2450
2451     diff = set(self.op.changes.keys()) - modifiable
2452     if diff:
2453       raise errors.OpPrereqError("The following fields can not be modified for"
2454                                  " storage units of type '%s': %r" %
2455                                  (storage_type, list(diff)))
2456
2457   def Exec(self, feedback_fn):
2458     """Computes the list of nodes and their attributes.
2459
2460     """
2461     st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
2462     result = self.rpc.call_storage_modify(self.op.node_name,
2463                                           self.op.storage_type, st_args,
2464                                           self.op.name, self.op.changes)
2465     result.Raise("Failed to modify storage unit '%s' on %s" %
2466                  (self.op.name, self.op.node_name))
2467
2468
2469 class LUAddNode(LogicalUnit):
2470   """Logical unit for adding node to the cluster.
2471
2472   """
2473   HPATH = "node-add"
2474   HTYPE = constants.HTYPE_NODE
2475   _OP_REQP = ["node_name"]
2476
2477   def BuildHooksEnv(self):
2478     """Build hooks env.
2479
2480     This will run on all nodes before, and on all nodes + the new node after.
2481
2482     """
2483     env = {
2484       "OP_TARGET": self.op.node_name,
2485       "NODE_NAME": self.op.node_name,
2486       "NODE_PIP": self.op.primary_ip,
2487       "NODE_SIP": self.op.secondary_ip,
2488       }
2489     nodes_0 = self.cfg.GetNodeList()
2490     nodes_1 = nodes_0 + [self.op.node_name, ]
2491     return env, nodes_0, nodes_1
2492
2493   def CheckPrereq(self):
2494     """Check prerequisites.
2495
2496     This checks:
2497      - the new node is not already in the config
2498      - it is resolvable
2499      - its parameters (single/dual homed) matches the cluster
2500
2501     Any errors are signaled by raising errors.OpPrereqError.
2502
2503     """
2504     node_name = self.op.node_name
2505     cfg = self.cfg
2506
2507     dns_data = utils.HostInfo(node_name)
2508
2509     node = dns_data.name
2510     primary_ip = self.op.primary_ip = dns_data.ip
2511     secondary_ip = getattr(self.op, "secondary_ip", None)
2512     if secondary_ip is None:
2513       secondary_ip = primary_ip
2514     if not utils.IsValidIP(secondary_ip):
2515       raise errors.OpPrereqError("Invalid secondary IP given")
2516     self.op.secondary_ip = secondary_ip
2517
2518     node_list = cfg.GetNodeList()
2519     if not self.op.readd and node in node_list:
2520       raise errors.OpPrereqError("Node %s is already in the configuration" %
2521                                  node)
2522     elif self.op.readd and node not in node_list:
2523       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2524
2525     for existing_node_name in node_list:
2526       existing_node = cfg.GetNodeInfo(existing_node_name)
2527
2528       if self.op.readd and node == existing_node_name:
2529         if (existing_node.primary_ip != primary_ip or
2530             existing_node.secondary_ip != secondary_ip):
2531           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2532                                      " address configuration as before")
2533         continue
2534
2535       if (existing_node.primary_ip == primary_ip or
2536           existing_node.secondary_ip == primary_ip or
2537           existing_node.primary_ip == secondary_ip or
2538           existing_node.secondary_ip == secondary_ip):
2539         raise errors.OpPrereqError("New node ip address(es) conflict with"
2540                                    " existing node %s" % existing_node.name)
2541
2542     # check that the type of the node (single versus dual homed) is the
2543     # same as for the master
2544     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2545     master_singlehomed = myself.secondary_ip == myself.primary_ip
2546     newbie_singlehomed = secondary_ip == primary_ip
2547     if master_singlehomed != newbie_singlehomed:
2548       if master_singlehomed:
2549         raise errors.OpPrereqError("The master has no private ip but the"
2550                                    " new node has one")
2551       else:
2552         raise errors.OpPrereqError("The master has a private ip but the"
2553                                    " new node doesn't have one")
2554
2555     # checks reachability
2556     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2557       raise errors.OpPrereqError("Node not reachable by ping")
2558
2559     if not newbie_singlehomed:
2560       # check reachability from my secondary ip to newbie's secondary ip
2561       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2562                            source=myself.secondary_ip):
2563         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2564                                    " based ping to noded port")
2565
2566     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2567     if self.op.readd:
2568       exceptions = [node]
2569     else:
2570       exceptions = []
2571     mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
2572     # the new node will increase mc_max with one, so:
2573     mc_max = min(mc_max + 1, cp_size)
2574     self.master_candidate = mc_now < mc_max
2575
2576     if self.op.readd:
2577       self.new_node = self.cfg.GetNodeInfo(node)
2578       assert self.new_node is not None, "Can't retrieve locked node %s" % node
2579     else:
2580       self.new_node = objects.Node(name=node,
2581                                    primary_ip=primary_ip,
2582                                    secondary_ip=secondary_ip,
2583                                    master_candidate=self.master_candidate,
2584                                    offline=False, drained=False)
2585
2586   def Exec(self, feedback_fn):
2587     """Adds the new node to the cluster.
2588
2589     """
2590     new_node = self.new_node
2591     node = new_node.name
2592
2593     # for re-adds, reset the offline/drained/master-candidate flags;
2594     # we need to reset here, otherwise offline would prevent RPC calls
2595     # later in the procedure; this also means that if the re-add
2596     # fails, we are left with a non-offlined, broken node
2597     if self.op.readd:
2598       new_node.drained = new_node.offline = False
2599       self.LogInfo("Readding a node, the offline/drained flags were reset")
2600       # if we demote the node, we do cleanup later in the procedure
2601       new_node.master_candidate = self.master_candidate
2602
2603     # notify the user about any possible mc promotion
2604     if new_node.master_candidate:
2605       self.LogInfo("Node will be a master candidate")
2606
2607     # check connectivity
2608     result = self.rpc.call_version([node])[node]
2609     result.Raise("Can't get version information from node %s" % node)
2610     if constants.PROTOCOL_VERSION == result.payload:
2611       logging.info("Communication to node %s fine, sw version %s match",
2612                    node, result.payload)
2613     else:
2614       raise errors.OpExecError("Version mismatch master version %s,"
2615                                " node version %s" %
2616                                (constants.PROTOCOL_VERSION, result.payload))
2617
2618     # setup ssh on node
2619     logging.info("Copy ssh key to node %s", node)
2620     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2621     keyarray = []
2622     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2623                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2624                 priv_key, pub_key]
2625
2626     for i in keyfiles:
2627       f = open(i, 'r')
2628       try:
2629         keyarray.append(f.read())
2630       finally:
2631         f.close()
2632
2633     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2634                                     keyarray[2],
2635                                     keyarray[3], keyarray[4], keyarray[5])
2636     result.Raise("Cannot transfer ssh keys to the new node")
2637
2638     # Add node to our /etc/hosts, and add key to known_hosts
2639     if self.cfg.GetClusterInfo().modify_etc_hosts:
2640       utils.AddHostToEtcHosts(new_node.name)
2641
2642     if new_node.secondary_ip != new_node.primary_ip:
2643       result = self.rpc.call_node_has_ip_address(new_node.name,
2644                                                  new_node.secondary_ip)
2645       result.Raise("Failure checking secondary ip on node %s" % new_node.name,
2646                    prereq=True)
2647       if not result.payload:
2648         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2649                                  " you gave (%s). Please fix and re-run this"
2650                                  " command." % new_node.secondary_ip)
2651
2652     node_verify_list = [self.cfg.GetMasterNode()]
2653     node_verify_param = {
2654       'nodelist': [node],
2655       # TODO: do a node-net-test as well?
2656     }
2657
2658     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2659                                        self.cfg.GetClusterName())
2660     for verifier in node_verify_list:
2661       result[verifier].Raise("Cannot communicate with node %s" % verifier)
2662       nl_payload = result[verifier].payload['nodelist']
2663       if nl_payload:
2664         for failed in nl_payload:
2665           feedback_fn("ssh/hostname verification failed %s -> %s" %
2666                       (verifier, nl_payload[failed]))
2667         raise errors.OpExecError("ssh/hostname verification failed.")
2668
2669     if self.op.readd:
2670       _RedistributeAncillaryFiles(self)
2671       self.context.ReaddNode(new_node)
2672       # make sure we redistribute the config
2673       self.cfg.Update(new_node)
2674       # and make sure the new node will not have old files around
2675       if not new_node.master_candidate:
2676         result = self.rpc.call_node_demote_from_mc(new_node.name)
2677         msg = result.RemoteFailMsg()
2678         if msg:
2679           self.LogWarning("Node failed to demote itself from master"
2680                           " candidate status: %s" % msg)
2681     else:
2682       _RedistributeAncillaryFiles(self, additional_nodes=[node])
2683       self.context.AddNode(new_node)
2684
2685
2686 class LUSetNodeParams(LogicalUnit):
2687   """Modifies the parameters of a node.
2688
2689   """
2690   HPATH = "node-modify"
2691   HTYPE = constants.HTYPE_NODE
2692   _OP_REQP = ["node_name"]
2693   REQ_BGL = False
2694
2695   def CheckArguments(self):
2696     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2697     if node_name is None:
2698       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2699     self.op.node_name = node_name
2700     _CheckBooleanOpField(self.op, 'master_candidate')
2701     _CheckBooleanOpField(self.op, 'offline')
2702     _CheckBooleanOpField(self.op, 'drained')
2703     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2704     if all_mods.count(None) == 3:
2705       raise errors.OpPrereqError("Please pass at least one modification")
2706     if all_mods.count(True) > 1:
2707       raise errors.OpPrereqError("Can't set the node into more than one"
2708                                  " state at the same time")
2709
2710   def ExpandNames(self):
2711     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2712
2713   def BuildHooksEnv(self):
2714     """Build hooks env.
2715
2716     This runs on the master node.
2717
2718     """
2719     env = {
2720       "OP_TARGET": self.op.node_name,
2721       "MASTER_CANDIDATE": str(self.op.master_candidate),
2722       "OFFLINE": str(self.op.offline),
2723       "DRAINED": str(self.op.drained),
2724       }
2725     nl = [self.cfg.GetMasterNode(),
2726           self.op.node_name]
2727     return env, nl, nl
2728
2729   def CheckPrereq(self):
2730     """Check prerequisites.
2731
2732     This only checks the instance list against the existing names.
2733
2734     """
2735     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2736
2737     if ((self.op.master_candidate == False or self.op.offline == True or
2738          self.op.drained == True) and node.master_candidate):
2739       # we will demote the node from master_candidate
2740       if self.op.node_name == self.cfg.GetMasterNode():
2741         raise errors.OpPrereqError("The master node has to be a"
2742                                    " master candidate, online and not drained")
2743       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2744       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2745       if num_candidates <= cp_size:
2746         msg = ("Not enough master candidates (desired"
2747                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2748         if self.op.force:
2749           self.LogWarning(msg)
2750         else:
2751           raise errors.OpPrereqError(msg)
2752
2753     if (self.op.master_candidate == True and
2754         ((node.offline and not self.op.offline == False) or
2755          (node.drained and not self.op.drained == False))):
2756       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2757                                  " to master_candidate" % node.name)
2758
2759     return
2760
2761   def Exec(self, feedback_fn):
2762     """Modifies a node.
2763
2764     """
2765     node = self.node
2766
2767     result = []
2768     changed_mc = False
2769
2770     if self.op.offline is not None:
2771       node.offline = self.op.offline
2772       result.append(("offline", str(self.op.offline)))
2773       if self.op.offline == True:
2774         if node.master_candidate:
2775           node.master_candidate = False
2776           changed_mc = True
2777           result.append(("master_candidate", "auto-demotion due to offline"))
2778         if node.drained:
2779           node.drained = False
2780           result.append(("drained", "clear drained status due to offline"))
2781
2782     if self.op.master_candidate is not None:
2783       node.master_candidate = self.op.master_candidate
2784       changed_mc = True
2785       result.append(("master_candidate", str(self.op.master_candidate)))
2786       if self.op.master_candidate == False:
2787         rrc = self.rpc.call_node_demote_from_mc(node.name)
2788         msg = rrc.fail_msg
2789         if msg:
2790           self.LogWarning("Node failed to demote itself: %s" % msg)
2791
2792     if self.op.drained is not None:
2793       node.drained = self.op.drained
2794       result.append(("drained", str(self.op.drained)))
2795       if self.op.drained == True:
2796         if node.master_candidate:
2797           node.master_candidate = False
2798           changed_mc = True
2799           result.append(("master_candidate", "auto-demotion due to drain"))
2800           rrc = self.rpc.call_node_demote_from_mc(node.name)
2801           msg = rrc.RemoteFailMsg()
2802           if msg:
2803             self.LogWarning("Node failed to demote itself: %s" % msg)
2804         if node.offline:
2805           node.offline = False
2806           result.append(("offline", "clear offline status due to drain"))
2807
2808     # this will trigger configuration file update, if needed
2809     self.cfg.Update(node)
2810     # this will trigger job queue propagation or cleanup
2811     if changed_mc:
2812       self.context.ReaddNode(node)
2813
2814     return result
2815
2816
2817 class LUPowercycleNode(NoHooksLU):
2818   """Powercycles a node.
2819
2820   """
2821   _OP_REQP = ["node_name", "force"]
2822   REQ_BGL = False
2823
2824   def CheckArguments(self):
2825     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2826     if node_name is None:
2827       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2828     self.op.node_name = node_name
2829     if node_name == self.cfg.GetMasterNode() and not self.op.force:
2830       raise errors.OpPrereqError("The node is the master and the force"
2831                                  " parameter was not set")
2832
2833   def ExpandNames(self):
2834     """Locking for PowercycleNode.
2835
2836     This is a last-resort option and shouldn't block on other
2837     jobs. Therefore, we grab no locks.
2838
2839     """
2840     self.needed_locks = {}
2841
2842   def CheckPrereq(self):
2843     """Check prerequisites.
2844
2845     This LU has no prereqs.
2846
2847     """
2848     pass
2849
2850   def Exec(self, feedback_fn):
2851     """Reboots a node.
2852
2853     """
2854     result = self.rpc.call_node_powercycle(self.op.node_name,
2855                                            self.cfg.GetHypervisorType())
2856     result.Raise("Failed to schedule the reboot")
2857     return result.payload
2858
2859
2860 class LUQueryClusterInfo(NoHooksLU):
2861   """Query cluster configuration.
2862
2863   """
2864   _OP_REQP = []
2865   REQ_BGL = False
2866
2867   def ExpandNames(self):
2868     self.needed_locks = {}
2869
2870   def CheckPrereq(self):
2871     """No prerequsites needed for this LU.
2872
2873     """
2874     pass
2875
2876   def Exec(self, feedback_fn):
2877     """Return cluster config.
2878
2879     """
2880     cluster = self.cfg.GetClusterInfo()
2881     result = {
2882       "software_version": constants.RELEASE_VERSION,
2883       "protocol_version": constants.PROTOCOL_VERSION,
2884       "config_version": constants.CONFIG_VERSION,
2885       "os_api_version": max(constants.OS_API_VERSIONS),
2886       "export_version": constants.EXPORT_VERSION,
2887       "architecture": (platform.architecture()[0], platform.machine()),
2888       "name": cluster.cluster_name,
2889       "master": cluster.master_node,
2890       "default_hypervisor": cluster.enabled_hypervisors[0],
2891       "enabled_hypervisors": cluster.enabled_hypervisors,
2892       "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
2893                         for hypervisor_name in cluster.enabled_hypervisors]),
2894       "beparams": cluster.beparams,
2895       "nicparams": cluster.nicparams,
2896       "candidate_pool_size": cluster.candidate_pool_size,
2897       "master_netdev": cluster.master_netdev,
2898       "volume_group_name": cluster.volume_group_name,
2899       "file_storage_dir": cluster.file_storage_dir,
2900       }
2901
2902     return result
2903
2904
2905 class LUQueryConfigValues(NoHooksLU):
2906   """Return configuration values.
2907
2908   """
2909   _OP_REQP = []
2910   REQ_BGL = False
2911   _FIELDS_DYNAMIC = utils.FieldSet()
2912   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2913
2914   def ExpandNames(self):
2915     self.needed_locks = {}
2916
2917     _CheckOutputFields(static=self._FIELDS_STATIC,
2918                        dynamic=self._FIELDS_DYNAMIC,
2919                        selected=self.op.output_fields)
2920
2921   def CheckPrereq(self):
2922     """No prerequisites.
2923
2924     """
2925     pass
2926
2927   def Exec(self, feedback_fn):
2928     """Dump a representation of the cluster config to the standard output.
2929
2930     """
2931     values = []
2932     for field in self.op.output_fields:
2933       if field == "cluster_name":
2934         entry = self.cfg.GetClusterName()
2935       elif field == "master_node":
2936         entry = self.cfg.GetMasterNode()
2937       elif field == "drain_flag":
2938         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2939       else:
2940         raise errors.ParameterError(field)
2941       values.append(entry)
2942     return values
2943
2944
2945 class LUActivateInstanceDisks(NoHooksLU):
2946   """Bring up an instance's disks.
2947
2948   """
2949   _OP_REQP = ["instance_name"]
2950   REQ_BGL = False
2951
2952   def ExpandNames(self):
2953     self._ExpandAndLockInstance()
2954     self.needed_locks[locking.LEVEL_NODE] = []
2955     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2956
2957   def DeclareLocks(self, level):
2958     if level == locking.LEVEL_NODE:
2959       self._LockInstancesNodes()
2960
2961   def CheckPrereq(self):
2962     """Check prerequisites.
2963
2964     This checks that the instance is in the cluster.
2965
2966     """
2967     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2968     assert self.instance is not None, \
2969       "Cannot retrieve locked instance %s" % self.op.instance_name
2970     _CheckNodeOnline(self, self.instance.primary_node)
2971
2972   def Exec(self, feedback_fn):
2973     """Activate the disks.
2974
2975     """
2976     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2977     if not disks_ok:
2978       raise errors.OpExecError("Cannot activate block devices")
2979
2980     return disks_info
2981
2982
2983 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2984   """Prepare the block devices for an instance.
2985
2986   This sets up the block devices on all nodes.
2987
2988   @type lu: L{LogicalUnit}
2989   @param lu: the logical unit on whose behalf we execute
2990   @type instance: L{objects.Instance}
2991   @param instance: the instance for whose disks we assemble
2992   @type ignore_secondaries: boolean
2993   @param ignore_secondaries: if true, errors on secondary nodes
2994       won't result in an error return from the function
2995   @return: False if the operation failed, otherwise a list of
2996       (host, instance_visible_name, node_visible_name)
2997       with the mapping from node devices to instance devices
2998
2999   """
3000   device_info = []
3001   disks_ok = True
3002   iname = instance.name
3003   # With the two passes mechanism we try to reduce the window of
3004   # opportunity for the race condition of switching DRBD to primary
3005   # before handshaking occured, but we do not eliminate it
3006
3007   # The proper fix would be to wait (with some limits) until the
3008   # connection has been made and drbd transitions from WFConnection
3009   # into any other network-connected state (Connected, SyncTarget,
3010   # SyncSource, etc.)
3011
3012   # 1st pass, assemble on all nodes in secondary mode
3013   for inst_disk in instance.disks:
3014     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3015       lu.cfg.SetDiskID(node_disk, node)
3016       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
3017       msg = result.fail_msg
3018       if msg:
3019         lu.proc.LogWarning("Could not prepare block device %s on node %s"
3020                            " (is_primary=False, pass=1): %s",
3021                            inst_disk.iv_name, node, msg)
3022         if not ignore_secondaries:
3023           disks_ok = False
3024
3025   # FIXME: race condition on drbd migration to primary
3026
3027   # 2nd pass, do only the primary node
3028   for inst_disk in instance.disks:
3029     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
3030       if node != instance.primary_node:
3031         continue
3032       lu.cfg.SetDiskID(node_disk, node)
3033       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
3034       msg = result.fail_msg
3035       if msg:
3036         lu.proc.LogWarning("Could not prepare block device %s on node %s"
3037                            " (is_primary=True, pass=2): %s",
3038                            inst_disk.iv_name, node, msg)
3039         disks_ok = False
3040     device_info.append((instance.primary_node, inst_disk.iv_name,
3041                         result.payload))
3042
3043   # leave the disks configured for the primary node
3044   # this is a workaround that would be fixed better by
3045   # improving the logical/physical id handling
3046   for disk in instance.disks:
3047     lu.cfg.SetDiskID(disk, instance.primary_node)
3048
3049   return disks_ok, device_info
3050
3051
3052 def _StartInstanceDisks(lu, instance, force):
3053   """Start the disks of an instance.
3054
3055   """
3056   disks_ok, _ = _AssembleInstanceDisks(lu, instance,
3057                                            ignore_secondaries=force)
3058   if not disks_ok:
3059     _ShutdownInstanceDisks(lu, instance)
3060     if force is not None and not force:
3061       lu.proc.LogWarning("", hint="If the message above refers to a"
3062                          " secondary node,"
3063                          " you can retry the operation using '--force'.")
3064     raise errors.OpExecError("Disk consistency error")
3065
3066
3067 class LUDeactivateInstanceDisks(NoHooksLU):
3068   """Shutdown an instance's disks.
3069
3070   """
3071   _OP_REQP = ["instance_name"]
3072   REQ_BGL = False
3073
3074   def ExpandNames(self):
3075     self._ExpandAndLockInstance()
3076     self.needed_locks[locking.LEVEL_NODE] = []
3077     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3078
3079   def DeclareLocks(self, level):
3080     if level == locking.LEVEL_NODE:
3081       self._LockInstancesNodes()
3082
3083   def CheckPrereq(self):
3084     """Check prerequisites.
3085
3086     This checks that the instance is in the cluster.
3087
3088     """
3089     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3090     assert self.instance is not None, \
3091       "Cannot retrieve locked instance %s" % self.op.instance_name
3092
3093   def Exec(self, feedback_fn):
3094     """Deactivate the disks
3095
3096     """
3097     instance = self.instance
3098     _SafeShutdownInstanceDisks(self, instance)
3099
3100
3101 def _SafeShutdownInstanceDisks(lu, instance):
3102   """Shutdown block devices of an instance.
3103
3104   This function checks if an instance is running, before calling
3105   _ShutdownInstanceDisks.
3106
3107   """
3108   pnode = instance.primary_node
3109   ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
3110   ins_l.Raise("Can't contact node %s" % pnode)
3111
3112   if instance.name in ins_l.payload:
3113     raise errors.OpExecError("Instance is running, can't shutdown"
3114                              " block devices.")
3115
3116   _ShutdownInstanceDisks(lu, instance)
3117
3118
3119 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
3120   """Shutdown block devices of an instance.
3121
3122   This does the shutdown on all nodes of the instance.
3123
3124   If the ignore_primary is false, errors on the primary node are
3125   ignored.
3126
3127   """
3128   all_result = True
3129   for disk in instance.disks:
3130     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
3131       lu.cfg.SetDiskID(top_disk, node)
3132       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
3133       msg = result.fail_msg
3134       if msg:
3135         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
3136                       disk.iv_name, node, msg)
3137         if not ignore_primary or node != instance.primary_node:
3138           all_result = False
3139   return all_result
3140
3141
3142 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
3143   """Checks if a node has enough free memory.
3144
3145   This function check if a given node has the needed amount of free
3146   memory. In case the node has less memory or we cannot get the
3147   information from the node, this function raise an OpPrereqError
3148   exception.
3149
3150   @type lu: C{LogicalUnit}
3151   @param lu: a logical unit from which we get configuration data
3152   @type node: C{str}
3153   @param node: the node to check
3154   @type reason: C{str}
3155   @param reason: string to use in the error message
3156   @type requested: C{int}
3157   @param requested: the amount of memory in MiB to check for
3158   @type hypervisor_name: C{str}
3159   @param hypervisor_name: the hypervisor to ask for memory stats
3160   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
3161       we cannot check the node
3162
3163   """
3164   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
3165   nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
3166   free_mem = nodeinfo[node].payload.get('memory_free', None)
3167   if not isinstance(free_mem, int):
3168     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
3169                                " was '%s'" % (node, free_mem))
3170   if requested > free_mem:
3171     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
3172                                " needed %s MiB, available %s MiB" %
3173                                (node, reason, requested, free_mem))
3174
3175
3176 class LUStartupInstance(LogicalUnit):
3177   """Starts an instance.
3178
3179   """
3180   HPATH = "instance-start"
3181   HTYPE = constants.HTYPE_INSTANCE
3182   _OP_REQP = ["instance_name", "force"]
3183   REQ_BGL = False
3184
3185   def ExpandNames(self):
3186     self._ExpandAndLockInstance()
3187
3188   def BuildHooksEnv(self):
3189     """Build hooks env.
3190
3191     This runs on master, primary and secondary nodes of the instance.
3192
3193     """
3194     env = {
3195       "FORCE": self.op.force,
3196       }
3197     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3198     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3199     return env, nl, nl
3200
3201   def CheckPrereq(self):
3202     """Check prerequisites.
3203
3204     This checks that the instance is in the cluster.
3205
3206     """
3207     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3208     assert self.instance is not None, \
3209       "Cannot retrieve locked instance %s" % self.op.instance_name
3210
3211     # extra beparams
3212     self.beparams = getattr(self.op, "beparams", {})
3213     if self.beparams:
3214       if not isinstance(self.beparams, dict):
3215         raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
3216                                    " dict" % (type(self.beparams), ))
3217       # fill the beparams dict
3218       utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
3219       self.op.beparams = self.beparams
3220
3221     # extra hvparams
3222     self.hvparams = getattr(self.op, "hvparams", {})
3223     if self.hvparams:
3224       if not isinstance(self.hvparams, dict):
3225         raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
3226                                    " dict" % (type(self.hvparams), ))
3227
3228       # check hypervisor parameter syntax (locally)
3229       cluster = self.cfg.GetClusterInfo()
3230       utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
3231       filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
3232                                     instance.hvparams)
3233       filled_hvp.update(self.hvparams)
3234       hv_type = hypervisor.GetHypervisor(instance.hypervisor)
3235       hv_type.CheckParameterSyntax(filled_hvp)
3236       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
3237       self.op.hvparams = self.hvparams
3238
3239     _CheckNodeOnline(self, instance.primary_node)
3240
3241     bep = self.cfg.GetClusterInfo().FillBE(instance)
3242     # check bridges existence
3243     _CheckInstanceBridgesExist(self, instance)
3244
3245     remote_info = self.rpc.call_instance_info(instance.primary_node,
3246                                               instance.name,
3247                                               instance.hypervisor)
3248     remote_info.Raise("Error checking node %s" % instance.primary_node,
3249                       prereq=True)
3250     if not remote_info.payload: # not running already
3251       _CheckNodeFreeMemory(self, instance.primary_node,
3252                            "starting instance %s" % instance.name,
3253                            bep[constants.BE_MEMORY], instance.hypervisor)
3254
3255   def Exec(self, feedback_fn):
3256     """Start the instance.
3257
3258     """
3259     instance = self.instance
3260     force = self.op.force
3261
3262     self.cfg.MarkInstanceUp(instance.name)
3263
3264     node_current = instance.primary_node
3265
3266     _StartInstanceDisks(self, instance, force)
3267
3268     result = self.rpc.call_instance_start(node_current, instance,
3269                                           self.hvparams, self.beparams)
3270     msg = result.fail_msg
3271     if msg:
3272       _ShutdownInstanceDisks(self, instance)
3273       raise errors.OpExecError("Could not start instance: %s" % msg)
3274
3275
3276 class LURebootInstance(LogicalUnit):
3277   """Reboot an instance.
3278
3279   """
3280   HPATH = "instance-reboot"
3281   HTYPE = constants.HTYPE_INSTANCE
3282   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
3283   REQ_BGL = False
3284
3285   def ExpandNames(self):
3286     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
3287                                    constants.INSTANCE_REBOOT_HARD,
3288                                    constants.INSTANCE_REBOOT_FULL]:
3289       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
3290                                   (constants.INSTANCE_REBOOT_SOFT,
3291                                    constants.INSTANCE_REBOOT_HARD,
3292                                    constants.INSTANCE_REBOOT_FULL))
3293     self._ExpandAndLockInstance()
3294
3295   def BuildHooksEnv(self):
3296     """Build hooks env.
3297
3298     This runs on master, primary and secondary nodes of the instance.
3299
3300     """
3301     env = {
3302       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
3303       "REBOOT_TYPE": self.op.reboot_type,
3304       }
3305     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3306     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3307     return env, nl, nl
3308
3309   def CheckPrereq(self):
3310     """Check prerequisites.
3311
3312     This checks that the instance is in the cluster.
3313
3314     """
3315     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3316     assert self.instance is not None, \
3317       "Cannot retrieve locked instance %s" % self.op.instance_name
3318
3319     _CheckNodeOnline(self, instance.primary_node)
3320
3321     # check bridges existence
3322     _CheckInstanceBridgesExist(self, instance)
3323
3324   def Exec(self, feedback_fn):
3325     """Reboot the instance.
3326
3327     """
3328     instance = self.instance
3329     ignore_secondaries = self.op.ignore_secondaries
3330     reboot_type = self.op.reboot_type
3331
3332     node_current = instance.primary_node
3333
3334     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3335                        constants.INSTANCE_REBOOT_HARD]:
3336       for disk in instance.disks:
3337         self.cfg.SetDiskID(disk, node_current)
3338       result = self.rpc.call_instance_reboot(node_current, instance,
3339                                              reboot_type)
3340       result.Raise("Could not reboot instance")
3341     else:
3342       result = self.rpc.call_instance_shutdown(node_current, instance)
3343       result.Raise("Could not shutdown instance for full reboot")
3344       _ShutdownInstanceDisks(self, instance)
3345       _StartInstanceDisks(self, instance, ignore_secondaries)
3346       result = self.rpc.call_instance_start(node_current, instance, None, None)
3347       msg = result.fail_msg
3348       if msg:
3349         _ShutdownInstanceDisks(self, instance)
3350         raise errors.OpExecError("Could not start instance for"
3351                                  " full reboot: %s" % msg)
3352
3353     self.cfg.MarkInstanceUp(instance.name)
3354
3355
3356 class LUShutdownInstance(LogicalUnit):
3357   """Shutdown an instance.
3358
3359   """
3360   HPATH = "instance-stop"
3361   HTYPE = constants.HTYPE_INSTANCE
3362   _OP_REQP = ["instance_name"]
3363   REQ_BGL = False
3364
3365   def ExpandNames(self):
3366     self._ExpandAndLockInstance()
3367
3368   def BuildHooksEnv(self):
3369     """Build hooks env.
3370
3371     This runs on master, primary and secondary nodes of the instance.
3372
3373     """
3374     env = _BuildInstanceHookEnvByObject(self, self.instance)
3375     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3376     return env, nl, nl
3377
3378   def CheckPrereq(self):
3379     """Check prerequisites.
3380
3381     This checks that the instance is in the cluster.
3382
3383     """
3384     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3385     assert self.instance is not None, \
3386       "Cannot retrieve locked instance %s" % self.op.instance_name
3387     _CheckNodeOnline(self, self.instance.primary_node)
3388
3389   def Exec(self, feedback_fn):
3390     """Shutdown the instance.
3391
3392     """
3393     instance = self.instance
3394     node_current = instance.primary_node
3395     self.cfg.MarkInstanceDown(instance.name)
3396     result = self.rpc.call_instance_shutdown(node_current, instance)
3397     msg = result.fail_msg
3398     if msg:
3399       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3400
3401     _ShutdownInstanceDisks(self, instance)
3402
3403
3404 class LUReinstallInstance(LogicalUnit):
3405   """Reinstall an instance.
3406
3407   """
3408   HPATH = "instance-reinstall"
3409   HTYPE = constants.HTYPE_INSTANCE
3410   _OP_REQP = ["instance_name"]
3411   REQ_BGL = False
3412
3413   def ExpandNames(self):
3414     self._ExpandAndLockInstance()
3415
3416   def BuildHooksEnv(self):
3417     """Build hooks env.
3418
3419     This runs on master, primary and secondary nodes of the instance.
3420
3421     """
3422     env = _BuildInstanceHookEnvByObject(self, self.instance)
3423     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3424     return env, nl, nl
3425
3426   def CheckPrereq(self):
3427     """Check prerequisites.
3428
3429     This checks that the instance is in the cluster and is not running.
3430
3431     """
3432     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3433     assert instance is not None, \
3434       "Cannot retrieve locked instance %s" % self.op.instance_name
3435     _CheckNodeOnline(self, instance.primary_node)
3436
3437     if instance.disk_template == constants.DT_DISKLESS:
3438       raise errors.OpPrereqError("Instance '%s' has no disks" %
3439                                  self.op.instance_name)
3440     if instance.admin_up:
3441       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3442                                  self.op.instance_name)
3443     remote_info = self.rpc.call_instance_info(instance.primary_node,
3444                                               instance.name,
3445                                               instance.hypervisor)
3446     remote_info.Raise("Error checking node %s" % instance.primary_node,
3447                       prereq=True)
3448     if remote_info.payload:
3449       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3450                                  (self.op.instance_name,
3451                                   instance.primary_node))
3452
3453     self.op.os_type = getattr(self.op, "os_type", None)
3454     if self.op.os_type is not None:
3455       # OS verification
3456       pnode = self.cfg.GetNodeInfo(
3457         self.cfg.ExpandNodeName(instance.primary_node))
3458       if pnode is None:
3459         raise errors.OpPrereqError("Primary node '%s' is unknown" %
3460                                    self.op.pnode)
3461       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3462       result.Raise("OS '%s' not in supported OS list for primary node %s" %
3463                    (self.op.os_type, pnode.name), prereq=True)
3464
3465     self.instance = instance
3466
3467   def Exec(self, feedback_fn):
3468     """Reinstall the instance.
3469
3470     """
3471     inst = self.instance
3472
3473     if self.op.os_type is not None:
3474       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3475       inst.os = self.op.os_type
3476       self.cfg.Update(inst)
3477
3478     _StartInstanceDisks(self, inst, None)
3479     try:
3480       feedback_fn("Running the instance OS create scripts...")
3481       result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3482       result.Raise("Could not install OS for instance %s on node %s" %
3483                    (inst.name, inst.primary_node))
3484     finally:
3485       _ShutdownInstanceDisks(self, inst)
3486
3487
3488 class LURenameInstance(LogicalUnit):
3489   """Rename an instance.
3490
3491   """
3492   HPATH = "instance-rename"
3493   HTYPE = constants.HTYPE_INSTANCE
3494   _OP_REQP = ["instance_name", "new_name"]
3495
3496   def BuildHooksEnv(self):
3497     """Build hooks env.
3498
3499     This runs on master, primary and secondary nodes of the instance.
3500
3501     """
3502     env = _BuildInstanceHookEnvByObject(self, self.instance)
3503     env["INSTANCE_NEW_NAME"] = self.op.new_name
3504     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3505     return env, nl, nl
3506
3507   def CheckPrereq(self):
3508     """Check prerequisites.
3509
3510     This checks that the instance is in the cluster and is not running.
3511
3512     """
3513     instance = self.cfg.GetInstanceInfo(
3514       self.cfg.ExpandInstanceName(self.op.instance_name))
3515     if instance is None:
3516       raise errors.OpPrereqError("Instance '%s' not known" %
3517                                  self.op.instance_name)
3518     _CheckNodeOnline(self, instance.primary_node)
3519
3520     if instance.admin_up:
3521       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3522                                  self.op.instance_name)
3523     remote_info = self.rpc.call_instance_info(instance.primary_node,
3524                                               instance.name,
3525                                               instance.hypervisor)
3526     remote_info.Raise("Error checking node %s" % instance.primary_node,
3527                       prereq=True)
3528     if remote_info.payload:
3529       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3530                                  (self.op.instance_name,
3531                                   instance.primary_node))
3532     self.instance = instance
3533
3534     # new name verification
3535     name_info = utils.HostInfo(self.op.new_name)
3536
3537     self.op.new_name = new_name = name_info.name
3538     instance_list = self.cfg.GetInstanceList()
3539     if new_name in instance_list:
3540       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3541                                  new_name)
3542
3543     if not getattr(self.op, "ignore_ip", False):
3544       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3545         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3546                                    (name_info.ip, new_name))
3547
3548
3549   def Exec(self, feedback_fn):
3550     """Reinstall the instance.
3551
3552     """
3553     inst = self.instance
3554     old_name = inst.name
3555
3556     if inst.disk_template == constants.DT_FILE:
3557       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3558
3559     self.cfg.RenameInstance(inst.name, self.op.new_name)
3560     # Change the instance lock. This is definitely safe while we hold the BGL
3561     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3562     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3563
3564     # re-read the instance from the configuration after rename
3565     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3566
3567     if inst.disk_template == constants.DT_FILE:
3568       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3569       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3570                                                      old_file_storage_dir,
3571                                                      new_file_storage_dir)
3572       result.Raise("Could not rename on node %s directory '%s' to '%s'"
3573                    " (but the instance has been renamed in Ganeti)" %
3574                    (inst.primary_node, old_file_storage_dir,
3575                     new_file_storage_dir))
3576
3577     _StartInstanceDisks(self, inst, None)
3578     try:
3579       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3580                                                  old_name)
3581       msg = result.fail_msg
3582       if msg:
3583         msg = ("Could not run OS rename script for instance %s on node %s"
3584                " (but the instance has been renamed in Ganeti): %s" %
3585                (inst.name, inst.primary_node, msg))
3586         self.proc.LogWarning(msg)
3587     finally:
3588       _ShutdownInstanceDisks(self, inst)
3589
3590
3591 class LURemoveInstance(LogicalUnit):
3592   """Remove an instance.
3593
3594   """
3595   HPATH = "instance-remove"
3596   HTYPE = constants.HTYPE_INSTANCE
3597   _OP_REQP = ["instance_name", "ignore_failures"]
3598   REQ_BGL = False
3599
3600   def ExpandNames(self):
3601     self._ExpandAndLockInstance()
3602     self.needed_locks[locking.LEVEL_NODE] = []
3603     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3604
3605   def DeclareLocks(self, level):
3606     if level == locking.LEVEL_NODE:
3607       self._LockInstancesNodes()
3608
3609   def BuildHooksEnv(self):
3610     """Build hooks env.
3611
3612     This runs on master, primary and secondary nodes of the instance.
3613
3614     """
3615     env = _BuildInstanceHookEnvByObject(self, self.instance)
3616     nl = [self.cfg.GetMasterNode()]
3617     return env, nl, nl
3618
3619   def CheckPrereq(self):
3620     """Check prerequisites.
3621
3622     This checks that the instance is in the cluster.
3623
3624     """
3625     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3626     assert self.instance is not None, \
3627       "Cannot retrieve locked instance %s" % self.op.instance_name
3628
3629   def Exec(self, feedback_fn):
3630     """Remove the instance.
3631
3632     """
3633     instance = self.instance
3634     logging.info("Shutting down instance %s on node %s",
3635                  instance.name, instance.primary_node)
3636
3637     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3638     msg = result.fail_msg
3639     if msg:
3640       if self.op.ignore_failures:
3641         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3642       else:
3643         raise errors.OpExecError("Could not shutdown instance %s on"
3644                                  " node %s: %s" %
3645                                  (instance.name, instance.primary_node, msg))
3646
3647     logging.info("Removing block devices for instance %s", instance.name)
3648
3649     if not _RemoveDisks(self, instance):
3650       if self.op.ignore_failures:
3651         feedback_fn("Warning: can't remove instance's disks")
3652       else:
3653         raise errors.OpExecError("Can't remove instance's disks")
3654
3655     logging.info("Removing instance %s out of cluster config", instance.name)
3656
3657     self.cfg.RemoveInstance(instance.name)
3658     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3659
3660
3661 class LUQueryInstances(NoHooksLU):
3662   """Logical unit for querying instances.
3663
3664   """
3665   _OP_REQP = ["output_fields", "names", "use_locking"]
3666   REQ_BGL = False
3667   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3668                                     "admin_state",
3669                                     "disk_template", "ip", "mac", "bridge",
3670                                     "nic_mode", "nic_link",
3671                                     "sda_size", "sdb_size", "vcpus", "tags",
3672                                     "network_port", "beparams",
3673                                     r"(disk)\.(size)/([0-9]+)",
3674                                     r"(disk)\.(sizes)", "disk_usage",
3675                                     r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
3676                                     r"(nic)\.(bridge)/([0-9]+)",
3677                                     r"(nic)\.(macs|ips|modes|links|bridges)",
3678                                     r"(disk|nic)\.(count)",
3679                                     "serial_no", "hypervisor", "hvparams",] +
3680                                   ["hv/%s" % name
3681                                    for name in constants.HVS_PARAMETERS] +
3682                                   ["be/%s" % name
3683                                    for name in constants.BES_PARAMETERS])
3684   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3685
3686
3687   def ExpandNames(self):
3688     _CheckOutputFields(static=self._FIELDS_STATIC,
3689                        dynamic=self._FIELDS_DYNAMIC,
3690                        selected=self.op.output_fields)
3691
3692     self.needed_locks = {}
3693     self.share_locks[locking.LEVEL_INSTANCE] = 1
3694     self.share_locks[locking.LEVEL_NODE] = 1
3695
3696     if self.op.names:
3697       self.wanted = _GetWantedInstances(self, self.op.names)
3698     else:
3699       self.wanted = locking.ALL_SET
3700
3701     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3702     self.do_locking = self.do_node_query and self.op.use_locking
3703     if self.do_locking:
3704       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3705       self.needed_locks[locking.LEVEL_NODE] = []
3706       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3707
3708   def DeclareLocks(self, level):
3709     if level == locking.LEVEL_NODE and self.do_locking:
3710       self._LockInstancesNodes()
3711
3712   def CheckPrereq(self):
3713     """Check prerequisites.
3714
3715     """
3716     pass
3717
3718   def Exec(self, feedback_fn):
3719     """Computes the list of nodes and their attributes.
3720
3721     """
3722     all_info = self.cfg.GetAllInstancesInfo()
3723     if self.wanted == locking.ALL_SET:
3724       # caller didn't specify instance names, so ordering is not important
3725       if self.do_locking:
3726         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3727       else:
3728         instance_names = all_info.keys()
3729       instance_names = utils.NiceSort(instance_names)
3730     else:
3731       # caller did specify names, so we must keep the ordering
3732       if self.do_locking:
3733         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3734       else:
3735         tgt_set = all_info.keys()
3736       missing = set(self.wanted).difference(tgt_set)
3737       if missing:
3738         raise errors.OpExecError("Some instances were removed before"
3739                                  " retrieving their data: %s" % missing)
3740       instance_names = self.wanted
3741
3742     instance_list = [all_info[iname] for iname in instance_names]
3743
3744     # begin data gathering
3745
3746     nodes = frozenset([inst.primary_node for inst in instance_list])
3747     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3748
3749     bad_nodes = []
3750     off_nodes = []
3751     if self.do_node_query:
3752       live_data = {}
3753       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3754       for name in nodes:
3755         result = node_data[name]
3756         if result.offline:
3757           # offline nodes will be in both lists
3758           off_nodes.append(name)
3759         if result.failed or result.fail_msg:
3760           bad_nodes.append(name)
3761         else:
3762           if result.payload:
3763             live_data.update(result.payload)
3764           # else no instance is alive
3765     else:
3766       live_data = dict([(name, {}) for name in instance_names])
3767
3768     # end data gathering
3769
3770     HVPREFIX = "hv/"
3771     BEPREFIX = "be/"
3772     output = []
3773     cluster = self.cfg.GetClusterInfo()
3774     for instance in instance_list:
3775       iout = []
3776       i_hv = cluster.FillHV(instance)
3777       i_be = cluster.FillBE(instance)
3778       i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
3779                                  nic.nicparams) for nic in instance.nics]
3780       for field in self.op.output_fields:
3781         st_match = self._FIELDS_STATIC.Matches(field)
3782         if field == "name":
3783           val = instance.name
3784         elif field == "os":
3785           val = instance.os
3786         elif field == "pnode":
3787           val = instance.primary_node
3788         elif field == "snodes":
3789           val = list(instance.secondary_nodes)
3790         elif field == "admin_state":
3791           val = instance.admin_up
3792         elif field == "oper_state":
3793           if instance.primary_node in bad_nodes:
3794             val = None
3795           else:
3796             val = bool(live_data.get(instance.name))
3797         elif field == "status":
3798           if instance.primary_node in off_nodes:
3799             val = "ERROR_nodeoffline"
3800           elif instance.primary_node in bad_nodes:
3801             val = "ERROR_nodedown"
3802           else:
3803             running = bool(live_data.get(instance.name))
3804             if running:
3805               if instance.admin_up:
3806                 val = "running"
3807               else:
3808                 val = "ERROR_up"
3809             else:
3810               if instance.admin_up:
3811                 val = "ERROR_down"
3812               else:
3813                 val = "ADMIN_down"
3814         elif field == "oper_ram":
3815           if instance.primary_node in bad_nodes:
3816             val = None
3817           elif instance.name in live_data:
3818             val = live_data[instance.name].get("memory", "?")
3819           else:
3820             val = "-"
3821         elif field == "vcpus":
3822           val = i_be[constants.BE_VCPUS]
3823         elif field == "disk_template":
3824           val = instance.disk_template
3825         elif field == "ip":
3826           if instance.nics:
3827             val = instance.nics[0].ip
3828           else:
3829             val = None
3830         elif field == "nic_mode":
3831           if instance.nics:
3832             val = i_nicp[0][constants.NIC_MODE]
3833           else:
3834             val = None
3835         elif field == "nic_link":
3836           if instance.nics:
3837             val = i_nicp[0][constants.NIC_LINK]
3838           else:
3839             val = None
3840         elif field == "bridge":
3841           if (instance.nics and
3842               i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
3843             val = i_nicp[0][constants.NIC_LINK]
3844           else:
3845             val = None
3846         elif field == "mac":
3847           if instance.nics:
3848             val = instance.nics[0].mac
3849           else:
3850             val = None
3851         elif field == "sda_size" or field == "sdb_size":
3852           idx = ord(field[2]) - ord('a')
3853           try:
3854             val = instance.FindDisk(idx).size
3855           except errors.OpPrereqError:
3856             val = None
3857         elif field == "disk_usage": # total disk usage per node
3858           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3859           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3860         elif field == "tags":
3861           val = list(instance.GetTags())
3862         elif field == "serial_no":
3863           val = instance.serial_no
3864         elif field == "network_port":
3865           val = instance.network_port
3866         elif field == "hypervisor":
3867           val = instance.hypervisor
3868         elif field == "hvparams":
3869           val = i_hv
3870         elif (field.startswith(HVPREFIX) and
3871               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3872           val = i_hv.get(field[len(HVPREFIX):], None)
3873         elif field == "beparams":
3874           val = i_be
3875         elif (field.startswith(BEPREFIX) and
3876               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3877           val = i_be.get(field[len(BEPREFIX):], None)
3878         elif st_match and st_match.groups():
3879           # matches a variable list
3880           st_groups = st_match.groups()
3881           if st_groups and st_groups[0] == "disk":
3882             if st_groups[1] == "count":
3883               val = len(instance.disks)
3884             elif st_groups[1] == "sizes":
3885               val = [disk.size for disk in instance.disks]
3886             elif st_groups[1] == "size":
3887               try:
3888                 val = instance.FindDisk(st_groups[2]).size
3889               except errors.OpPrereqError:
3890                 val = None
3891             else:
3892               assert False, "Unhandled disk parameter"
3893           elif st_groups[0] == "nic":
3894             if st_groups[1] == "count":
3895               val = len(instance.nics)
3896             elif st_groups[1] == "macs":
3897               val = [nic.mac for nic in instance.nics]
3898             elif st_groups[1] == "ips":
3899               val = [nic.ip for nic in instance.nics]
3900             elif st_groups[1] == "modes":
3901               val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
3902             elif st_groups[1] == "links":
3903               val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
3904             elif st_groups[1] == "bridges":
3905               val = []
3906               for nicp in i_nicp:
3907                 if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
3908                   val.append(nicp[constants.NIC_LINK])
3909                 else:
3910                   val.append(None)
3911             else:
3912               # index-based item
3913               nic_idx = int(st_groups[2])
3914               if nic_idx >= len(instance.nics):
3915                 val = None
3916               else:
3917                 if st_groups[1] == "mac":
3918                   val = instance.nics[nic_idx].mac
3919                 elif st_groups[1] == "ip":
3920                   val = instance.nics[nic_idx].ip
3921                 elif st_groups[1] == "mode":
3922                   val = i_nicp[nic_idx][constants.NIC_MODE]
3923                 elif st_groups[1] == "link":
3924                   val = i_nicp[nic_idx][constants.NIC_LINK]
3925                 elif st_groups[1] == "bridge":
3926                   nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
3927                   if nic_mode == constants.NIC_MODE_BRIDGED:
3928                     val = i_nicp[nic_idx][constants.NIC_LINK]
3929                   else:
3930                     val = None
3931                 else:
3932                   assert False, "Unhandled NIC parameter"
3933           else:
3934             assert False, ("Declared but unhandled variable parameter '%s'" %
3935                            field)
3936         else:
3937           assert False, "Declared but unhandled parameter '%s'" % field
3938         iout.append(val)
3939       output.append(iout)
3940
3941     return output
3942
3943
3944 class LUFailoverInstance(LogicalUnit):
3945   """Failover an instance.
3946
3947   """
3948   HPATH = "instance-failover"
3949   HTYPE = constants.HTYPE_INSTANCE
3950   _OP_REQP = ["instance_name", "ignore_consistency"]
3951   REQ_BGL = False
3952
3953   def ExpandNames(self):
3954     self._ExpandAndLockInstance()
3955     self.needed_locks[locking.LEVEL_NODE] = []
3956     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3957
3958   def DeclareLocks(self, level):
3959     if level == locking.LEVEL_NODE:
3960       self._LockInstancesNodes()
3961
3962   def BuildHooksEnv(self):
3963     """Build hooks env.
3964
3965     This runs on master, primary and secondary nodes of the instance.
3966
3967     """
3968     env = {
3969       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3970       }
3971     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3972     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3973     return env, nl, nl
3974
3975   def CheckPrereq(self):
3976     """Check prerequisites.
3977
3978     This checks that the instance is in the cluster.
3979
3980     """
3981     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3982     assert self.instance is not None, \
3983       "Cannot retrieve locked instance %s" % self.op.instance_name
3984
3985     bep = self.cfg.GetClusterInfo().FillBE(instance)
3986     if instance.disk_template not in constants.DTS_NET_MIRROR:
3987       raise errors.OpPrereqError("Instance's disk layout is not"
3988                                  " network mirrored, cannot failover.")
3989
3990     secondary_nodes = instance.secondary_nodes
3991     if not secondary_nodes:
3992       raise errors.ProgrammerError("no secondary node but using "
3993                                    "a mirrored disk template")
3994
3995     target_node = secondary_nodes[0]
3996     _CheckNodeOnline(self, target_node)
3997     _CheckNodeNotDrained(self, target_node)
3998     if instance.admin_up:
3999       # check memory requirements on the secondary node
4000       _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
4001                            instance.name, bep[constants.BE_MEMORY],
4002                            instance.hypervisor)
4003     else:
4004       self.LogInfo("Not checking memory on the secondary node as"
4005                    " instance will not be started")
4006
4007     # check bridge existance
4008     _CheckInstanceBridgesExist(self, instance, node=target_node)
4009
4010   def Exec(self, feedback_fn):
4011     """Failover an instance.
4012
4013     The failover is done by shutting it down on its present node and
4014     starting it on the secondary.
4015
4016     """
4017     instance = self.instance
4018
4019     source_node = instance.primary_node
4020     target_node = instance.secondary_nodes[0]
4021
4022     feedback_fn("* checking disk consistency between source and target")
4023     for dev in instance.disks:
4024       # for drbd, these are drbd over lvm
4025       if not _CheckDiskConsistency(self, dev, target_node, False):
4026         if instance.admin_up and not self.op.ignore_consistency:
4027           raise errors.OpExecError("Disk %s is degraded on target node,"
4028                                    " aborting failover." % dev.iv_name)
4029
4030     feedback_fn("* shutting down instance on source node")
4031     logging.info("Shutting down instance %s on node %s",
4032                  instance.name, source_node)
4033
4034     result = self.rpc.call_instance_shutdown(source_node, instance)
4035     msg = result.fail_msg
4036     if msg:
4037       if self.op.ignore_consistency:
4038         self.proc.LogWarning("Could not shutdown instance %s on node %s."
4039                              " Proceeding anyway. Please make sure node"
4040                              " %s is down. Error details: %s",
4041                              instance.name, source_node, source_node, msg)
4042       else:
4043         raise errors.OpExecError("Could not shutdown instance %s on"
4044                                  " node %s: %s" %
4045                                  (instance.name, source_node, msg))
4046
4047     feedback_fn("* deactivating the instance's disks on source node")
4048     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
4049       raise errors.OpExecError("Can't shut down the instance's disks.")
4050
4051     instance.primary_node = target_node
4052     # distribute new instance config to the other nodes
4053     self.cfg.Update(instance)
4054
4055     # Only start the instance if it's marked as up
4056     if instance.admin_up:
4057       feedback_fn("* activating the instance's disks on target node")
4058       logging.info("Starting instance %s on node %s",
4059                    instance.name, target_node)
4060
4061       disks_ok, _ = _AssembleInstanceDisks(self, instance,
4062                                                ignore_secondaries=True)
4063       if not disks_ok:
4064         _ShutdownInstanceDisks(self, instance)
4065         raise errors.OpExecError("Can't activate the instance's disks")
4066
4067       feedback_fn("* starting the instance on the target node")
4068       result = self.rpc.call_instance_start(target_node, instance, None, None)
4069       msg = result.fail_msg
4070       if msg:
4071         _ShutdownInstanceDisks(self, instance)
4072         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
4073                                  (instance.name, target_node, msg))
4074
4075
4076 class LUMigrateInstance(LogicalUnit):
4077   """Migrate an instance.
4078
4079   This is migration without shutting down, compared to the failover,
4080   which is done with shutdown.
4081
4082   """
4083   HPATH = "instance-migrate"
4084   HTYPE = constants.HTYPE_INSTANCE
4085   _OP_REQP = ["instance_name", "live", "cleanup"]
4086
4087   REQ_BGL = False
4088
4089   def ExpandNames(self):
4090     self._ExpandAndLockInstance()
4091
4092     self.needed_locks[locking.LEVEL_NODE] = []
4093     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4094
4095     self._migrater = TLMigrateInstance(self, self.op.instance_name,
4096                                        self.op.live, self.op.cleanup)
4097     self.tasklets = [self._migrater]
4098
4099   def DeclareLocks(self, level):
4100     if level == locking.LEVEL_NODE:
4101       self._LockInstancesNodes()
4102
4103   def BuildHooksEnv(self):
4104     """Build hooks env.
4105
4106     This runs on master, primary and secondary nodes of the instance.
4107
4108     """
4109     instance = self._migrater.instance
4110     env = _BuildInstanceHookEnvByObject(self, instance)
4111     env["MIGRATE_LIVE"] = self.op.live
4112     env["MIGRATE_CLEANUP"] = self.op.cleanup
4113     nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
4114     return env, nl, nl
4115
4116
4117 class LUMigrateNode(LogicalUnit):
4118   """Migrate all instances from a node.
4119
4120   """
4121   HPATH = "node-migrate"
4122   HTYPE = constants.HTYPE_NODE
4123   _OP_REQP = ["node_name", "live"]
4124   REQ_BGL = False
4125
4126   def ExpandNames(self):
4127     self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
4128     if self.op.node_name is None:
4129       raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)
4130
4131     self.needed_locks = {
4132       locking.LEVEL_NODE: [self.op.node_name],
4133       }
4134
4135     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
4136
4137     # Create tasklets for migrating instances for all instances on this node
4138     names = []
4139     tasklets = []
4140
4141     for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name):
4142       logging.debug("Migrating instance %s", inst.name)
4143       names.append(inst.name)
4144
4145       tasklets.append(TLMigrateInstance(self, inst.name, self.op.live, False))
4146
4147     self.tasklets = tasklets
4148
4149     # Declare instance locks
4150     self.needed_locks[locking.LEVEL_INSTANCE] = names
4151
4152   def DeclareLocks(self, level):
4153     if level == locking.LEVEL_NODE:
4154       self._LockInstancesNodes()
4155
4156   def BuildHooksEnv(self):
4157     """Build hooks env.
4158
4159     This runs on the master, the primary and all the secondaries.
4160
4161     """
4162     env = {
4163       "NODE_NAME": self.op.node_name,
4164       }
4165
4166     nl = [self.cfg.GetMasterNode()]
4167
4168     return (env, nl, nl)
4169
4170
4171 class TLMigrateInstance(Tasklet):
4172   def __init__(self, lu, instance_name, live, cleanup):
4173     """Initializes this class.
4174
4175     """
4176     Tasklet.__init__(self, lu)
4177
4178     # Parameters
4179     self.instance_name = instance_name
4180     self.live = live
4181     self.cleanup = cleanup
4182
4183   def CheckPrereq(self):
4184     """Check prerequisites.
4185
4186     This checks that the instance is in the cluster.
4187
4188     """
4189     instance = self.cfg.GetInstanceInfo(
4190       self.cfg.ExpandInstanceName(self.instance_name))
4191     if instance is None:
4192       raise errors.OpPrereqError("Instance '%s' not known" %
4193                                  self.instance_name)
4194
4195     if instance.disk_template != constants.DT_DRBD8:
4196       raise errors.OpPrereqError("Instance's disk layout is not"
4197                                  " drbd8, cannot migrate.")
4198
4199     secondary_nodes = instance.secondary_nodes
4200     if not secondary_nodes:
4201       raise errors.ConfigurationError("No secondary node but using"
4202                                       " drbd8 disk template")
4203
4204     i_be = self.cfg.GetClusterInfo().FillBE(instance)
4205
4206     target_node = secondary_nodes[0]
4207     # check memory requirements on the secondary node
4208     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
4209                          instance.name, i_be[constants.BE_MEMORY],
4210                          instance.hypervisor)
4211
4212     # check bridge existance
4213     _CheckInstanceBridgesExist(self, instance, node=target_node)
4214
4215     if not self.cleanup:
4216       _CheckNodeNotDrained(self, target_node)
4217       result = self.rpc.call_instance_migratable(instance.primary_node,
4218                                                  instance)
4219       result.Raise("Can't migrate, please use failover", prereq=True)
4220
4221     self.instance = instance
4222
4223   def _WaitUntilSync(self):
4224     """Poll with custom rpc for disk sync.
4225
4226     This uses our own step-based rpc call.
4227
4228     """
4229     self.feedback_fn("* wait until resync is done")
4230     all_done = False
4231     while not all_done:
4232       all_done = True
4233       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
4234                                             self.nodes_ip,
4235                                             self.instance.disks)
4236       min_percent = 100
4237       for node, nres in result.items():
4238         nres.Raise("Cannot resync disks on node %s" % node)
4239         node_done, node_percent = nres.payload
4240         all_done = all_done and node_done
4241         if node_percent is not None:
4242           min_percent = min(min_percent, node_percent)
4243       if not all_done:
4244         if min_percent < 100:
4245           self.feedback_fn("   - progress: %.1f%%" % min_percent)
4246         time.sleep(2)
4247
4248   def _EnsureSecondary(self, node):
4249     """Demote a node to secondary.
4250
4251     """
4252     self.feedback_fn("* switching node %s to secondary mode" % node)
4253
4254     for dev in self.instance.disks:
4255       self.cfg.SetDiskID(dev, node)
4256
4257     result = self.rpc.call_blockdev_close(node, self.instance.name,
4258                                           self.instance.disks)
4259     result.Raise("Cannot change disk to secondary on node %s" % node)
4260
4261   def _GoStandalone(self):
4262     """Disconnect from the network.
4263
4264     """
4265     self.feedback_fn("* changing into standalone mode")
4266     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
4267                                                self.instance.disks)
4268     for node, nres in result.items():
4269       nres.Raise("Cannot disconnect disks node %s" % node)
4270
4271   def _GoReconnect(self, multimaster):
4272     """Reconnect to the network.
4273
4274     """
4275     if multimaster:
4276       msg = "dual-master"
4277     else:
4278       msg = "single-master"
4279     self.feedback_fn("* changing disks into %s mode" % msg)
4280     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
4281                                            self.instance.disks,
4282                                            self.instance.name, multimaster)
4283     for node, nres in result.items():
4284       nres.Raise("Cannot change disks config on node %s" % node)
4285
4286   def _ExecCleanup(self):
4287     """Try to cleanup after a failed migration.
4288
4289     The cleanup is done by:
4290       - check that the instance is running only on one node
4291         (and update the config if needed)
4292       - change disks on its secondary node to secondary
4293       - wait until disks are fully synchronized
4294       - disconnect from the network
4295       - change disks into single-master mode
4296       - wait again until disks are fully synchronized
4297
4298     """
4299     instance = self.instance
4300     target_node = self.target_node
4301     source_node = self.source_node
4302
4303     # check running on only one node
4304     self.feedback_fn("* checking where the instance actually runs"
4305                      " (if this hangs, the hypervisor might be in"
4306                      " a bad state)")
4307     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
4308     for node, result in ins_l.items():
4309       result.Raise("Can't contact node %s" % node)
4310
4311     runningon_source = instance.name in ins_l[source_node].payload
4312     runningon_target = instance.name in ins_l[target_node].payload
4313
4314     if runningon_source and runningon_target:
4315       raise errors.OpExecError("Instance seems to be running on two nodes,"
4316                                " or the hypervisor is confused. You will have"
4317                                " to ensure manually that it runs only on one"
4318                                " and restart this operation.")
4319
4320     if not (runningon_source or runningon_target):
4321       raise errors.OpExecError("Instance does not seem to be running at all."
4322                                " In this case, it's safer to repair by"
4323                                " running 'gnt-instance stop' to ensure disk"
4324                                " shutdown, and then restarting it.")
4325
4326     if runningon_target:
4327       # the migration has actually succeeded, we need to update the config
4328       self.feedback_fn("* instance running on secondary node (%s),"
4329                        " updating config" % target_node)
4330       instance.primary_node = target_node
4331       self.cfg.Update(instance)
4332       demoted_node = source_node
4333     else:
4334       self.feedback_fn("* instance confirmed to be running on its"
4335                        " primary node (%s)" % source_node)
4336       demoted_node = target_node
4337
4338     self._EnsureSecondary(demoted_node)
4339     try:
4340       self._WaitUntilSync()
4341     except errors.OpExecError:
4342       # we ignore here errors, since if the device is standalone, it
4343       # won't be able to sync
4344       pass
4345     self._GoStandalone()
4346     self._GoReconnect(False)
4347     self._WaitUntilSync()
4348
4349     self.feedback_fn("* done")
4350
4351   def _RevertDiskStatus(self):
4352     """Try to revert the disk status after a failed migration.
4353
4354     """
4355     target_node = self.target_node
4356     try:
4357       self._EnsureSecondary(target_node)
4358       self._GoStandalone()
4359       self._GoReconnect(False)
4360       self._WaitUntilSync()
4361     except errors.OpExecError, err:
4362       self.lu.LogWarning("Migration failed and I can't reconnect the"
4363                          " drives: error '%s'\n"
4364                          "Please look and recover the instance status" %
4365                          str(err))
4366
4367   def _AbortMigration(self):
4368     """Call the hypervisor code to abort a started migration.
4369
4370     """
4371     instance = self.instance
4372     target_node = self.target_node
4373     migration_info = self.migration_info
4374
4375     abort_result = self.rpc.call_finalize_migration(target_node,
4376                                                     instance,
4377                                                     migration_info,
4378                                                     False)
4379     abort_msg = abort_result.fail_msg
4380     if abort_msg:
4381       logging.error("Aborting migration failed on target node %s: %s" %
4382                     (target_node, abort_msg))
4383       # Don't raise an exception here, as we stil have to try to revert the
4384       # disk status, even if this step failed.
4385
4386   def _ExecMigration(self):
4387     """Migrate an instance.
4388
4389     The migrate is done by:
4390       - change the disks into dual-master mode
4391       - wait until disks are fully synchronized again
4392       - migrate the instance
4393       - change disks on the new secondary node (the old primary) to secondary
4394       - wait until disks are fully synchronized
4395       - change disks into single-master mode
4396
4397     """
4398     instance = self.instance
4399     target_node = self.target_node
4400     source_node = self.source_node
4401
4402     self.feedback_fn("* checking disk consistency between source and target")
4403     for dev in instance.disks:
4404       if not _CheckDiskConsistency(self, dev, target_node, False):
4405         raise errors.OpExecError("Disk %s is degraded or not fully"
4406                                  " synchronized on target node,"
4407                                  " aborting migrate." % dev.iv_name)
4408
4409     # First get the migration information from the remote node
4410     result = self.rpc.call_migration_info(source_node, instance)
4411     msg = result.fail_msg
4412     if msg:
4413       log_err = ("Failed fetching source migration information from %s: %s" %
4414                  (source_node, msg))
4415       logging.error(log_err)
4416       raise errors.OpExecError(log_err)
4417
4418     self.migration_info = migration_info = result.payload
4419
4420     # Then switch the disks to master/master mode
4421     self._EnsureSecondary(target_node)
4422     self._GoStandalone()
4423     self._GoReconnect(True)
4424     self._WaitUntilSync()
4425
4426     self.feedback_fn("* preparing %s to accept the instance" % target_node)
4427     result = self.rpc.call_accept_instance(target_node,
4428                                            instance,
4429                                            migration_info,
4430                                            self.nodes_ip[target_node])
4431
4432     msg = result.fail_msg
4433     if msg:
4434       logging.error("Instance pre-migration failed, trying to revert"
4435                     " disk status: %s", msg)
4436       self._AbortMigration()
4437       self._RevertDiskStatus()
4438       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4439                                (instance.name, msg))
4440
4441     self.feedback_fn("* migrating instance to %s" % target_node)
4442     time.sleep(10)
4443     result = self.rpc.call_instance_migrate(source_node, instance,
4444                                             self.nodes_ip[target_node],
4445                                             self.live)
4446     msg = result.fail_msg
4447     if msg:
4448       logging.error("Instance migration failed, trying to revert"
4449                     " disk status: %s", msg)
4450       self._AbortMigration()
4451       self._RevertDiskStatus()
4452       raise errors.OpExecError("Could not migrate instance %s: %s" %
4453                                (instance.name, msg))
4454     time.sleep(10)
4455
4456     instance.primary_node = target_node
4457     # distribute new instance config to the other nodes
4458     self.cfg.Update(instance)
4459
4460     result = self.rpc.call_finalize_migration(target_node,
4461                                               instance,
4462                                               migration_info,
4463                                               True)
4464     msg = result.fail_msg
4465     if msg:
4466       logging.error("Instance migration succeeded, but finalization failed:"
4467                     " %s" % msg)
4468       raise errors.OpExecError("Could not finalize instance migration: %s" %
4469                                msg)
4470
4471     self._EnsureSecondary(source_node)
4472     self._WaitUntilSync()
4473     self._GoStandalone()
4474     self._GoReconnect(False)
4475     self._WaitUntilSync()
4476
4477     self.feedback_fn("* done")
4478
4479   def Exec(self, feedback_fn):
4480     """Perform the migration.
4481
4482     """
4483     feedback_fn("Migrating instance %s" % self.instance.name)
4484
4485     self.feedback_fn = feedback_fn
4486
4487     self.source_node = self.instance.primary_node
4488     self.target_node = self.instance.secondary_nodes[0]
4489     self.all_nodes = [self.source_node, self.target_node]
4490     self.nodes_ip = {
4491       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4492       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4493       }
4494
4495     if self.cleanup:
4496       return self._ExecCleanup()
4497     else:
4498       return self._ExecMigration()
4499
4500
4501 def _CreateBlockDev(lu, node, instance, device, force_create,
4502                     info, force_open):
4503   """Create a tree of block devices on a given node.
4504
4505   If this device type has to be created on secondaries, create it and
4506   all its children.
4507
4508   If not, just recurse to children keeping the same 'force' value.
4509
4510   @param lu: the lu on whose behalf we execute
4511   @param node: the node on which to create the device
4512   @type instance: L{objects.Instance}
4513   @param instance: the instance which owns the device
4514   @type device: L{objects.Disk}
4515   @param device: the device to create
4516   @type force_create: boolean
4517   @param force_create: whether to force creation of this device; this
4518       will be change to True whenever we find a device which has
4519       CreateOnSecondary() attribute
4520   @param info: the extra 'metadata' we should attach to the device
4521       (this will be represented as a LVM tag)
4522   @type force_open: boolean
4523   @param force_open: this parameter will be passes to the
4524       L{backend.BlockdevCreate} function where it specifies
4525       whether we run on primary or not, and it affects both
4526       the child assembly and the device own Open() execution
4527
4528   """
4529   if device.CreateOnSecondary():
4530     force_create = True
4531
4532   if device.children:
4533     for child in device.children:
4534       _CreateBlockDev(lu, node, instance, child, force_create,
4535                       info, force_open)
4536
4537   if not force_create:
4538     return
4539
4540   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4541
4542
4543 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4544   """Create a single block device on a given node.
4545
4546   This will not recurse over children of the device, so they must be
4547   created in advance.
4548
4549   @param lu: the lu on whose behalf we execute
4550   @param node: the node on which to create the device
4551   @type instance: L{objects.Instance}
4552   @param instance: the instance which owns the device
4553   @type device: L{objects.Disk}
4554   @param device: the device to create
4555   @param info: the extra 'metadata' we should attach to the device
4556       (this will be represented as a LVM tag)
4557   @type force_open: boolean
4558   @param force_open: this parameter will be passes to the
4559       L{backend.BlockdevCreate} function where it specifies
4560       whether we run on primary or not, and it affects both
4561       the child assembly and the device own Open() execution
4562
4563   """
4564   lu.cfg.SetDiskID(device, node)
4565   result = lu.rpc.call_blockdev_create(node, device, device.size,
4566                                        instance.name, force_open, info)
4567   result.Raise("Can't create block device %s on"
4568                " node %s for instance %s" % (device, node, instance.name))
4569   if device.physical_id is None:
4570     device.physical_id = result.payload
4571
4572
4573 def _GenerateUniqueNames(lu, exts):
4574   """Generate a suitable LV name.
4575
4576   This will generate a logical volume name for the given instance.
4577
4578   """
4579   results = []
4580   for val in exts:
4581     new_id = lu.cfg.GenerateUniqueID()
4582     results.append("%s%s" % (new_id, val))
4583   return results
4584
4585
4586 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4587                          p_minor, s_minor):
4588   """Generate a drbd8 device complete with its children.
4589
4590   """
4591   port = lu.cfg.AllocatePort()
4592   vgname = lu.cfg.GetVGName()
4593   shared_secret = lu.cfg.GenerateDRBDSecret()
4594   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4595                           logical_id=(vgname, names[0]))
4596   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4597                           logical_id=(vgname, names[1]))
4598   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4599                           logical_id=(primary, secondary, port,
4600                                       p_minor, s_minor,
4601                                       shared_secret),
4602                           children=[dev_data, dev_meta],
4603                           iv_name=iv_name)
4604   return drbd_dev
4605
4606
4607 def _GenerateDiskTemplate(lu, template_name,
4608                           instance_name, primary_node,
4609                           secondary_nodes, disk_info,
4610                           file_storage_dir, file_driver,
4611                           base_index):
4612   """Generate the entire disk layout for a given template type.
4613
4614   """
4615   #TODO: compute space requirements
4616
4617   vgname = lu.cfg.GetVGName()
4618   disk_count = len(disk_info)
4619   disks = []
4620   if template_name == constants.DT_DISKLESS:
4621     pass
4622   elif template_name == constants.DT_PLAIN:
4623     if len(secondary_nodes) != 0:
4624       raise errors.ProgrammerError("Wrong template configuration")
4625
4626     names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
4627                                       for i in range(disk_count)])
4628     for idx, disk in enumerate(disk_info):
4629       disk_index = idx + base_index
4630       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4631                               logical_id=(vgname, names[idx]),
4632                               iv_name="disk/%d" % disk_index,
4633                               mode=disk["mode"])
4634       disks.append(disk_dev)
4635   elif template_name == constants.DT_DRBD8:
4636     if len(secondary_nodes) != 1:
4637       raise errors.ProgrammerError("Wrong template configuration")
4638     remote_node = secondary_nodes[0]
4639     minors = lu.cfg.AllocateDRBDMinor(
4640       [primary_node, remote_node] * len(disk_info), instance_name)
4641
4642     names = []
4643     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
4644                                                for i in range(disk_count)]):
4645       names.append(lv_prefix + "_data")
4646       names.append(lv_prefix + "_meta")
4647     for idx, disk in enumerate(disk_info):
4648       disk_index = idx + base_index
4649       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4650                                       disk["size"], names[idx*2:idx*2+2],
4651                                       "disk/%d" % disk_index,
4652                                       minors[idx*2], minors[idx*2+1])
4653       disk_dev.mode = disk["mode"]
4654       disks.append(disk_dev)
4655   elif template_name == constants.DT_FILE:
4656     if len(secondary_nodes) != 0:
4657       raise errors.ProgrammerError("Wrong template configuration")
4658
4659     for idx, disk in enumerate(disk_info):
4660       disk_index = idx + base_index
4661       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4662                               iv_name="disk/%d" % disk_index,
4663                               logical_id=(file_driver,
4664                                           "%s/disk%d" % (file_storage_dir,
4665                                                          disk_index)),
4666                               mode=disk["mode"])
4667       disks.append(disk_dev)
4668   else:
4669     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4670   return disks
4671
4672
4673 def _GetInstanceInfoText(instance):
4674   """Compute that text that should be added to the disk's metadata.
4675
4676   """
4677   return "originstname+%s" % instance.name
4678
4679
4680 def _CreateDisks(lu, instance):
4681   """Create all disks for an instance.
4682
4683   This abstracts away some work from AddInstance.
4684
4685   @type lu: L{LogicalUnit}
4686   @param lu: the logical unit on whose behalf we execute
4687   @type instance: L{objects.Instance}
4688   @param instance: the instance whose disks we should create
4689   @rtype: boolean
4690   @return: the success of the creation
4691
4692   """
4693   info = _GetInstanceInfoText(instance)
4694   pnode = instance.primary_node
4695
4696   if instance.disk_template == constants.DT_FILE:
4697     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4698     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4699
4700     result.Raise("Failed to create directory '%s' on"
4701                  " node %s: %s" % (file_storage_dir, pnode))
4702
4703   # Note: this needs to be kept in sync with adding of disks in
4704   # LUSetInstanceParams
4705   for device in instance.disks:
4706     logging.info("Creating volume %s for instance %s",
4707                  device.iv_name, instance.name)
4708     #HARDCODE
4709     for node in instance.all_nodes:
4710       f_create = node == pnode
4711       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4712
4713
4714 def _RemoveDisks(lu, instance):
4715   """Remove all disks for an instance.
4716
4717   This abstracts away some work from `AddInstance()` and
4718   `RemoveInstance()`. Note that in case some of the devices couldn't
4719   be removed, the removal will continue with the other ones (compare
4720   with `_CreateDisks()`).
4721
4722   @type lu: L{LogicalUnit}
4723   @param lu: the logical unit on whose behalf we execute
4724   @type instance: L{objects.Instance}
4725   @param instance: the instance whose disks we should remove
4726   @rtype: boolean
4727   @return: the success of the removal
4728
4729   """
4730   logging.info("Removing block devices for instance %s", instance.name)
4731
4732   all_result = True
4733   for device in instance.disks:
4734     for node, disk in device.ComputeNodeTree(instance.primary_node):
4735       lu.cfg.SetDiskID(disk, node)
4736       msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
4737       if msg:
4738         lu.LogWarning("Could not remove block device %s on node %s,"
4739                       " continuing anyway: %s", device.iv_name, node, msg)
4740         all_result = False
4741
4742   if instance.disk_template == constants.DT_FILE:
4743     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4744     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4745                                                  file_storage_dir)
4746     msg = result.fail_msg
4747     if msg:
4748       lu.LogWarning("Could not remove directory '%s' on node %s: %s",
4749                     file_storage_dir, instance.primary_node, msg)
4750       all_result = False
4751
4752   return all_result
4753
4754
4755 def _ComputeDiskSize(disk_template, disks):
4756   """Compute disk size requirements in the volume group
4757
4758   """
4759   # Required free disk space as a function of disk and swap space
4760   req_size_dict = {
4761     constants.DT_DISKLESS: None,
4762     constants.DT_PLAIN: sum(d["size"] for d in disks),
4763     # 128 MB are added for drbd metadata for each disk
4764     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4765     constants.DT_FILE: None,
4766   }
4767
4768   if disk_template not in req_size_dict:
4769     raise errors.ProgrammerError("Disk template '%s' size requirement"
4770                                  " is unknown" %  disk_template)
4771
4772   return req_size_dict[disk_template]
4773
4774
4775 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4776   """Hypervisor parameter validation.
4777
4778   This function abstract the hypervisor parameter validation to be
4779   used in both instance create and instance modify.
4780
4781   @type lu: L{LogicalUnit}
4782   @param lu: the logical unit for which we check
4783   @type nodenames: list
4784   @param nodenames: the list of nodes on which we should check
4785   @type hvname: string
4786   @param hvname: the name of the hypervisor we should use
4787   @type hvparams: dict
4788   @param hvparams: the parameters which we need to check
4789   @raise errors.OpPrereqError: if the parameters are not valid
4790
4791   """
4792   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4793                                                   hvname,
4794                                                   hvparams)
4795   for node in nodenames:
4796     info = hvinfo[node]
4797     if info.offline:
4798       continue
4799     info.Raise("Hypervisor parameter validation failed on node %s" % node)
4800
4801
4802 class LUCreateInstance(LogicalUnit):
4803   """Create an instance.
4804
4805   """
4806   HPATH = "instance-add"
4807   HTYPE = constants.HTYPE_INSTANCE
4808   _OP_REQP = ["instance_name", "disks", "disk_template",
4809               "mode", "start",
4810               "wait_for_sync", "ip_check", "nics",
4811               "hvparams", "beparams"]
4812   REQ_BGL = False
4813
4814   def _ExpandNode(self, node):
4815     """Expands and checks one node name.
4816
4817     """
4818     node_full = self.cfg.ExpandNodeName(node)
4819     if node_full is None:
4820       raise errors.OpPrereqError("Unknown node %s" % node)
4821     return node_full
4822
4823   def ExpandNames(self):
4824     """ExpandNames for CreateInstance.
4825
4826     Figure out the right locks for instance creation.
4827
4828     """
4829     self.needed_locks = {}
4830
4831     # set optional parameters to none if they don't exist
4832     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4833       if not hasattr(self.op, attr):
4834         setattr(self.op, attr, None)
4835
4836     # cheap checks, mostly valid constants given
4837
4838     # verify creation mode
4839     if self.op.mode not in (constants.INSTANCE_CREATE,
4840                             constants.INSTANCE_IMPORT):
4841       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4842                                  self.op.mode)
4843
4844     # disk template and mirror node verification
4845     if self.op.disk_template not in constants.DISK_TEMPLATES:
4846       raise errors.OpPrereqError("Invalid disk template name")
4847
4848     if self.op.hypervisor is None:
4849       self.op.hypervisor = self.cfg.GetHypervisorType()
4850
4851     cluster = self.cfg.GetClusterInfo()
4852     enabled_hvs = cluster.enabled_hypervisors
4853     if self.op.hypervisor not in enabled_hvs:
4854       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4855                                  " cluster (%s)" % (self.op.hypervisor,
4856                                   ",".join(enabled_hvs)))
4857
4858     # check hypervisor parameter syntax (locally)
4859     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4860     filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
4861                                   self.op.hvparams)
4862     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4863     hv_type.CheckParameterSyntax(filled_hvp)
4864     self.hv_full = filled_hvp
4865
4866     # fill and remember the beparams dict
4867     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4868     self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
4869                                     self.op.beparams)
4870
4871     #### instance parameters check
4872
4873     # instance name verification
4874     hostname1 = utils.HostInfo(self.op.instance_name)
4875     self.op.instance_name = instance_name = hostname1.name
4876
4877     # this is just a preventive check, but someone might still add this
4878     # instance in the meantime, and creation will fail at lock-add time
4879     if instance_name in self.cfg.GetInstanceList():
4880       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4881                                  instance_name)
4882
4883     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4884
4885     # NIC buildup
4886     self.nics = []
4887     for idx, nic in enumerate(self.op.nics):
4888       nic_mode_req = nic.get("mode", None)
4889       nic_mode = nic_mode_req
4890       if nic_mode is None:
4891         nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
4892
4893       # in routed mode, for the first nic, the default ip is 'auto'
4894       if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
4895         default_ip_mode = constants.VALUE_AUTO
4896       else:
4897         default_ip_mode = constants.VALUE_NONE
4898
4899       # ip validity checks
4900       ip = nic.get("ip", default_ip_mode)
4901       if ip is None or ip.lower() == constants.VALUE_NONE:
4902         nic_ip = None
4903       elif ip.lower() == constants.VALUE_AUTO:
4904         nic_ip = hostname1.ip
4905       else:
4906         if not utils.IsValidIP(ip):
4907           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4908                                      " like a valid IP" % ip)
4909         nic_ip = ip
4910
4911       # TODO: check the ip for uniqueness !!
4912       if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
4913         raise errors.OpPrereqError("Routed nic mode requires an ip address")
4914
4915       # MAC address verification
4916       mac = nic.get("mac", constants.VALUE_AUTO)
4917       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4918         if not utils.IsValidMac(mac.lower()):
4919           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4920                                      mac)
4921       # bridge verification
4922       bridge = nic.get("bridge", None)
4923       link = nic.get("link", None)
4924       if bridge and link:
4925         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
4926                                    " at the same time")
4927       elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
4928         raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
4929       elif bridge:
4930         link = bridge
4931
4932       nicparams = {}
4933       if nic_mode_req:
4934         nicparams[constants.NIC_MODE] = nic_mode_req
4935       if link:
4936         nicparams[constants.NIC_LINK] = link
4937
4938       check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4939                                       nicparams)
4940       objects.NIC.CheckParameterSyntax(check_params)
4941       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
4942
4943     # disk checks/pre-build
4944     self.disks = []
4945     for disk in self.op.disks:
4946       mode = disk.get("mode", constants.DISK_RDWR)
4947       if mode not in constants.DISK_ACCESS_SET:
4948         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4949                                    mode)
4950       size = disk.get("size", None)
4951       if size is None:
4952         raise errors.OpPrereqError("Missing disk size")
4953       try:
4954         size = int(size)
4955       except ValueError:
4956         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4957       self.disks.append({"size": size, "mode": mode})
4958
4959     # used in CheckPrereq for ip ping check
4960     self.check_ip = hostname1.ip
4961
4962     # file storage checks
4963     if (self.op.file_driver and
4964         not self.op.file_driver in constants.FILE_DRIVER):
4965       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4966                                  self.op.file_driver)
4967
4968     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4969       raise errors.OpPrereqError("File storage directory path not absolute")
4970
4971     ### Node/iallocator related checks
4972     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4973       raise errors.OpPrereqError("One and only one of iallocator and primary"
4974                                  " node must be given")
4975
4976     if self.op.iallocator:
4977       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4978     else:
4979       self.op.pnode = self._ExpandNode(self.op.pnode)
4980       nodelist = [self.op.pnode]
4981       if self.op.snode is not None:
4982         self.op.snode = self._ExpandNode(self.op.snode)
4983         nodelist.append(self.op.snode)
4984       self.needed_locks[locking.LEVEL_NODE] = nodelist
4985
4986     # in case of import lock the source node too
4987     if self.op.mode == constants.INSTANCE_IMPORT:
4988       src_node = getattr(self.op, "src_node", None)
4989       src_path = getattr(self.op, "src_path", None)
4990
4991       if src_path is None:
4992         self.op.src_path = src_path = self.op.instance_name
4993
4994       if src_node is None:
4995         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4996         self.op.src_node = None
4997         if os.path.isabs(src_path):
4998           raise errors.OpPrereqError("Importing an instance from an absolute"
4999                                      " path requires a source node option.")
5000       else:
5001         self.op.src_node = src_node = self._ExpandNode(src_node)
5002         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
5003           self.needed_locks[locking.LEVEL_NODE].append(src_node)
5004         if not os.path.isabs(src_path):
5005           self.op.src_path = src_path = \
5006             os.path.join(constants.EXPORT_DIR, src_path)
5007
5008     else: # INSTANCE_CREATE
5009       if getattr(self.op, "os_type", None) is None:
5010         raise errors.OpPrereqError("No guest OS specified")
5011
5012   def _RunAllocator(self):
5013     """Run the allocator based on input opcode.
5014
5015     """
5016     nics = [n.ToDict() for n in self.nics]
5017     ial = IAllocator(self.cfg, self.rpc,
5018                      mode=constants.IALLOCATOR_MODE_ALLOC,
5019                      name=self.op.instance_name,
5020                      disk_template=self.op.disk_template,
5021                      tags=[],
5022                      os=self.op.os_type,
5023                      vcpus=self.be_full[constants.BE_VCPUS],
5024                      mem_size=self.be_full[constants.BE_MEMORY],
5025                      disks=self.disks,
5026                      nics=nics,
5027                      hypervisor=self.op.hypervisor,
5028                      )
5029
5030     ial.Run(self.op.iallocator)
5031
5032     if not ial.success:
5033       raise errors.OpPrereqError("Can't compute nodes using"
5034                                  " iallocator '%s': %s" % (self.op.iallocator,
5035                                                            ial.info))
5036     if len(ial.nodes) != ial.required_nodes:
5037       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5038                                  " of nodes (%s), required %s" %
5039                                  (self.op.iallocator, len(ial.nodes),
5040                                   ial.required_nodes))
5041     self.op.pnode = ial.nodes[0]
5042     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
5043                  self.op.instance_name, self.op.iallocator,
5044                  ", ".join(ial.nodes))
5045     if ial.required_nodes == 2:
5046       self.op.snode = ial.nodes[1]
5047
5048   def BuildHooksEnv(self):
5049     """Build hooks env.
5050
5051     This runs on master, primary and secondary nodes of the instance.
5052
5053     """
5054     env = {
5055       "ADD_MODE": self.op.mode,
5056       }
5057     if self.op.mode == constants.INSTANCE_IMPORT:
5058       env["SRC_NODE"] = self.op.src_node
5059       env["SRC_PATH"] = self.op.src_path
5060       env["SRC_IMAGES"] = self.src_images
5061
5062     env.update(_BuildInstanceHookEnv(
5063       name=self.op.instance_name,
5064       primary_node=self.op.pnode,
5065       secondary_nodes=self.secondaries,
5066       status=self.op.start,
5067       os_type=self.op.os_type,
5068       memory=self.be_full[constants.BE_MEMORY],
5069       vcpus=self.be_full[constants.BE_VCPUS],
5070       nics=_NICListToTuple(self, self.nics),
5071       disk_template=self.op.disk_template,
5072       disks=[(d["size"], d["mode"]) for d in self.disks],
5073       bep=self.be_full,
5074       hvp=self.hv_full,
5075       hypervisor_name=self.op.hypervisor,
5076     ))
5077
5078     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
5079           self.secondaries)
5080     return env, nl, nl
5081
5082
5083   def CheckPrereq(self):
5084     """Check prerequisites.
5085
5086     """
5087     if (not self.cfg.GetVGName() and
5088         self.op.disk_template not in constants.DTS_NOT_LVM):
5089       raise errors.OpPrereqError("Cluster does not support lvm-based"
5090                                  " instances")
5091
5092     if self.op.mode == constants.INSTANCE_IMPORT:
5093       src_node = self.op.src_node
5094       src_path = self.op.src_path
5095
5096       if src_node is None:
5097         locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
5098         exp_list = self.rpc.call_export_list(locked_nodes)
5099         found = False
5100         for node in exp_list:
5101           if exp_list[node].fail_msg:
5102             continue
5103           if src_path in exp_list[node].payload:
5104             found = True
5105             self.op.src_node = src_node = node
5106             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
5107                                                        src_path)
5108             break
5109         if not found:
5110           raise errors.OpPrereqError("No export found for relative path %s" %
5111                                       src_path)
5112
5113       _CheckNodeOnline(self, src_node)
5114       result = self.rpc.call_export_info(src_node, src_path)
5115       result.Raise("No export or invalid export found in dir %s" % src_path)
5116
5117       export_info = objects.SerializableConfigParser.Loads(str(result.payload))
5118       if not export_info.has_section(constants.INISECT_EXP):
5119         raise errors.ProgrammerError("Corrupted export config")
5120
5121       ei_version = export_info.get(constants.INISECT_EXP, 'version')
5122       if (int(ei_version) != constants.EXPORT_VERSION):
5123         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
5124                                    (ei_version, constants.EXPORT_VERSION))
5125
5126       # Check that the new instance doesn't have less disks than the export
5127       instance_disks = len(self.disks)
5128       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
5129       if instance_disks < export_disks:
5130         raise errors.OpPrereqError("Not enough disks to import."
5131                                    " (instance: %d, export: %d)" %
5132                                    (instance_disks, export_disks))
5133
5134       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
5135       disk_images = []
5136       for idx in range(export_disks):
5137         option = 'disk%d_dump' % idx
5138         if export_info.has_option(constants.INISECT_INS, option):
5139           # FIXME: are the old os-es, disk sizes, etc. useful?
5140           export_name = export_info.get(constants.INISECT_INS, option)
5141           image = os.path.join(src_path, export_name)
5142           disk_images.append(image)
5143         else:
5144           disk_images.append(False)
5145
5146       self.src_images = disk_images
5147
5148       old_name = export_info.get(constants.INISECT_INS, 'name')
5149       # FIXME: int() here could throw a ValueError on broken exports
5150       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
5151       if self.op.instance_name == old_name:
5152         for idx, nic in enumerate(self.nics):
5153           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
5154             nic_mac_ini = 'nic%d_mac' % idx
5155             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
5156
5157     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
5158     # ip ping checks (we use the same ip that was resolved in ExpandNames)
5159     if self.op.start and not self.op.ip_check:
5160       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
5161                                  " adding an instance in start mode")
5162
5163     if self.op.ip_check:
5164       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
5165         raise errors.OpPrereqError("IP %s of instance %s already in use" %
5166                                    (self.check_ip, self.op.instance_name))
5167
5168     #### mac address generation
5169     # By generating here the mac address both the allocator and the hooks get
5170     # the real final mac address rather than the 'auto' or 'generate' value.
5171     # There is a race condition between the generation and the instance object
5172     # creation, which means that we know the mac is valid now, but we're not
5173     # sure it will be when we actually add the instance. If things go bad
5174     # adding the instance will abort because of a duplicate mac, and the
5175     # creation job will fail.
5176     for nic in self.nics:
5177       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5178         nic.mac = self.cfg.GenerateMAC()
5179
5180     #### allocator run
5181
5182     if self.op.iallocator is not None:
5183       self._RunAllocator()
5184
5185     #### node related checks
5186
5187     # check primary node
5188     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
5189     assert self.pnode is not None, \
5190       "Cannot retrieve locked node %s" % self.op.pnode
5191     if pnode.offline:
5192       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
5193                                  pnode.name)
5194     if pnode.drained:
5195       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
5196                                  pnode.name)
5197
5198     self.secondaries = []
5199
5200     # mirror node verification
5201     if self.op.disk_template in constants.DTS_NET_MIRROR:
5202       if self.op.snode is None:
5203         raise errors.OpPrereqError("The networked disk templates need"
5204                                    " a mirror node")
5205       if self.op.snode == pnode.name:
5206         raise errors.OpPrereqError("The secondary node cannot be"
5207                                    " the primary node.")
5208       _CheckNodeOnline(self, self.op.snode)
5209       _CheckNodeNotDrained(self, self.op.snode)
5210       self.secondaries.append(self.op.snode)
5211
5212     nodenames = [pnode.name] + self.secondaries
5213
5214     req_size = _ComputeDiskSize(self.op.disk_template,
5215                                 self.disks)
5216
5217     # Check lv size requirements
5218     if req_size is not None:
5219       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5220                                          self.op.hypervisor)
5221       for node in nodenames:
5222         info = nodeinfo[node]
5223         info.Raise("Cannot get current information from node %s" % node)
5224         info = info.payload
5225         vg_free = info.get('vg_free', None)
5226         if not isinstance(vg_free, int):
5227           raise errors.OpPrereqError("Can't compute free disk space on"
5228                                      " node %s" % node)
5229         if req_size > vg_free:
5230           raise errors.OpPrereqError("Not enough disk space on target node %s."
5231                                      " %d MB available, %d MB required" %
5232                                      (node, vg_free, req_size))
5233
5234     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
5235
5236     # os verification
5237     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
5238     result.Raise("OS '%s' not in supported os list for primary node %s" %
5239                  (self.op.os_type, pnode.name), prereq=True)
5240
5241     _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
5242
5243     # memory check on primary node
5244     if self.op.start:
5245       _CheckNodeFreeMemory(self, self.pnode.name,
5246                            "creating instance %s" % self.op.instance_name,
5247                            self.be_full[constants.BE_MEMORY],
5248                            self.op.hypervisor)
5249
5250     self.dry_run_result = list(nodenames)
5251
5252   def Exec(self, feedback_fn):
5253     """Create and add the instance to the cluster.
5254
5255     """
5256     instance = self.op.instance_name
5257     pnode_name = self.pnode.name
5258
5259     ht_kind = self.op.hypervisor
5260     if ht_kind in constants.HTS_REQ_PORT:
5261       network_port = self.cfg.AllocatePort()
5262     else:
5263       network_port = None
5264
5265     ##if self.op.vnc_bind_address is None:
5266     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
5267
5268     # this is needed because os.path.join does not accept None arguments
5269     if self.op.file_storage_dir is None:
5270       string_file_storage_dir = ""
5271     else:
5272       string_file_storage_dir = self.op.file_storage_dir
5273
5274     # build the full file storage dir path
5275     file_storage_dir = os.path.normpath(os.path.join(
5276                                         self.cfg.GetFileStorageDir(),
5277                                         string_file_storage_dir, instance))
5278
5279
5280     disks = _GenerateDiskTemplate(self,
5281                                   self.op.disk_template,
5282                                   instance, pnode_name,
5283                                   self.secondaries,
5284                                   self.disks,
5285                                   file_storage_dir,
5286                                   self.op.file_driver,
5287                                   0)
5288
5289     iobj = objects.Instance(name=instance, os=self.op.os_type,
5290                             primary_node=pnode_name,
5291                             nics=self.nics, disks=disks,
5292                             disk_template=self.op.disk_template,
5293                             admin_up=False,
5294                             network_port=network_port,
5295                             beparams=self.op.beparams,
5296                             hvparams=self.op.hvparams,
5297                             hypervisor=self.op.hypervisor,
5298                             )
5299
5300     feedback_fn("* creating instance disks...")
5301     try:
5302       _CreateDisks(self, iobj)
5303     except errors.OpExecError:
5304       self.LogWarning("Device creation failed, reverting...")
5305       try:
5306         _RemoveDisks(self, iobj)
5307       finally:
5308         self.cfg.ReleaseDRBDMinors(instance)
5309         raise
5310
5311     feedback_fn("adding instance %s to cluster config" % instance)
5312
5313     self.cfg.AddInstance(iobj)
5314     # Declare that we don't want to remove the instance lock anymore, as we've
5315     # added the instance to the config
5316     del self.remove_locks[locking.LEVEL_INSTANCE]
5317     # Unlock all the nodes
5318     if self.op.mode == constants.INSTANCE_IMPORT:
5319       nodes_keep = [self.op.src_node]
5320       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
5321                        if node != self.op.src_node]
5322       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
5323       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
5324     else:
5325       self.context.glm.release(locking.LEVEL_NODE)
5326       del self.acquired_locks[locking.LEVEL_NODE]
5327
5328     if self.op.wait_for_sync:
5329       disk_abort = not _WaitForSync(self, iobj)
5330     elif iobj.disk_template in constants.DTS_NET_MIRROR:
5331       # make sure the disks are not degraded (still sync-ing is ok)
5332       time.sleep(15)
5333       feedback_fn("* checking mirrors status")
5334       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
5335     else:
5336       disk_abort = False
5337
5338     if disk_abort:
5339       _RemoveDisks(self, iobj)
5340       self.cfg.RemoveInstance(iobj.name)
5341       # Make sure the instance lock gets removed
5342       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
5343       raise errors.OpExecError("There are some degraded disks for"
5344                                " this instance")
5345
5346     feedback_fn("creating os for instance %s on node %s" %
5347                 (instance, pnode_name))
5348
5349     if iobj.disk_template != constants.DT_DISKLESS:
5350       if self.op.mode == constants.INSTANCE_CREATE:
5351         feedback_fn("* running the instance OS create scripts...")
5352         result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
5353         result.Raise("Could not add os for instance %s"
5354                      " on node %s" % (instance, pnode_name))
5355
5356       elif self.op.mode == constants.INSTANCE_IMPORT:
5357         feedback_fn("* running the instance OS import scripts...")
5358         src_node = self.op.src_node
5359         src_images = self.src_images
5360         cluster_name = self.cfg.GetClusterName()
5361         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
5362                                                          src_node, src_images,
5363                                                          cluster_name)
5364         msg = import_result.fail_msg
5365         if msg:
5366           self.LogWarning("Error while importing the disk images for instance"
5367                           " %s on node %s: %s" % (instance, pnode_name, msg))
5368       else:
5369         # also checked in the prereq part
5370         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
5371                                      % self.op.mode)
5372
5373     if self.op.start:
5374       iobj.admin_up = True
5375       self.cfg.Update(iobj)
5376       logging.info("Starting instance %s on node %s", instance, pnode_name)
5377       feedback_fn("* starting instance...")
5378       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
5379       result.Raise("Could not start instance")
5380
5381     return list(iobj.all_nodes)
5382
5383
5384 class LUConnectConsole(NoHooksLU):
5385   """Connect to an instance's console.
5386
5387   This is somewhat special in that it returns the command line that
5388   you need to run on the master node in order to connect to the
5389   console.
5390
5391   """
5392   _OP_REQP = ["instance_name"]
5393   REQ_BGL = False
5394
5395   def ExpandNames(self):
5396     self._ExpandAndLockInstance()
5397
5398   def CheckPrereq(self):
5399     """Check prerequisites.
5400
5401     This checks that the instance is in the cluster.
5402
5403     """
5404     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5405     assert self.instance is not None, \
5406       "Cannot retrieve locked instance %s" % self.op.instance_name
5407     _CheckNodeOnline(self, self.instance.primary_node)
5408
5409   def Exec(self, feedback_fn):
5410     """Connect to the console of an instance
5411
5412     """
5413     instance = self.instance
5414     node = instance.primary_node
5415
5416     node_insts = self.rpc.call_instance_list([node],
5417                                              [instance.hypervisor])[node]
5418     node_insts.Raise("Can't get node information from %s" % node)
5419
5420     if instance.name not in node_insts.payload:
5421       raise errors.OpExecError("Instance %s is not running." % instance.name)
5422
5423     logging.debug("Connecting to console of %s on %s", instance.name, node)
5424
5425     hyper = hypervisor.GetHypervisor(instance.hypervisor)
5426     cluster = self.cfg.GetClusterInfo()
5427     # beparams and hvparams are passed separately, to avoid editing the
5428     # instance and then saving the defaults in the instance itself.
5429     hvparams = cluster.FillHV(instance)
5430     beparams = cluster.FillBE(instance)
5431     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5432
5433     # build ssh cmdline
5434     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
5435
5436
5437 class LUReplaceDisks(LogicalUnit):
5438   """Replace the disks of an instance.
5439
5440   """
5441   HPATH = "mirrors-replace"
5442   HTYPE = constants.HTYPE_INSTANCE
5443   _OP_REQP = ["instance_name", "mode", "disks"]
5444   REQ_BGL = False
5445
5446   def CheckArguments(self):
5447     if not hasattr(self.op, "remote_node"):
5448       self.op.remote_node = None
5449     if not hasattr(self.op, "iallocator"):
5450       self.op.iallocator = None
5451
5452     TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node,
5453                                   self.op.iallocator)
5454
5455   def ExpandNames(self):
5456     self._ExpandAndLockInstance()
5457
5458     if self.op.iallocator is not None:
5459       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5460
5461     elif self.op.remote_node is not None:
5462       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5463       if remote_node is None:
5464         raise errors.OpPrereqError("Node '%s' not known" %
5465                                    self.op.remote_node)
5466
5467       self.op.remote_node = remote_node
5468
5469       # Warning: do not remove the locking of the new secondary here
5470       # unless DRBD8.AddChildren is changed to work in parallel;
5471       # currently it doesn't since parallel invocations of
5472       # FindUnusedMinor will conflict
5473       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5474       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5475
5476     else:
5477       self.needed_locks[locking.LEVEL_NODE] = []
5478       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5479
5480     self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
5481                                    self.op.iallocator, self.op.remote_node,
5482                                    self.op.disks)
5483
5484     self.tasklets = [self.replacer]
5485
5486   def DeclareLocks(self, level):
5487     # If we're not already locking all nodes in the set we have to declare the
5488     # instance's primary/secondary nodes.
5489     if (level == locking.LEVEL_NODE and
5490         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5491       self._LockInstancesNodes()
5492
5493   def BuildHooksEnv(self):
5494     """Build hooks env.
5495
5496     This runs on the master, the primary and all the secondaries.
5497
5498     """
5499     instance = self.replacer.instance
5500     env = {
5501       "MODE": self.op.mode,
5502       "NEW_SECONDARY": self.op.remote_node,
5503       "OLD_SECONDARY": instance.secondary_nodes[0],
5504       }
5505     env.update(_BuildInstanceHookEnvByObject(self, instance))
5506     nl = [
5507       self.cfg.GetMasterNode(),
5508       instance.primary_node,
5509       ]
5510     if self.op.remote_node is not None:
5511       nl.append(self.op.remote_node)
5512     return env, nl, nl
5513
5514
5515 class LUEvacuateNode(LogicalUnit):
5516   """Relocate the secondary instances from a node.
5517
5518   """
5519   HPATH = "node-evacuate"
5520   HTYPE = constants.HTYPE_NODE
5521   _OP_REQP = ["node_name"]
5522   REQ_BGL = False
5523
5524   def CheckArguments(self):
5525     if not hasattr(self.op, "remote_node"):
5526       self.op.remote_node = None
5527     if not hasattr(self.op, "iallocator"):
5528       self.op.iallocator = None
5529
5530     TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG,
5531                                   self.op.remote_node,
5532                                   self.op.iallocator)
5533
5534   def ExpandNames(self):
5535     self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
5536     if self.op.node_name is None:
5537       raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name)
5538
5539     self.needed_locks = {}
5540
5541     # Declare node locks
5542     if self.op.iallocator is not None:
5543       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5544
5545     elif self.op.remote_node is not None:
5546       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5547       if remote_node is None:
5548         raise errors.OpPrereqError("Node '%s' not known" %
5549                                    self.op.remote_node)
5550
5551       self.op.remote_node = remote_node
5552
5553       # Warning: do not remove the locking of the new secondary here
5554       # unless DRBD8.AddChildren is changed to work in parallel;
5555       # currently it doesn't since parallel invocations of
5556       # FindUnusedMinor will conflict
5557       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5558       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5559
5560     else:
5561       raise errors.OpPrereqError("Invalid parameters")
5562
5563     # Create tasklets for replacing disks for all secondary instances on this
5564     # node
5565     names = []
5566     tasklets = []
5567
5568     for inst in _GetNodeSecondaryInstances(self.cfg, self.op.node_name):
5569       logging.debug("Replacing disks for instance %s", inst.name)
5570       names.append(inst.name)
5571
5572       replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG,
5573                                 self.op.iallocator, self.op.remote_node, [])
5574       tasklets.append(replacer)
5575
5576     self.tasklets = tasklets
5577     self.instance_names = names
5578
5579     # Declare instance locks
5580     self.needed_locks[locking.LEVEL_INSTANCE] = self.instance_names
5581
5582   def DeclareLocks(self, level):
5583     # If we're not already locking all nodes in the set we have to declare the
5584     # instance's primary/secondary nodes.
5585     if (level == locking.LEVEL_NODE and
5586         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5587       self._LockInstancesNodes()
5588
5589   def BuildHooksEnv(self):
5590     """Build hooks env.
5591
5592     This runs on the master, the primary and all the secondaries.
5593
5594     """
5595     env = {
5596       "NODE_NAME": self.op.node_name,
5597       }
5598
5599     nl = [self.cfg.GetMasterNode()]
5600
5601     if self.op.remote_node is not None:
5602       env["NEW_SECONDARY"] = self.op.remote_node
5603       nl.append(self.op.remote_node)
5604
5605     return (env, nl, nl)
5606
5607
5608 class TLReplaceDisks(Tasklet):
5609   """Replaces disks for an instance.
5610
5611   Note: Locking is not within the scope of this class.
5612
5613   """
5614   def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
5615                disks):
5616     """Initializes this class.
5617
5618     """
5619     Tasklet.__init__(self, lu)
5620
5621     # Parameters
5622     self.instance_name = instance_name
5623     self.mode = mode
5624     self.iallocator_name = iallocator_name
5625     self.remote_node = remote_node
5626     self.disks = disks
5627
5628     # Runtime data
5629     self.instance = None
5630     self.new_node = None
5631     self.target_node = None
5632     self.other_node = None
5633     self.remote_node_info = None
5634     self.node_secondary_ip = None
5635
5636   @staticmethod
5637   def CheckArguments(mode, remote_node, iallocator):
5638     """Helper function for users of this class.
5639
5640     """
5641     # check for valid parameter combination
5642     cnt = [remote_node, iallocator].count(None)
5643     if mode == constants.REPLACE_DISK_CHG:
5644       if cnt == 2:
5645         raise errors.OpPrereqError("When changing the secondary either an"
5646                                    " iallocator script must be used or the"
5647                                    " new node given")
5648       elif cnt == 0:
5649         raise errors.OpPrereqError("Give either the iallocator or the new"
5650                                    " secondary, not both")
5651     else: # not replacing the secondary
5652       if cnt != 2:
5653         raise errors.OpPrereqError("The iallocator and new node options can"
5654                                    " be used only when changing the"
5655                                    " secondary node")
5656
5657   @staticmethod
5658   def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
5659     """Compute a new secondary node using an IAllocator.
5660
5661     """
5662     ial = IAllocator(lu.cfg, lu.rpc,
5663                      mode=constants.IALLOCATOR_MODE_RELOC,
5664                      name=instance_name,
5665                      relocate_from=relocate_from)
5666
5667     ial.Run(iallocator_name)
5668
5669     if not ial.success:
5670       raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
5671                                  " %s" % (iallocator_name, ial.info))
5672
5673     if len(ial.nodes) != ial.required_nodes:
5674       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5675                                  " of nodes (%s), required %s" %
5676                                  (len(ial.nodes), ial.required_nodes))
5677
5678     remote_node_name = ial.nodes[0]
5679
5680     lu.LogInfo("Selected new secondary for instance '%s': %s",
5681                instance_name, remote_node_name)
5682
5683     return remote_node_name
5684
5685   def CheckPrereq(self):
5686     """Check prerequisites.
5687
5688     This checks that the instance is in the cluster.
5689
5690     """
5691     self.instance = self.cfg.GetInstanceInfo(self.instance_name)
5692     assert self.instance is not None, \
5693       "Cannot retrieve locked instance %s" % self.instance_name
5694
5695     if self.instance.disk_template != constants.DT_DRBD8:
5696       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5697                                  " instances")
5698
5699     if len(self.instance.secondary_nodes) != 1:
5700       raise errors.OpPrereqError("The instance has a strange layout,"
5701                                  " expected one secondary but found %d" %
5702                                  len(self.instance.secondary_nodes))
5703
5704     secondary_node = self.instance.secondary_nodes[0]
5705
5706     if self.iallocator_name is None:
5707       remote_node = self.remote_node
5708     else:
5709       remote_node = self._RunAllocator(self.lu, self.iallocator_name,
5710                                        self.instance.name, secondary_node)
5711
5712     if remote_node is not None:
5713       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5714       assert self.remote_node_info is not None, \
5715         "Cannot retrieve locked node %s" % remote_node
5716     else:
5717       self.remote_node_info = None
5718
5719     if remote_node == self.instance.primary_node:
5720       raise errors.OpPrereqError("The specified node is the primary node of"
5721                                  " the instance.")
5722
5723     if remote_node == secondary_node:
5724       raise errors.OpPrereqError("The specified node is already the"
5725                                  " secondary node of the instance.")
5726
5727     if self.mode == constants.REPLACE_DISK_PRI:
5728       self.target_node = self.instance.primary_node
5729       self.other_node = secondary_node
5730       check_nodes = [self.target_node, self.other_node]
5731
5732     elif self.mode == constants.REPLACE_DISK_SEC:
5733       self.target_node = secondary_node
5734       self.other_node = self.instance.primary_node
5735       check_nodes = [self.target_node, self.other_node]
5736
5737     elif self.mode == constants.REPLACE_DISK_CHG:
5738       self.new_node = remote_node
5739       self.other_node = self.instance.primary_node
5740       self.target_node = secondary_node
5741       check_nodes = [self.new_node, self.other_node]
5742
5743       _CheckNodeNotDrained(self.lu, remote_node)
5744
5745     else:
5746       raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
5747                                    self.mode)
5748
5749     for node in check_nodes:
5750       _CheckNodeOnline(self.lu, node)
5751
5752     # If not specified all disks should be replaced
5753     if not self.disks:
5754       self.disks = range(len(self.instance.disks))
5755
5756     # Check whether disks are valid
5757     for disk_idx in self.disks:
5758       self.instance.FindDisk(disk_idx)
5759
5760     # Get secondary node IP addresses
5761     node_2nd_ip = {}
5762
5763     for node_name in [self.target_node, self.other_node, self.new_node]:
5764       if node_name is not None:
5765         node_2nd_ip[node_name] = self.cfg.GetNodeInfo(node_name).secondary_ip
5766
5767     self.node_secondary_ip = node_2nd_ip
5768
5769   def Exec(self, feedback_fn):
5770     """Execute disk replacement.
5771
5772     This dispatches the disk replacement to the appropriate handler.
5773
5774     """
5775     feedback_fn("Replacing disks for %s" % self.instance.name)
5776
5777     activate_disks = (not self.instance.admin_up)
5778
5779     # Activate the instance disks if we're replacing them on a down instance
5780     if activate_disks:
5781       _StartInstanceDisks(self.lu, self.instance, True)
5782
5783     try:
5784       if self.mode == constants.REPLACE_DISK_CHG:
5785         return self._ExecDrbd8Secondary()
5786       else:
5787         return self._ExecDrbd8DiskOnly()
5788
5789     finally:
5790       # Deactivate the instance disks if we're replacing them on a down instance
5791       if activate_disks:
5792         _SafeShutdownInstanceDisks(self.lu, self.instance)
5793
5794   def _CheckVolumeGroup(self, nodes):
5795     self.lu.LogInfo("Checking volume groups")
5796
5797     vgname = self.cfg.GetVGName()
5798
5799     # Make sure volume group exists on all involved nodes
5800     results = self.rpc.call_vg_list(nodes)
5801     if not results:
5802       raise errors.OpExecError("Can't list volume groups on the nodes")
5803
5804     for node in nodes:
5805       res = results[node]
5806       res.Raise("Error checking node %s" % node)
5807       if vgname not in res.payload:
5808         raise errors.OpExecError("Volume group '%s' not found on node %s" %
5809                                  (vgname, node))
5810
5811   def _CheckDisksExistence(self, nodes):
5812     # Check disk existence
5813     for idx, dev in enumerate(self.instance.disks):
5814       if idx not in self.disks:
5815         continue
5816
5817       for node in nodes:
5818         self.lu.LogInfo("Checking disk/%d on %s" % (idx, node))
5819         self.cfg.SetDiskID(dev, node)
5820
5821         result = self.rpc.call_blockdev_find(node, dev)
5822
5823         msg = result.fail_msg
5824         if msg or not result.payload:
5825           if not msg:
5826             msg = "disk not found"
5827           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5828                                    (idx, node, msg))
5829
5830   def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
5831     for idx, dev in enumerate(self.instance.disks):
5832       if idx not in self.disks:
5833         continue
5834
5835       self.lu.LogInfo("Checking disk/%d consistency on node %s" %
5836                       (idx, node_name))
5837
5838       if not _CheckDiskConsistency(self.lu, dev, node_name, on_primary,
5839                                    ldisk=ldisk):
5840         raise errors.OpExecError("Node %s has degraded storage, unsafe to"
5841                                  " replace disks for instance %s" %
5842                                  (node_name, self.instance.name))
5843
5844   def _CreateNewStorage(self, node_name):
5845     vgname = self.cfg.GetVGName()
5846     iv_names = {}
5847
5848     for idx, dev in enumerate(self.instance.disks):
5849       if idx not in self.disks:
5850         continue
5851
5852       self.lu.LogInfo("Adding storage on %s for disk/%d" % (node_name, idx))
5853
5854       self.cfg.SetDiskID(dev, node_name)
5855
5856       lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
5857       names = _GenerateUniqueNames(self.lu, lv_names)
5858
5859       lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
5860                              logical_id=(vgname, names[0]))
5861       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5862                              logical_id=(vgname, names[1]))
5863
5864       new_lvs = [lv_data, lv_meta]
5865       old_lvs = dev.children
5866       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5867
5868       # we pass force_create=True to force the LVM creation
5869       for new_lv in new_lvs:
5870         _CreateBlockDev(self.lu, node_name, self.instance, new_lv, True,
5871                         _GetInstanceInfoText(self.instance), False)
5872
5873     return iv_names
5874
5875   def _CheckDevices(self, node_name, iv_names):
5876     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5877       self.cfg.SetDiskID(dev, node_name)
5878
5879       result = self.rpc.call_blockdev_find(node_name, dev)
5880
5881       msg = result.fail_msg
5882       if msg or not result.payload:
5883         if not msg:
5884           msg = "disk not found"
5885         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5886                                  (name, msg))
5887
5888       if result.payload[5]:
5889         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5890
5891   def _RemoveOldStorage(self, node_name, iv_names):
5892     for name, (dev, old_lvs, _) in iv_names.iteritems():
5893       self.lu.LogInfo("Remove logical volumes for %s" % name)
5894
5895       for lv in old_lvs:
5896         self.cfg.SetDiskID(lv, node_name)
5897
5898         msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
5899         if msg:
5900           self.lu.LogWarning("Can't remove old LV: %s" % msg,
5901                              hint="remove unused LVs manually")
5902
5903   def _ExecDrbd8DiskOnly(self):
5904     """Replace a disk on the primary or secondary for DRBD 8.
5905
5906     The algorithm for replace is quite complicated:
5907
5908       1. for each disk to be replaced:
5909
5910         1. create new LVs on the target node with unique names
5911         1. detach old LVs from the drbd device
5912         1. rename old LVs to name_replaced.<time_t>
5913         1. rename new LVs to old LVs
5914         1. attach the new LVs (with the old names now) to the drbd device
5915
5916       1. wait for sync across all devices
5917
5918       1. for each modified disk:
5919
5920         1. remove old LVs (which have the name name_replaces.<time_t>)
5921
5922     Failures are not very well handled.
5923
5924     """
5925     steps_total = 6
5926
5927     # Step: check device activation
5928     self.lu.LogStep(1, steps_total, "Check device existence")
5929     self._CheckDisksExistence([self.other_node, self.target_node])
5930     self._CheckVolumeGroup([self.target_node, self.other_node])
5931
5932     # Step: check other node consistency
5933     self.lu.LogStep(2, steps_total, "Check peer consistency")
5934     self._CheckDisksConsistency(self.other_node,
5935                                 self.other_node == self.instance.primary_node,
5936                                 False)
5937
5938     # Step: create new storage
5939     self.lu.LogStep(3, steps_total, "Allocate new storage")
5940     iv_names = self._CreateNewStorage(self.target_node)
5941
5942     # Step: for each lv, detach+rename*2+attach
5943     self.lu.LogStep(4, steps_total, "Changing drbd configuration")
5944     for dev, old_lvs, new_lvs in iv_names.itervalues():
5945       self.lu.LogInfo("Detaching %s drbd from local storage" % dev.iv_name)
5946
5947       result = self.rpc.call_blockdev_removechildren(self.target_node, dev, old_lvs)
5948       result.Raise("Can't detach drbd from local storage on node"
5949                    " %s for device %s" % (self.target_node, dev.iv_name))
5950       #dev.children = []
5951       #cfg.Update(instance)
5952
5953       # ok, we created the new LVs, so now we know we have the needed
5954       # storage; as such, we proceed on the target node to rename
5955       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5956       # using the assumption that logical_id == physical_id (which in
5957       # turn is the unique_id on that node)
5958
5959       # FIXME(iustin): use a better name for the replaced LVs
5960       temp_suffix = int(time.time())
5961       ren_fn = lambda d, suff: (d.physical_id[0],
5962                                 d.physical_id[1] + "_replaced-%s" % suff)
5963
5964       # Build the rename list based on what LVs exist on the node
5965       rename_old_to_new = []
5966       for to_ren in old_lvs:
5967         result = self.rpc.call_blockdev_find(self.target_node, to_ren)
5968         if not result.fail_msg and result.payload:
5969           # device exists
5970           rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
5971
5972       self.lu.LogInfo("Renaming the old LVs on the target node")
5973       result = self.rpc.call_blockdev_rename(self.target_node, rename_old_to_new)
5974       result.Raise("Can't rename old LVs on node %s" % self.target_node)
5975
5976       # Now we rename the new LVs to the old LVs
5977       self.lu.LogInfo("Renaming the new LVs on the target node")
5978       rename_new_to_old = [(new, old.physical_id)
5979                            for old, new in zip(old_lvs, new_lvs)]
5980       result = self.rpc.call_blockdev_rename(self.target_node, rename_new_to_old)
5981       result.Raise("Can't rename new LVs on node %s" % self.target_node)
5982
5983       for old, new in zip(old_lvs, new_lvs):
5984         new.logical_id = old.logical_id
5985         self.cfg.SetDiskID(new, self.target_node)
5986
5987       for disk in old_lvs:
5988         disk.logical_id = ren_fn(disk, temp_suffix)
5989         self.cfg.SetDiskID(disk, self.target_node)
5990
5991       # Now that the new lvs have the old name, we can add them to the device
5992       self.lu.LogInfo("Adding new mirror component on %s" % self.target_node)
5993       result = self.rpc.call_blockdev_addchildren(self.target_node, dev, new_lvs)
5994       msg = result.fail_msg
5995       if msg:
5996         for new_lv in new_lvs:
5997           msg2 = self.rpc.call_blockdev_remove(self.target_node, new_lv).fail_msg
5998           if msg2:
5999             self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
6000                                hint=("cleanup manually the unused logical"
6001                                      "volumes"))
6002         raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
6003
6004       dev.children = new_lvs
6005
6006       self.cfg.Update(self.instance)
6007
6008     # Wait for sync
6009     # This can fail as the old devices are degraded and _WaitForSync
6010     # does a combined result over all disks, so we don't check its return value
6011     self.lu.LogStep(5, steps_total, "Sync devices")
6012     _WaitForSync(self.lu, self.instance, unlock=True)
6013
6014     # Check all devices manually
6015     self._CheckDevices(self.instance.primary_node, iv_names)
6016
6017     # Step: remove old storage
6018     self.lu.LogStep(6, steps_total, "Removing old storage")
6019     self._RemoveOldStorage(self.target_node, iv_names)
6020
6021   def _ExecDrbd8Secondary(self):
6022     """Replace the secondary node for DRBD 8.
6023
6024     The algorithm for replace is quite complicated:
6025       - for all disks of the instance:
6026         - create new LVs on the new node with same names
6027         - shutdown the drbd device on the old secondary
6028         - disconnect the drbd network on the primary
6029         - create the drbd device on the new secondary
6030         - network attach the drbd on the primary, using an artifice:
6031           the drbd code for Attach() will connect to the network if it
6032           finds a device which is connected to the good local disks but
6033           not network enabled
6034       - wait for sync across all devices
6035       - remove all disks from the old secondary
6036
6037     Failures are not very well handled.
6038
6039     """
6040     steps_total = 6
6041
6042     # Step: check device activation
6043     self.lu.LogStep(1, steps_total, "Check device existence")
6044     self._CheckDisksExistence([self.instance.primary_node])
6045     self._CheckVolumeGroup([self.instance.primary_node])
6046
6047     # Step: check other node consistency
6048     self.lu.LogStep(2, steps_total, "Check peer consistency")
6049     self._CheckDisksConsistency(self.instance.primary_node, True, True)
6050
6051     # Step: create new storage
6052     self.lu.LogStep(3, steps_total, "Allocate new storage")
6053     for idx, dev in enumerate(self.instance.disks):
6054       self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
6055                       (self.new_node, idx))
6056       # we pass force_create=True to force LVM creation
6057       for new_lv in dev.children:
6058         _CreateBlockDev(self.lu, self.new_node, self.instance, new_lv, True,
6059                         _GetInstanceInfoText(self.instance), False)
6060
6061     # Step 4: dbrd minors and drbd setups changes
6062     # after this, we must manually remove the drbd minors on both the
6063     # error and the success paths
6064     self.lu.LogStep(4, steps_total, "Changing drbd configuration")
6065     minors = self.cfg.AllocateDRBDMinor([self.new_node for dev in self.instance.disks],
6066                                         self.instance.name)
6067     logging.debug("Allocated minors %r" % (minors,))
6068
6069     iv_names = {}
6070     for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
6071       self.lu.LogInfo("activating a new drbd on %s for disk/%d" % (self.new_node, idx))
6072       # create new devices on new_node; note that we create two IDs:
6073       # one without port, so the drbd will be activated without
6074       # networking information on the new node at this stage, and one
6075       # with network, for the latter activation in step 4
6076       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
6077       if self.instance.primary_node == o_node1:
6078         p_minor = o_minor1
6079       else:
6080         p_minor = o_minor2
6081
6082       new_alone_id = (self.instance.primary_node, self.new_node, None, p_minor, new_minor, o_secret)
6083       new_net_id = (self.instance.primary_node, self.new_node, o_port, p_minor, new_minor, o_secret)
6084
6085       iv_names[idx] = (dev, dev.children, new_net_id)
6086       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
6087                     new_net_id)
6088       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
6089                               logical_id=new_alone_id,
6090                               children=dev.children,
6091                               size=dev.size)
6092       try:
6093         _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd,
6094                               _GetInstanceInfoText(self.instance), False)
6095       except errors.GenericError:
6096         self.cfg.ReleaseDRBDMinors(self.instance.name)
6097         raise
6098
6099     # We have new devices, shutdown the drbd on the old secondary
6100     for idx, dev in enumerate(self.instance.disks):
6101       self.lu.LogInfo("Shutting down drbd for disk/%d on old node" % idx)
6102       self.cfg.SetDiskID(dev, self.target_node)
6103       msg = self.rpc.call_blockdev_shutdown(self.target_node, dev).fail_msg
6104       if msg:
6105         self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
6106                            "node: %s" % (idx, msg),
6107                            hint=("Please cleanup this device manually as"
6108                                  " soon as possible"))
6109
6110     self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
6111     result = self.rpc.call_drbd_disconnect_net([self.instance.primary_node], self.node_secondary_ip,
6112                                                self.instance.disks)[self.instance.primary_node]
6113
6114     msg = result.fail_msg
6115     if msg:
6116       # detaches didn't succeed (unlikely)
6117       self.cfg.ReleaseDRBDMinors(self.instance.name)
6118       raise errors.OpExecError("Can't detach the disks from the network on"
6119                                " old node: %s" % (msg,))
6120
6121     # if we managed to detach at least one, we update all the disks of
6122     # the instance to point to the new secondary
6123     self.lu.LogInfo("Updating instance configuration")
6124     for dev, _, new_logical_id in iv_names.itervalues():
6125       dev.logical_id = new_logical_id
6126       self.cfg.SetDiskID(dev, self.instance.primary_node)
6127
6128     self.cfg.Update(self.instance)
6129
6130     # and now perform the drbd attach
6131     self.lu.LogInfo("Attaching primary drbds to new secondary"
6132                     " (standalone => connected)")
6133     result = self.rpc.call_drbd_attach_net([self.instance.primary_node, self.new_node], self.node_secondary_ip,
6134                                            self.instance.disks, self.instance.name,
6135                                            False)
6136     for to_node, to_result in result.items():
6137       msg = to_result.fail_msg
6138       if msg:
6139         self.lu.LogWarning("Can't attach drbd disks on node %s: %s", to_node, msg,
6140                            hint=("please do a gnt-instance info to see the"
6141                                  " status of disks"))
6142
6143     # Wait for sync
6144     # This can fail as the old devices are degraded and _WaitForSync
6145     # does a combined result over all disks, so we don't check its return value
6146     self.lu.LogStep(5, steps_total, "Sync devices")
6147     _WaitForSync(self.lu, self.instance, unlock=True)
6148
6149     # Check all devices manually
6150     self._CheckDevices(self.instance.primary_node, iv_names)
6151
6152     # Step: remove old storage
6153     self.lu.LogStep(6, steps_total, "Removing old storage")
6154     self._RemoveOldStorage(self.target_node, iv_names)
6155
6156
6157 class LUGrowDisk(LogicalUnit):
6158   """Grow a disk of an instance.
6159
6160   """
6161   HPATH = "disk-grow"
6162   HTYPE = constants.HTYPE_INSTANCE
6163   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
6164   REQ_BGL = False
6165
6166   def ExpandNames(self):
6167     self._ExpandAndLockInstance()
6168     self.needed_locks[locking.LEVEL_NODE] = []
6169     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6170
6171   def DeclareLocks(self, level):
6172     if level == locking.LEVEL_NODE:
6173       self._LockInstancesNodes()
6174
6175   def BuildHooksEnv(self):
6176     """Build hooks env.
6177
6178     This runs on the master, the primary and all the secondaries.
6179
6180     """
6181     env = {
6182       "DISK": self.op.disk,
6183       "AMOUNT": self.op.amount,
6184       }
6185     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6186     nl = [
6187       self.cfg.GetMasterNode(),
6188       self.instance.primary_node,
6189       ]
6190     return env, nl, nl
6191
6192   def CheckPrereq(self):
6193     """Check prerequisites.
6194
6195     This checks that the instance is in the cluster.
6196
6197     """
6198     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6199     assert instance is not None, \
6200       "Cannot retrieve locked instance %s" % self.op.instance_name
6201     nodenames = list(instance.all_nodes)
6202     for node in nodenames:
6203       _CheckNodeOnline(self, node)
6204
6205
6206     self.instance = instance
6207
6208     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
6209       raise errors.OpPrereqError("Instance's disk layout does not support"
6210                                  " growing.")
6211
6212     self.disk = instance.FindDisk(self.op.disk)
6213
6214     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
6215                                        instance.hypervisor)
6216     for node in nodenames:
6217       info = nodeinfo[node]
6218       info.Raise("Cannot get current information from node %s" % node)
6219       vg_free = info.payload.get('vg_free', None)
6220       if not isinstance(vg_free, int):
6221         raise errors.OpPrereqError("Can't compute free disk space on"
6222                                    " node %s" % node)
6223       if self.op.amount > vg_free:
6224         raise errors.OpPrereqError("Not enough disk space on target node %s:"
6225                                    " %d MiB available, %d MiB required" %
6226                                    (node, vg_free, self.op.amount))
6227
6228   def Exec(self, feedback_fn):
6229     """Execute disk grow.
6230
6231     """
6232     instance = self.instance
6233     disk = self.disk
6234     for node in instance.all_nodes:
6235       self.cfg.SetDiskID(disk, node)
6236       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
6237       result.Raise("Grow request failed to node %s" % node)
6238     disk.RecordGrow(self.op.amount)
6239     self.cfg.Update(instance)
6240     if self.op.wait_for_sync:
6241       disk_abort = not _WaitForSync(self, instance)
6242       if disk_abort:
6243         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
6244                              " status.\nPlease check the instance.")
6245
6246
6247 class LUQueryInstanceData(NoHooksLU):
6248   """Query runtime instance data.
6249
6250   """
6251   _OP_REQP = ["instances", "static"]
6252   REQ_BGL = False
6253
6254   def ExpandNames(self):
6255     self.needed_locks = {}
6256     self.share_locks = dict.fromkeys(locking.LEVELS, 1)
6257
6258     if not isinstance(self.op.instances, list):
6259       raise errors.OpPrereqError("Invalid argument type 'instances'")
6260
6261     if self.op.instances:
6262       self.wanted_names = []
6263       for name in self.op.instances:
6264         full_name = self.cfg.ExpandInstanceName(name)
6265         if full_name is None:
6266           raise errors.OpPrereqError("Instance '%s' not known" % name)
6267         self.wanted_names.append(full_name)
6268       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
6269     else:
6270       self.wanted_names = None
6271       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
6272
6273     self.needed_locks[locking.LEVEL_NODE] = []
6274     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6275
6276   def DeclareLocks(self, level):
6277     if level == locking.LEVEL_NODE:
6278       self._LockInstancesNodes()
6279
6280   def CheckPrereq(self):
6281     """Check prerequisites.
6282
6283     This only checks the optional instance list against the existing names.
6284
6285     """
6286     if self.wanted_names is None:
6287       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
6288
6289     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
6290                              in self.wanted_names]
6291     return
6292
6293   def _ComputeDiskStatus(self, instance, snode, dev):
6294     """Compute block device status.
6295
6296     """
6297     static = self.op.static
6298     if not static:
6299       self.cfg.SetDiskID(dev, instance.primary_node)
6300       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
6301       if dev_pstatus.offline:
6302         dev_pstatus = None
6303       else:
6304         dev_pstatus.Raise("Can't compute disk status for %s" % instance.name)
6305         dev_pstatus = dev_pstatus.payload
6306     else:
6307       dev_pstatus = None
6308
6309     if dev.dev_type in constants.LDS_DRBD:
6310       # we change the snode then (otherwise we use the one passed in)
6311       if dev.logical_id[0] == instance.primary_node:
6312         snode = dev.logical_id[1]
6313       else:
6314         snode = dev.logical_id[0]
6315
6316     if snode and not static:
6317       self.cfg.SetDiskID(dev, snode)
6318       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
6319       if dev_sstatus.offline:
6320         dev_sstatus = None
6321       else:
6322         dev_sstatus.Raise("Can't compute disk status for %s" % instance.name)
6323         dev_sstatus = dev_sstatus.payload
6324     else:
6325       dev_sstatus = None
6326
6327     if dev.children:
6328       dev_children = [self._ComputeDiskStatus(instance, snode, child)
6329                       for child in dev.children]
6330     else:
6331       dev_children = []
6332
6333     data = {
6334       "iv_name": dev.iv_name,
6335       "dev_type": dev.dev_type,
6336       "logical_id": dev.logical_id,
6337       "physical_id": dev.physical_id,
6338       "pstatus": dev_pstatus,
6339       "sstatus": dev_sstatus,
6340       "children": dev_children,
6341       "mode": dev.mode,
6342       "size": dev.size,
6343       }
6344
6345     return data
6346
6347   def Exec(self, feedback_fn):
6348     """Gather and return data"""
6349     result = {}
6350
6351     cluster = self.cfg.GetClusterInfo()
6352
6353     for instance in self.wanted_instances:
6354       if not self.op.static:
6355         remote_info = self.rpc.call_instance_info(instance.primary_node,
6356                                                   instance.name,
6357                                                   instance.hypervisor)
6358         remote_info.Raise("Error checking node %s" % instance.primary_node)
6359         remote_info = remote_info.payload
6360         if remote_info and "state" in remote_info:
6361           remote_state = "up"
6362         else:
6363           remote_state = "down"
6364       else:
6365         remote_state = None
6366       if instance.admin_up:
6367         config_state = "up"
6368       else:
6369         config_state = "down"
6370
6371       disks = [self._ComputeDiskStatus(instance, None, device)
6372                for device in instance.disks]
6373
6374       idict = {
6375         "name": instance.name,
6376         "config_state": config_state,
6377         "run_state": remote_state,
6378         "pnode": instance.primary_node,
6379         "snodes": instance.secondary_nodes,
6380         "os": instance.os,
6381         # this happens to be the same format used for hooks
6382         "nics": _NICListToTuple(self, instance.nics),
6383         "disks": disks,
6384         "hypervisor": instance.hypervisor,
6385         "network_port": instance.network_port,
6386         "hv_instance": instance.hvparams,
6387         "hv_actual": cluster.FillHV(instance),
6388         "be_instance": instance.beparams,
6389         "be_actual": cluster.FillBE(instance),
6390         }
6391
6392       result[instance.name] = idict
6393
6394     return result
6395
6396
6397 class LUSetInstanceParams(LogicalUnit):
6398   """Modifies an instances's parameters.
6399
6400   """
6401   HPATH = "instance-modify"
6402   HTYPE = constants.HTYPE_INSTANCE
6403   _OP_REQP = ["instance_name"]
6404   REQ_BGL = False
6405
6406   def CheckArguments(self):
6407     if not hasattr(self.op, 'nics'):
6408       self.op.nics = []
6409     if not hasattr(self.op, 'disks'):
6410       self.op.disks = []
6411     if not hasattr(self.op, 'beparams'):
6412       self.op.beparams = {}
6413     if not hasattr(self.op, 'hvparams'):
6414       self.op.hvparams = {}
6415     self.op.force = getattr(self.op, "force", False)
6416     if not (self.op.nics or self.op.disks or
6417             self.op.hvparams or self.op.beparams):
6418       raise errors.OpPrereqError("No changes submitted")
6419
6420     # Disk validation
6421     disk_addremove = 0
6422     for disk_op, disk_dict in self.op.disks:
6423       if disk_op == constants.DDM_REMOVE:
6424         disk_addremove += 1
6425         continue
6426       elif disk_op == constants.DDM_ADD:
6427         disk_addremove += 1
6428       else:
6429         if not isinstance(disk_op, int):
6430           raise errors.OpPrereqError("Invalid disk index")
6431         if not isinstance(disk_dict, dict):
6432           msg = "Invalid disk value: expected dict, got '%s'" % disk_dict
6433           raise errors.OpPrereqError(msg)
6434
6435       if disk_op == constants.DDM_ADD:
6436         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
6437         if mode not in constants.DISK_ACCESS_SET:
6438           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
6439         size = disk_dict.get('size', None)
6440         if size is None:
6441           raise errors.OpPrereqError("Required disk parameter size missing")
6442         try:
6443           size = int(size)
6444         except ValueError, err:
6445           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
6446                                      str(err))
6447         disk_dict['size'] = size
6448       else:
6449         # modification of disk
6450         if 'size' in disk_dict:
6451           raise errors.OpPrereqError("Disk size change not possible, use"
6452                                      " grow-disk")
6453
6454     if disk_addremove > 1:
6455       raise errors.OpPrereqError("Only one disk add or remove operation"
6456                                  " supported at a time")
6457
6458     # NIC validation
6459     nic_addremove = 0
6460     for nic_op, nic_dict in self.op.nics:
6461       if nic_op == constants.DDM_REMOVE:
6462         nic_addremove += 1
6463         continue
6464       elif nic_op == constants.DDM_ADD:
6465         nic_addremove += 1
6466       else:
6467         if not isinstance(nic_op, int):
6468           raise errors.OpPrereqError("Invalid nic index")
6469         if not isinstance(nic_dict, dict):
6470           msg = "Invalid nic value: expected dict, got '%s'" % nic_dict
6471           raise errors.OpPrereqError(msg)
6472
6473       # nic_dict should be a dict
6474       nic_ip = nic_dict.get('ip', None)
6475       if nic_ip is not None:
6476         if nic_ip.lower() == constants.VALUE_NONE:
6477           nic_dict['ip'] = None
6478         else:
6479           if not utils.IsValidIP(nic_ip):
6480             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
6481
6482       nic_bridge = nic_dict.get('bridge', None)
6483       nic_link = nic_dict.get('link', None)
6484       if nic_bridge and nic_link:
6485         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
6486                                    " at the same time")
6487       elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
6488         nic_dict['bridge'] = None
6489       elif nic_link and nic_link.lower() == constants.VALUE_NONE:
6490         nic_dict['link'] = None
6491
6492       if nic_op == constants.DDM_ADD:
6493         nic_mac = nic_dict.get('mac', None)
6494         if nic_mac is None:
6495           nic_dict['mac'] = constants.VALUE_AUTO
6496
6497       if 'mac' in nic_dict:
6498         nic_mac = nic_dict['mac']
6499         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6500           if not utils.IsValidMac(nic_mac):
6501             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
6502         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
6503           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
6504                                      " modifying an existing nic")
6505
6506     if nic_addremove > 1:
6507       raise errors.OpPrereqError("Only one NIC add or remove operation"
6508                                  " supported at a time")
6509
6510   def ExpandNames(self):
6511     self._ExpandAndLockInstance()
6512     self.needed_locks[locking.LEVEL_NODE] = []
6513     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6514
6515   def DeclareLocks(self, level):
6516     if level == locking.LEVEL_NODE:
6517       self._LockInstancesNodes()
6518
6519   def BuildHooksEnv(self):
6520     """Build hooks env.
6521
6522     This runs on the master, primary and secondaries.
6523
6524     """
6525     args = dict()
6526     if constants.BE_MEMORY in self.be_new:
6527       args['memory'] = self.be_new[constants.BE_MEMORY]
6528     if constants.BE_VCPUS in self.be_new:
6529       args['vcpus'] = self.be_new[constants.BE_VCPUS]
6530     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
6531     # information at all.
6532     if self.op.nics:
6533       args['nics'] = []
6534       nic_override = dict(self.op.nics)
6535       c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
6536       for idx, nic in enumerate(self.instance.nics):
6537         if idx in nic_override:
6538           this_nic_override = nic_override[idx]
6539         else:
6540           this_nic_override = {}
6541         if 'ip' in this_nic_override:
6542           ip = this_nic_override['ip']
6543         else:
6544           ip = nic.ip
6545         if 'mac' in this_nic_override:
6546           mac = this_nic_override['mac']
6547         else:
6548           mac = nic.mac
6549         if idx in self.nic_pnew:
6550           nicparams = self.nic_pnew[idx]
6551         else:
6552           nicparams = objects.FillDict(c_nicparams, nic.nicparams)
6553         mode = nicparams[constants.NIC_MODE]
6554         link = nicparams[constants.NIC_LINK]
6555         args['nics'].append((ip, mac, mode, link))
6556       if constants.DDM_ADD in nic_override:
6557         ip = nic_override[constants.DDM_ADD].get('ip', None)
6558         mac = nic_override[constants.DDM_ADD]['mac']
6559         nicparams = self.nic_pnew[constants.DDM_ADD]
6560         mode = nicparams[constants.NIC_MODE]
6561         link = nicparams[constants.NIC_LINK]
6562         args['nics'].append((ip, mac, mode, link))
6563       elif constants.DDM_REMOVE in nic_override:
6564         del args['nics'][-1]
6565
6566     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
6567     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
6568     return env, nl, nl
6569
6570   def _GetUpdatedParams(self, old_params, update_dict,
6571                         default_values, parameter_types):
6572     """Return the new params dict for the given params.
6573
6574     @type old_params: dict
6575     @param old_params: old parameters
6576     @type update_dict: dict
6577     @param update_dict: dict containing new parameter values,
6578                         or constants.VALUE_DEFAULT to reset the
6579                         parameter to its default value
6580     @type default_values: dict
6581     @param default_values: default values for the filled parameters
6582     @type parameter_types: dict
6583     @param parameter_types: dict mapping target dict keys to types
6584                             in constants.ENFORCEABLE_TYPES
6585     @rtype: (dict, dict)
6586     @return: (new_parameters, filled_parameters)
6587
6588     """
6589     params_copy = copy.deepcopy(old_params)
6590     for key, val in update_dict.iteritems():
6591       if val == constants.VALUE_DEFAULT:
6592         try:
6593           del params_copy[key]
6594         except KeyError:
6595           pass
6596       else:
6597         params_copy[key] = val
6598     utils.ForceDictType(params_copy, parameter_types)
6599     params_filled = objects.FillDict(default_values, params_copy)
6600     return (params_copy, params_filled)
6601
6602   def CheckPrereq(self):
6603     """Check prerequisites.
6604
6605     This only checks the instance list against the existing names.
6606
6607     """
6608     self.force = self.op.force
6609
6610     # checking the new params on the primary/secondary nodes
6611
6612     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6613     cluster = self.cluster = self.cfg.GetClusterInfo()
6614     assert self.instance is not None, \
6615       "Cannot retrieve locked instance %s" % self.op.instance_name
6616     pnode = instance.primary_node
6617     nodelist = list(instance.all_nodes)
6618
6619     # hvparams processing
6620     if self.op.hvparams:
6621       i_hvdict, hv_new = self._GetUpdatedParams(
6622                              instance.hvparams, self.op.hvparams,
6623                              cluster.hvparams[instance.hypervisor],
6624                              constants.HVS_PARAMETER_TYPES)
6625       # local check
6626       hypervisor.GetHypervisor(
6627         instance.hypervisor).CheckParameterSyntax(hv_new)
6628       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6629       self.hv_new = hv_new # the new actual values
6630       self.hv_inst = i_hvdict # the new dict (without defaults)
6631     else:
6632       self.hv_new = self.hv_inst = {}
6633
6634     # beparams processing
6635     if self.op.beparams:
6636       i_bedict, be_new = self._GetUpdatedParams(
6637                              instance.beparams, self.op.beparams,
6638                              cluster.beparams[constants.PP_DEFAULT],
6639                              constants.BES_PARAMETER_TYPES)
6640       self.be_new = be_new # the new actual values
6641       self.be_inst = i_bedict # the new dict (without defaults)
6642     else:
6643       self.be_new = self.be_inst = {}
6644
6645     self.warn = []
6646
6647     if constants.BE_MEMORY in self.op.beparams and not self.force:
6648       mem_check_list = [pnode]
6649       if be_new[constants.BE_AUTO_BALANCE]:
6650         # either we changed auto_balance to yes or it was from before
6651         mem_check_list.extend(instance.secondary_nodes)
6652       instance_info = self.rpc.call_instance_info(pnode, instance.name,
6653                                                   instance.hypervisor)
6654       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6655                                          instance.hypervisor)
6656       pninfo = nodeinfo[pnode]
6657       msg = pninfo.fail_msg
6658       if msg:
6659         # Assume the primary node is unreachable and go ahead
6660         self.warn.append("Can't get info from primary node %s: %s" %
6661                          (pnode,  msg))
6662       elif not isinstance(pninfo.payload.get('memory_free', None), int):
6663         self.warn.append("Node data from primary node %s doesn't contain"
6664                          " free memory information" % pnode)
6665       elif instance_info.fail_msg:
6666         self.warn.append("Can't get instance runtime information: %s" %
6667                         instance_info.fail_msg)
6668       else:
6669         if instance_info.payload:
6670           current_mem = int(instance_info.payload['memory'])
6671         else:
6672           # Assume instance not running
6673           # (there is a slight race condition here, but it's not very probable,
6674           # and we have no other way to check)
6675           current_mem = 0
6676         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6677                     pninfo.payload['memory_free'])
6678         if miss_mem > 0:
6679           raise errors.OpPrereqError("This change will prevent the instance"
6680                                      " from starting, due to %d MB of memory"
6681                                      " missing on its primary node" % miss_mem)
6682
6683       if be_new[constants.BE_AUTO_BALANCE]:
6684         for node, nres in nodeinfo.items():
6685           if node not in instance.secondary_nodes:
6686             continue
6687           msg = nres.fail_msg
6688           if msg:
6689             self.warn.append("Can't get info from secondary node %s: %s" %
6690                              (node, msg))
6691           elif not isinstance(nres.payload.get('memory_free', None), int):
6692             self.warn.append("Secondary node %s didn't return free"
6693                              " memory information" % node)
6694           elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
6695             self.warn.append("Not enough memory to failover instance to"
6696                              " secondary node %s" % node)
6697
6698     # NIC processing
6699     self.nic_pnew = {}
6700     self.nic_pinst = {}
6701     for nic_op, nic_dict in self.op.nics:
6702       if nic_op == constants.DDM_REMOVE:
6703         if not instance.nics:
6704           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6705         continue
6706       if nic_op != constants.DDM_ADD:
6707         # an existing nic
6708         if nic_op < 0 or nic_op >= len(instance.nics):
6709           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6710                                      " are 0 to %d" %
6711                                      (nic_op, len(instance.nics)))
6712         old_nic_params = instance.nics[nic_op].nicparams
6713         old_nic_ip = instance.nics[nic_op].ip
6714       else:
6715         old_nic_params = {}
6716         old_nic_ip = None
6717
6718       update_params_dict = dict([(key, nic_dict[key])
6719                                  for key in constants.NICS_PARAMETERS
6720                                  if key in nic_dict])
6721
6722       if 'bridge' in nic_dict:
6723         update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
6724
6725       new_nic_params, new_filled_nic_params = \
6726           self._GetUpdatedParams(old_nic_params, update_params_dict,
6727                                  cluster.nicparams[constants.PP_DEFAULT],
6728                                  constants.NICS_PARAMETER_TYPES)
6729       objects.NIC.CheckParameterSyntax(new_filled_nic_params)
6730       self.nic_pinst[nic_op] = new_nic_params
6731       self.nic_pnew[nic_op] = new_filled_nic_params
6732       new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
6733
6734       if new_nic_mode == constants.NIC_MODE_BRIDGED:
6735         nic_bridge = new_filled_nic_params[constants.NIC_LINK]
6736         msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
6737         if msg:
6738           msg = "Error checking bridges on node %s: %s" % (pnode, msg)
6739           if self.force:
6740             self.warn.append(msg)
6741           else:
6742             raise errors.OpPrereqError(msg)
6743       if new_nic_mode == constants.NIC_MODE_ROUTED:
6744         if 'ip' in nic_dict:
6745           nic_ip = nic_dict['ip']
6746         else:
6747           nic_ip = old_nic_ip
6748         if nic_ip is None:
6749           raise errors.OpPrereqError('Cannot set the nic ip to None'
6750                                      ' on a routed nic')
6751       if 'mac' in nic_dict:
6752         nic_mac = nic_dict['mac']
6753         if nic_mac is None:
6754           raise errors.OpPrereqError('Cannot set the nic mac to None')
6755         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6756           # otherwise generate the mac
6757           nic_dict['mac'] = self.cfg.GenerateMAC()
6758         else:
6759           # or validate/reserve the current one
6760           if self.cfg.IsMacInUse(nic_mac):
6761             raise errors.OpPrereqError("MAC address %s already in use"
6762                                        " in cluster" % nic_mac)
6763
6764     # DISK processing
6765     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6766       raise errors.OpPrereqError("Disk operations not supported for"
6767                                  " diskless instances")
6768     for disk_op, disk_dict in self.op.disks:
6769       if disk_op == constants.DDM_REMOVE:
6770         if len(instance.disks) == 1:
6771           raise errors.OpPrereqError("Cannot remove the last disk of"
6772                                      " an instance")
6773         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6774         ins_l = ins_l[pnode]
6775         msg = ins_l.fail_msg
6776         if msg:
6777           raise errors.OpPrereqError("Can't contact node %s: %s" %
6778                                      (pnode, msg))
6779         if instance.name in ins_l.payload:
6780           raise errors.OpPrereqError("Instance is running, can't remove"
6781                                      " disks.")
6782
6783       if (disk_op == constants.DDM_ADD and
6784           len(instance.nics) >= constants.MAX_DISKS):
6785         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6786                                    " add more" % constants.MAX_DISKS)
6787       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6788         # an existing disk
6789         if disk_op < 0 or disk_op >= len(instance.disks):
6790           raise errors.OpPrereqError("Invalid disk index %s, valid values"
6791                                      " are 0 to %d" %
6792                                      (disk_op, len(instance.disks)))
6793
6794     return
6795
6796   def Exec(self, feedback_fn):
6797     """Modifies an instance.
6798
6799     All parameters take effect only at the next restart of the instance.
6800
6801     """
6802     # Process here the warnings from CheckPrereq, as we don't have a
6803     # feedback_fn there.
6804     for warn in self.warn:
6805       feedback_fn("WARNING: %s" % warn)
6806
6807     result = []
6808     instance = self.instance
6809     cluster = self.cluster
6810     # disk changes
6811     for disk_op, disk_dict in self.op.disks:
6812       if disk_op == constants.DDM_REMOVE:
6813         # remove the last disk
6814         device = instance.disks.pop()
6815         device_idx = len(instance.disks)
6816         for node, disk in device.ComputeNodeTree(instance.primary_node):
6817           self.cfg.SetDiskID(disk, node)
6818           msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
6819           if msg:
6820             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6821                             " continuing anyway", device_idx, node, msg)
6822         result.append(("disk/%d" % device_idx, "remove"))
6823       elif disk_op == constants.DDM_ADD:
6824         # add a new disk
6825         if instance.disk_template == constants.DT_FILE:
6826           file_driver, file_path = instance.disks[0].logical_id
6827           file_path = os.path.dirname(file_path)
6828         else:
6829           file_driver = file_path = None
6830         disk_idx_base = len(instance.disks)
6831         new_disk = _GenerateDiskTemplate(self,
6832                                          instance.disk_template,
6833                                          instance.name, instance.primary_node,
6834                                          instance.secondary_nodes,
6835                                          [disk_dict],
6836                                          file_path,
6837                                          file_driver,
6838                                          disk_idx_base)[0]
6839         instance.disks.append(new_disk)
6840         info = _GetInstanceInfoText(instance)
6841
6842         logging.info("Creating volume %s for instance %s",
6843                      new_disk.iv_name, instance.name)
6844         # Note: this needs to be kept in sync with _CreateDisks
6845         #HARDCODE
6846         for node in instance.all_nodes:
6847           f_create = node == instance.primary_node
6848           try:
6849             _CreateBlockDev(self, node, instance, new_disk,
6850                             f_create, info, f_create)
6851           except errors.OpExecError, err:
6852             self.LogWarning("Failed to create volume %s (%s) on"
6853                             " node %s: %s",
6854                             new_disk.iv_name, new_disk, node, err)
6855         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6856                        (new_disk.size, new_disk.mode)))
6857       else:
6858         # change a given disk
6859         instance.disks[disk_op].mode = disk_dict['mode']
6860         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6861     # NIC changes
6862     for nic_op, nic_dict in self.op.nics:
6863       if nic_op == constants.DDM_REMOVE:
6864         # remove the last nic
6865         del instance.nics[-1]
6866         result.append(("nic.%d" % len(instance.nics), "remove"))
6867       elif nic_op == constants.DDM_ADD:
6868         # mac and bridge should be set, by now
6869         mac = nic_dict['mac']
6870         ip = nic_dict.get('ip', None)
6871         nicparams = self.nic_pinst[constants.DDM_ADD]
6872         new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
6873         instance.nics.append(new_nic)
6874         result.append(("nic.%d" % (len(instance.nics) - 1),
6875                        "add:mac=%s,ip=%s,mode=%s,link=%s" %
6876                        (new_nic.mac, new_nic.ip,
6877                         self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
6878                         self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
6879                        )))
6880       else:
6881         for key in 'mac', 'ip':
6882           if key in nic_dict:
6883             setattr(instance.nics[nic_op], key, nic_dict[key])
6884         if nic_op in self.nic_pnew:
6885           instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
6886         for key, val in nic_dict.iteritems():
6887           result.append(("nic.%s/%d" % (key, nic_op), val))
6888
6889     # hvparams changes
6890     if self.op.hvparams:
6891       instance.hvparams = self.hv_inst
6892       for key, val in self.op.hvparams.iteritems():
6893         result.append(("hv/%s" % key, val))
6894
6895     # beparams changes
6896     if self.op.beparams:
6897       instance.beparams = self.be_inst
6898       for key, val in self.op.beparams.iteritems():
6899         result.append(("be/%s" % key, val))
6900
6901     self.cfg.Update(instance)
6902
6903     return result
6904
6905
6906 class LUQueryExports(NoHooksLU):
6907   """Query the exports list
6908
6909   """
6910   _OP_REQP = ['nodes']
6911   REQ_BGL = False
6912
6913   def ExpandNames(self):
6914     self.needed_locks = {}
6915     self.share_locks[locking.LEVEL_NODE] = 1
6916     if not self.op.nodes:
6917       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6918     else:
6919       self.needed_locks[locking.LEVEL_NODE] = \
6920         _GetWantedNodes(self, self.op.nodes)
6921
6922   def CheckPrereq(self):
6923     """Check prerequisites.
6924
6925     """
6926     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6927
6928   def Exec(self, feedback_fn):
6929     """Compute the list of all the exported system images.
6930
6931     @rtype: dict
6932     @return: a dictionary with the structure node->(export-list)
6933         where export-list is a list of the instances exported on
6934         that node.
6935
6936     """
6937     rpcresult = self.rpc.call_export_list(self.nodes)
6938     result = {}
6939     for node in rpcresult:
6940       if rpcresult[node].fail_msg:
6941         result[node] = False
6942       else:
6943         result[node] = rpcresult[node].payload
6944
6945     return result
6946
6947
6948 class LUExportInstance(LogicalUnit):
6949   """Export an instance to an image in the cluster.
6950
6951   """
6952   HPATH = "instance-export"
6953   HTYPE = constants.HTYPE_INSTANCE
6954   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6955   REQ_BGL = False
6956
6957   def ExpandNames(self):
6958     self._ExpandAndLockInstance()
6959     # FIXME: lock only instance primary and destination node
6960     #
6961     # Sad but true, for now we have do lock all nodes, as we don't know where
6962     # the previous export might be, and and in this LU we search for it and
6963     # remove it from its current node. In the future we could fix this by:
6964     #  - making a tasklet to search (share-lock all), then create the new one,
6965     #    then one to remove, after
6966     #  - removing the removal operation altogether
6967     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6968
6969   def DeclareLocks(self, level):
6970     """Last minute lock declaration."""
6971     # All nodes are locked anyway, so nothing to do here.
6972
6973   def BuildHooksEnv(self):
6974     """Build hooks env.
6975
6976     This will run on the master, primary node and target node.
6977
6978     """
6979     env = {
6980       "EXPORT_NODE": self.op.target_node,
6981       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6982       }
6983     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6984     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6985           self.op.target_node]
6986     return env, nl, nl
6987
6988   def CheckPrereq(self):
6989     """Check prerequisites.
6990
6991     This checks that the instance and node names are valid.
6992
6993     """
6994     instance_name = self.op.instance_name
6995     self.instance = self.cfg.GetInstanceInfo(instance_name)
6996     assert self.instance is not None, \
6997           "Cannot retrieve locked instance %s" % self.op.instance_name
6998     _CheckNodeOnline(self, self.instance.primary_node)
6999
7000     self.dst_node = self.cfg.GetNodeInfo(
7001       self.cfg.ExpandNodeName(self.op.target_node))
7002
7003     if self.dst_node is None:
7004       # This is wrong node name, not a non-locked node
7005       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
7006     _CheckNodeOnline(self, self.dst_node.name)
7007     _CheckNodeNotDrained(self, self.dst_node.name)
7008
7009     # instance disk type verification
7010     for disk in self.instance.disks:
7011       if disk.dev_type == constants.LD_FILE:
7012         raise errors.OpPrereqError("Export not supported for instances with"
7013                                    " file-based disks")
7014
7015   def Exec(self, feedback_fn):
7016     """Export an instance to an image in the cluster.
7017
7018     """
7019     instance = self.instance
7020     dst_node = self.dst_node
7021     src_node = instance.primary_node
7022     if self.op.shutdown:
7023       # shutdown the instance, but not the disks
7024       result = self.rpc.call_instance_shutdown(src_node, instance)
7025       result.Raise("Could not shutdown instance %s on"
7026                    " node %s" % (instance.name, src_node))
7027
7028     vgname = self.cfg.GetVGName()
7029
7030     snap_disks = []
7031
7032     # set the disks ID correctly since call_instance_start needs the
7033     # correct drbd minor to create the symlinks
7034     for disk in instance.disks:
7035       self.cfg.SetDiskID(disk, src_node)
7036
7037     try:
7038       for idx, disk in enumerate(instance.disks):
7039         # result.payload will be a snapshot of an lvm leaf of the one we passed
7040         result = self.rpc.call_blockdev_snapshot(src_node, disk)
7041         msg = result.fail_msg
7042         if msg:
7043           self.LogWarning("Could not snapshot disk/%s on node %s: %s",
7044                           idx, src_node, msg)
7045           snap_disks.append(False)
7046         else:
7047           disk_id = (vgname, result.payload)
7048           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
7049                                  logical_id=disk_id, physical_id=disk_id,
7050                                  iv_name=disk.iv_name)
7051           snap_disks.append(new_dev)
7052
7053     finally:
7054       if self.op.shutdown and instance.admin_up:
7055         result = self.rpc.call_instance_start(src_node, instance, None, None)
7056         msg = result.fail_msg
7057         if msg:
7058           _ShutdownInstanceDisks(self, instance)
7059           raise errors.OpExecError("Could not start instance: %s" % msg)
7060
7061     # TODO: check for size
7062
7063     cluster_name = self.cfg.GetClusterName()
7064     for idx, dev in enumerate(snap_disks):
7065       if dev:
7066         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
7067                                                instance, cluster_name, idx)
7068         msg = result.fail_msg
7069         if msg:
7070           self.LogWarning("Could not export disk/%s from node %s to"
7071                           " node %s: %s", idx, src_node, dst_node.name, msg)
7072         msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
7073         if msg:
7074           self.LogWarning("Could not remove snapshot for disk/%d from node"
7075                           " %s: %s", idx, src_node, msg)
7076
7077     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
7078     msg = result.fail_msg
7079     if msg:
7080       self.LogWarning("Could not finalize export for instance %s"
7081                       " on node %s: %s", instance.name, dst_node.name, msg)
7082
7083     nodelist = self.cfg.GetNodeList()
7084     nodelist.remove(dst_node.name)
7085
7086     # on one-node clusters nodelist will be empty after the removal
7087     # if we proceed the backup would be removed because OpQueryExports
7088     # substitutes an empty list with the full cluster node list.
7089     iname = instance.name
7090     if nodelist:
7091       exportlist = self.rpc.call_export_list(nodelist)
7092       for node in exportlist:
7093         if exportlist[node].fail_msg:
7094           continue
7095         if iname in exportlist[node].payload:
7096           msg = self.rpc.call_export_remove(node, iname).fail_msg
7097           if msg:
7098             self.LogWarning("Could not remove older export for instance %s"
7099                             " on node %s: %s", iname, node, msg)
7100
7101
7102 class LURemoveExport(NoHooksLU):
7103   """Remove exports related to the named instance.
7104
7105   """
7106   _OP_REQP = ["instance_name"]
7107   REQ_BGL = False
7108
7109   def ExpandNames(self):
7110     self.needed_locks = {}
7111     # We need all nodes to be locked in order for RemoveExport to work, but we
7112     # don't need to lock the instance itself, as nothing will happen to it (and
7113     # we can remove exports also for a removed instance)
7114     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
7115
7116   def CheckPrereq(self):
7117     """Check prerequisites.
7118     """
7119     pass
7120
7121   def Exec(self, feedback_fn):
7122     """Remove any export.
7123
7124     """
7125     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
7126     # If the instance was not found we'll try with the name that was passed in.
7127     # This will only work if it was an FQDN, though.
7128     fqdn_warn = False
7129     if not instance_name:
7130       fqdn_warn = True
7131       instance_name = self.op.instance_name
7132
7133     locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
7134     exportlist = self.rpc.call_export_list(locked_nodes)
7135     found = False
7136     for node in exportlist:
7137       msg = exportlist[node].fail_msg
7138       if msg:
7139         self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
7140         continue
7141       if instance_name in exportlist[node].payload:
7142         found = True
7143         result = self.rpc.call_export_remove(node, instance_name)
7144         msg = result.fail_msg
7145         if msg:
7146           logging.error("Could not remove export for instance %s"
7147                         " on node %s: %s", instance_name, node, msg)
7148
7149     if fqdn_warn and not found:
7150       feedback_fn("Export not found. If trying to remove an export belonging"
7151                   " to a deleted instance please use its Fully Qualified"
7152                   " Domain Name.")
7153
7154
7155 class TagsLU(NoHooksLU):
7156   """Generic tags LU.
7157
7158   This is an abstract class which is the parent of all the other tags LUs.
7159
7160   """
7161
7162   def ExpandNames(self):
7163     self.needed_locks = {}
7164     if self.op.kind == constants.TAG_NODE:
7165       name = self.cfg.ExpandNodeName(self.op.name)
7166       if name is None:
7167         raise errors.OpPrereqError("Invalid node name (%s)" %
7168                                    (self.op.name,))
7169       self.op.name = name
7170       self.needed_locks[locking.LEVEL_NODE] = name
7171     elif self.op.kind == constants.TAG_INSTANCE:
7172       name = self.cfg.ExpandInstanceName(self.op.name)
7173       if name is None:
7174         raise errors.OpPrereqError("Invalid instance name (%s)" %
7175                                    (self.op.name,))
7176       self.op.name = name
7177       self.needed_locks[locking.LEVEL_INSTANCE] = name
7178
7179   def CheckPrereq(self):
7180     """Check prerequisites.
7181
7182     """
7183     if self.op.kind == constants.TAG_CLUSTER:
7184       self.target = self.cfg.GetClusterInfo()
7185     elif self.op.kind == constants.TAG_NODE:
7186       self.target = self.cfg.GetNodeInfo(self.op.name)
7187     elif self.op.kind == constants.TAG_INSTANCE:
7188       self.target = self.cfg.GetInstanceInfo(self.op.name)
7189     else:
7190       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
7191                                  str(self.op.kind))
7192
7193
7194 class LUGetTags(TagsLU):
7195   """Returns the tags of a given object.
7196
7197   """
7198   _OP_REQP = ["kind", "name"]
7199   REQ_BGL = False
7200
7201   def Exec(self, feedback_fn):
7202     """Returns the tag list.
7203
7204     """
7205     return list(self.target.GetTags())
7206
7207
7208 class LUSearchTags(NoHooksLU):
7209   """Searches the tags for a given pattern.
7210
7211   """
7212   _OP_REQP = ["pattern"]
7213   REQ_BGL = False
7214
7215   def ExpandNames(self):
7216     self.needed_locks = {}
7217
7218   def CheckPrereq(self):
7219     """Check prerequisites.
7220
7221     This checks the pattern passed for validity by compiling it.
7222
7223     """
7224     try:
7225       self.re = re.compile(self.op.pattern)
7226     except re.error, err:
7227       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
7228                                  (self.op.pattern, err))
7229
7230   def Exec(self, feedback_fn):
7231     """Returns the tag list.
7232
7233     """
7234     cfg = self.cfg
7235     tgts = [("/cluster", cfg.GetClusterInfo())]
7236     ilist = cfg.GetAllInstancesInfo().values()
7237     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
7238     nlist = cfg.GetAllNodesInfo().values()
7239     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
7240     results = []
7241     for path, target in tgts:
7242       for tag in target.GetTags():
7243         if self.re.search(tag):
7244           results.append((path, tag))
7245     return results
7246
7247
7248 class LUAddTags(TagsLU):
7249   """Sets a tag on a given object.
7250
7251   """
7252   _OP_REQP = ["kind", "name", "tags"]
7253   REQ_BGL = False
7254
7255   def CheckPrereq(self):
7256     """Check prerequisites.
7257
7258     This checks the type and length of the tag name and value.
7259
7260     """
7261     TagsLU.CheckPrereq(self)
7262     for tag in self.op.tags:
7263       objects.TaggableObject.ValidateTag(tag)
7264
7265   def Exec(self, feedback_fn):
7266     """Sets the tag.
7267
7268     """
7269     try:
7270       for tag in self.op.tags:
7271         self.target.AddTag(tag)
7272     except errors.TagError, err:
7273       raise errors.OpExecError("Error while setting tag: %s" % str(err))
7274     try:
7275       self.cfg.Update(self.target)
7276     except errors.ConfigurationError:
7277       raise errors.OpRetryError("There has been a modification to the"
7278                                 " config file and the operation has been"
7279                                 " aborted. Please retry.")
7280
7281
7282 class LUDelTags(TagsLU):
7283   """Delete a list of tags from a given object.
7284
7285   """
7286   _OP_REQP = ["kind", "name", "tags"]
7287   REQ_BGL = False
7288
7289   def CheckPrereq(self):
7290     """Check prerequisites.
7291
7292     This checks that we have the given tag.
7293
7294     """
7295     TagsLU.CheckPrereq(self)
7296     for tag in self.op.tags:
7297       objects.TaggableObject.ValidateTag(tag)
7298     del_tags = frozenset(self.op.tags)
7299     cur_tags = self.target.GetTags()
7300     if not del_tags <= cur_tags:
7301       diff_tags = del_tags - cur_tags
7302       diff_names = ["'%s'" % tag for tag in diff_tags]
7303       diff_names.sort()
7304       raise errors.OpPrereqError("Tag(s) %s not found" %
7305                                  (",".join(diff_names)))
7306
7307   def Exec(self, feedback_fn):
7308     """Remove the tag from the object.
7309
7310     """
7311     for tag in self.op.tags:
7312       self.target.RemoveTag(tag)
7313     try:
7314       self.cfg.Update(self.target)
7315     except errors.ConfigurationError:
7316       raise errors.OpRetryError("There has been a modification to the"
7317                                 " config file and the operation has been"
7318                                 " aborted. Please retry.")
7319
7320
7321 class LUTestDelay(NoHooksLU):
7322   """Sleep for a specified amount of time.
7323
7324   This LU sleeps on the master and/or nodes for a specified amount of
7325   time.
7326
7327   """
7328   _OP_REQP = ["duration", "on_master", "on_nodes"]
7329   REQ_BGL = False
7330
7331   def ExpandNames(self):
7332     """Expand names and set required locks.
7333
7334     This expands the node list, if any.
7335
7336     """
7337     self.needed_locks = {}
7338     if self.op.on_nodes:
7339       # _GetWantedNodes can be used here, but is not always appropriate to use
7340       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
7341       # more information.
7342       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
7343       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
7344
7345   def CheckPrereq(self):
7346     """Check prerequisites.
7347
7348     """
7349
7350   def Exec(self, feedback_fn):
7351     """Do the actual sleep.
7352
7353     """
7354     if self.op.on_master:
7355       if not utils.TestDelay(self.op.duration):
7356         raise errors.OpExecError("Error during master delay test")
7357     if self.op.on_nodes:
7358       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
7359       for node, node_result in result.items():
7360         node_result.Raise("Failure during rpc call to node %s" % node)
7361
7362
7363 class IAllocator(object):
7364   """IAllocator framework.
7365
7366   An IAllocator instance has three sets of attributes:
7367     - cfg that is needed to query the cluster
7368     - input data (all members of the _KEYS class attribute are required)
7369     - four buffer attributes (in|out_data|text), that represent the
7370       input (to the external script) in text and data structure format,
7371       and the output from it, again in two formats
7372     - the result variables from the script (success, info, nodes) for
7373       easy usage
7374
7375   """
7376   _ALLO_KEYS = [
7377     "mem_size", "disks", "disk_template",
7378     "os", "tags", "nics", "vcpus", "hypervisor",
7379     ]
7380   _RELO_KEYS = [
7381     "relocate_from",
7382     ]
7383
7384   def __init__(self, cfg, rpc, mode, name, **kwargs):
7385     self.cfg = cfg
7386     self.rpc = rpc
7387     # init buffer variables
7388     self.in_text = self.out_text = self.in_data = self.out_data = None
7389     # init all input fields so that pylint is happy
7390     self.mode = mode
7391     self.name = name
7392     self.mem_size = self.disks = self.disk_template = None
7393     self.os = self.tags = self.nics = self.vcpus = None
7394     self.hypervisor = None
7395     self.relocate_from = None
7396     # computed fields
7397     self.required_nodes = None
7398     # init result fields
7399     self.success = self.info = self.nodes = None
7400     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7401       keyset = self._ALLO_KEYS
7402     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
7403       keyset = self._RELO_KEYS
7404     else:
7405       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
7406                                    " IAllocator" % self.mode)
7407     for key in kwargs:
7408       if key not in keyset:
7409         raise errors.ProgrammerError("Invalid input parameter '%s' to"
7410                                      " IAllocator" % key)
7411       setattr(self, key, kwargs[key])
7412     for key in keyset:
7413       if key not in kwargs:
7414         raise errors.ProgrammerError("Missing input parameter '%s' to"
7415                                      " IAllocator" % key)
7416     self._BuildInputData()
7417
7418   def _ComputeClusterData(self):
7419     """Compute the generic allocator input data.
7420
7421     This is the data that is independent of the actual operation.
7422
7423     """
7424     cfg = self.cfg
7425     cluster_info = cfg.GetClusterInfo()
7426     # cluster data
7427     data = {
7428       "version": constants.IALLOCATOR_VERSION,
7429       "cluster_name": cfg.GetClusterName(),
7430       "cluster_tags": list(cluster_info.GetTags()),
7431       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
7432       # we don't have job IDs
7433       }
7434     iinfo = cfg.GetAllInstancesInfo().values()
7435     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
7436
7437     # node data
7438     node_results = {}
7439     node_list = cfg.GetNodeList()
7440
7441     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7442       hypervisor_name = self.hypervisor
7443     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
7444       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
7445
7446     node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
7447                                         hypervisor_name)
7448     node_iinfo = \
7449       self.rpc.call_all_instances_info(node_list,
7450                                        cluster_info.enabled_hypervisors)
7451     for nname, nresult in node_data.items():
7452       # first fill in static (config-based) values
7453       ninfo = cfg.GetNodeInfo(nname)
7454       pnr = {
7455         "tags": list(ninfo.GetTags()),
7456         "primary_ip": ninfo.primary_ip,
7457         "secondary_ip": ninfo.secondary_ip,
7458         "offline": ninfo.offline,
7459         "drained": ninfo.drained,
7460         "master_candidate": ninfo.master_candidate,
7461         }
7462
7463       if not ninfo.offline:
7464         nresult.Raise("Can't get data for node %s" % nname)
7465         node_iinfo[nname].Raise("Can't get node instance info from node %s" %
7466                                 nname)
7467         remote_info = nresult.payload
7468         for attr in ['memory_total', 'memory_free', 'memory_dom0',
7469                      'vg_size', 'vg_free', 'cpu_total']:
7470           if attr not in remote_info:
7471             raise errors.OpExecError("Node '%s' didn't return attribute"
7472                                      " '%s'" % (nname, attr))
7473           if not isinstance(remote_info[attr], int):
7474             raise errors.OpExecError("Node '%s' returned invalid value"
7475                                      " for '%s': %s" %
7476                                      (nname, attr, remote_info[attr]))
7477         # compute memory used by primary instances
7478         i_p_mem = i_p_up_mem = 0
7479         for iinfo, beinfo in i_list:
7480           if iinfo.primary_node == nname:
7481             i_p_mem += beinfo[constants.BE_MEMORY]
7482             if iinfo.name not in node_iinfo[nname].payload:
7483               i_used_mem = 0
7484             else:
7485               i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
7486             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
7487             remote_info['memory_free'] -= max(0, i_mem_diff)
7488
7489             if iinfo.admin_up:
7490               i_p_up_mem += beinfo[constants.BE_MEMORY]
7491
7492         # compute memory used by instances
7493         pnr_dyn = {
7494           "total_memory": remote_info['memory_total'],
7495           "reserved_memory": remote_info['memory_dom0'],
7496           "free_memory": remote_info['memory_free'],
7497           "total_disk": remote_info['vg_size'],
7498           "free_disk": remote_info['vg_free'],
7499           "total_cpus": remote_info['cpu_total'],
7500           "i_pri_memory": i_p_mem,
7501           "i_pri_up_memory": i_p_up_mem,
7502           }
7503         pnr.update(pnr_dyn)
7504
7505       node_results[nname] = pnr
7506     data["nodes"] = node_results
7507
7508     # instance data
7509     instance_data = {}
7510     for iinfo, beinfo in i_list:
7511       nic_data = []
7512       for nic in iinfo.nics:
7513         filled_params = objects.FillDict(
7514             cluster_info.nicparams[constants.PP_DEFAULT],
7515             nic.nicparams)
7516         nic_dict = {"mac": nic.mac,
7517                     "ip": nic.ip,
7518                     "mode": filled_params[constants.NIC_MODE],
7519                     "link": filled_params[constants.NIC_LINK],
7520                    }
7521         if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
7522           nic_dict["bridge"] = filled_params[constants.NIC_LINK]
7523         nic_data.append(nic_dict)
7524       pir = {
7525         "tags": list(iinfo.GetTags()),
7526         "admin_up": iinfo.admin_up,
7527         "vcpus": beinfo[constants.BE_VCPUS],
7528         "memory": beinfo[constants.BE_MEMORY],
7529         "os": iinfo.os,
7530         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
7531         "nics": nic_data,
7532         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
7533         "disk_template": iinfo.disk_template,
7534         "hypervisor": iinfo.hypervisor,
7535         }
7536       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
7537                                                  pir["disks"])
7538       instance_data[iinfo.name] = pir
7539
7540     data["instances"] = instance_data
7541
7542     self.in_data = data
7543
7544   def _AddNewInstance(self):
7545     """Add new instance data to allocator structure.
7546
7547     This in combination with _AllocatorGetClusterData will create the
7548     correct structure needed as input for the allocator.
7549
7550     The checks for the completeness of the opcode must have already been
7551     done.
7552
7553     """
7554     data = self.in_data
7555
7556     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
7557
7558     if self.disk_template in constants.DTS_NET_MIRROR:
7559       self.required_nodes = 2
7560     else:
7561       self.required_nodes = 1
7562     request = {
7563       "type": "allocate",
7564       "name": self.name,
7565       "disk_template": self.disk_template,
7566       "tags": self.tags,
7567       "os": self.os,
7568       "vcpus": self.vcpus,
7569       "memory": self.mem_size,
7570       "disks": self.disks,
7571       "disk_space_total": disk_space,
7572       "nics": self.nics,
7573       "required_nodes": self.required_nodes,
7574       }
7575     data["request"] = request
7576
7577   def _AddRelocateInstance(self):
7578     """Add relocate instance data to allocator structure.
7579
7580     This in combination with _IAllocatorGetClusterData will create the
7581     correct structure needed as input for the allocator.
7582
7583     The checks for the completeness of the opcode must have already been
7584     done.
7585
7586     """
7587     instance = self.cfg.GetInstanceInfo(self.name)
7588     if instance is None:
7589       raise errors.ProgrammerError("Unknown instance '%s' passed to"
7590                                    " IAllocator" % self.name)
7591
7592     if instance.disk_template not in constants.DTS_NET_MIRROR:
7593       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
7594
7595     if len(instance.secondary_nodes) != 1:
7596       raise errors.OpPrereqError("Instance has not exactly one secondary node")
7597
7598     self.required_nodes = 1
7599     disk_sizes = [{'size': disk.size} for disk in instance.disks]
7600     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
7601
7602     request = {
7603       "type": "relocate",
7604       "name": self.name,
7605       "disk_space_total": disk_space,
7606       "required_nodes": self.required_nodes,
7607       "relocate_from": self.relocate_from,
7608       }
7609     self.in_data["request"] = request
7610
7611   def _BuildInputData(self):
7612     """Build input data structures.
7613
7614     """
7615     self._ComputeClusterData()
7616
7617     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7618       self._AddNewInstance()
7619     else:
7620       self._AddRelocateInstance()
7621
7622     self.in_text = serializer.Dump(self.in_data)
7623
7624   def Run(self, name, validate=True, call_fn=None):
7625     """Run an instance allocator and return the results.
7626
7627     """
7628     if call_fn is None:
7629       call_fn = self.rpc.call_iallocator_runner
7630
7631     result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
7632     result.Raise("Failure while running the iallocator script")
7633
7634     self.out_text = result.payload
7635     if validate:
7636       self._ValidateResult()
7637
7638   def _ValidateResult(self):
7639     """Process the allocator results.
7640
7641     This will process and if successful save the result in
7642     self.out_data and the other parameters.
7643
7644     """
7645     try:
7646       rdict = serializer.Load(self.out_text)
7647     except Exception, err:
7648       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
7649
7650     if not isinstance(rdict, dict):
7651       raise errors.OpExecError("Can't parse iallocator results: not a dict")
7652
7653     for key in "success", "info", "nodes":
7654       if key not in rdict:
7655         raise errors.OpExecError("Can't parse iallocator results:"
7656                                  " missing key '%s'" % key)
7657       setattr(self, key, rdict[key])
7658
7659     if not isinstance(rdict["nodes"], list):
7660       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
7661                                " is not a list")
7662     self.out_data = rdict
7663
7664
7665 class LUTestAllocator(NoHooksLU):
7666   """Run allocator tests.
7667
7668   This LU runs the allocator tests
7669
7670   """
7671   _OP_REQP = ["direction", "mode", "name"]
7672
7673   def CheckPrereq(self):
7674     """Check prerequisites.
7675
7676     This checks the opcode parameters depending on the director and mode test.
7677
7678     """
7679     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7680       for attr in ["name", "mem_size", "disks", "disk_template",
7681                    "os", "tags", "nics", "vcpus"]:
7682         if not hasattr(self.op, attr):
7683           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7684                                      attr)
7685       iname = self.cfg.ExpandInstanceName(self.op.name)
7686       if iname is not None:
7687         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7688                                    iname)
7689       if not isinstance(self.op.nics, list):
7690         raise errors.OpPrereqError("Invalid parameter 'nics'")
7691       for row in self.op.nics:
7692         if (not isinstance(row, dict) or
7693             "mac" not in row or
7694             "ip" not in row or
7695             "bridge" not in row):
7696           raise errors.OpPrereqError("Invalid contents of the"
7697                                      " 'nics' parameter")
7698       if not isinstance(self.op.disks, list):
7699         raise errors.OpPrereqError("Invalid parameter 'disks'")
7700       for row in self.op.disks:
7701         if (not isinstance(row, dict) or
7702             "size" not in row or
7703             not isinstance(row["size"], int) or
7704             "mode" not in row or
7705             row["mode"] not in ['r', 'w']):
7706           raise errors.OpPrereqError("Invalid contents of the"
7707                                      " 'disks' parameter")
7708       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7709         self.op.hypervisor = self.cfg.GetHypervisorType()
7710     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7711       if not hasattr(self.op, "name"):
7712         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7713       fname = self.cfg.ExpandInstanceName(self.op.name)
7714       if fname is None:
7715         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7716                                    self.op.name)
7717       self.op.name = fname
7718       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7719     else:
7720       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7721                                  self.op.mode)
7722
7723     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7724       if not hasattr(self.op, "allocator") or self.op.allocator is None:
7725         raise errors.OpPrereqError("Missing allocator name")
7726     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7727       raise errors.OpPrereqError("Wrong allocator test '%s'" %
7728                                  self.op.direction)
7729
7730   def Exec(self, feedback_fn):
7731     """Run the allocator test.
7732
7733     """
7734     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7735       ial = IAllocator(self.cfg, self.rpc,
7736                        mode=self.op.mode,
7737                        name=self.op.name,
7738                        mem_size=self.op.mem_size,
7739                        disks=self.op.disks,
7740                        disk_template=self.op.disk_template,
7741                        os=self.op.os,
7742                        tags=self.op.tags,
7743                        nics=self.op.nics,
7744                        vcpus=self.op.vcpus,
7745                        hypervisor=self.op.hypervisor,
7746                        )
7747     else:
7748       ial = IAllocator(self.cfg, self.rpc,
7749                        mode=self.op.mode,
7750                        name=self.op.name,
7751                        relocate_from=list(self.relocate_from),
7752                        )
7753
7754     if self.op.direction == constants.IALLOCATOR_DIR_IN:
7755       result = ial.in_text
7756     else:
7757       ial.Run(self.op.allocator, validate=False)
7758       result = ial.out_text
7759     return result