Merge branch 'next' into branch-2.1
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import re
30 import platform
31 import logging
32 import copy
33
34 from ganeti import ssh
35 from ganeti import utils
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import serializer
42 from ganeti import ssconf
43
44
45 class LogicalUnit(object):
46   """Logical Unit base class.
47
48   Subclasses must follow these rules:
49     - implement ExpandNames
50     - implement CheckPrereq
51     - implement Exec
52     - implement BuildHooksEnv
53     - redefine HPATH and HTYPE
54     - optionally redefine their run requirements:
55         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
56
57   Note that all commands require root permissions.
58
59   @ivar dry_run_result: the value (if any) that will be returned to the caller
60       in dry-run mode (signalled by opcode dry_run parameter)
61
62   """
63   HPATH = None
64   HTYPE = None
65   _OP_REQP = []
66   REQ_BGL = True
67
68   def __init__(self, processor, op, context, rpc):
69     """Constructor for LogicalUnit.
70
71     This needs to be overridden in derived classes in order to check op
72     validity.
73
74     """
75     self.proc = processor
76     self.op = op
77     self.cfg = context.cfg
78     self.context = context
79     self.rpc = rpc
80     # Dicts used to declare locking needs to mcpu
81     self.needed_locks = None
82     self.acquired_locks = {}
83     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
84     self.add_locks = {}
85     self.remove_locks = {}
86     # Used to force good behavior when calling helper functions
87     self.recalculate_locks = {}
88     self.__ssh = None
89     # logging
90     self.LogWarning = processor.LogWarning
91     self.LogInfo = processor.LogInfo
92     # support for dry-run
93     self.dry_run_result = None
94
95     for attr_name in self._OP_REQP:
96       attr_val = getattr(op, attr_name, None)
97       if attr_val is None:
98         raise errors.OpPrereqError("Required parameter '%s' missing" %
99                                    attr_name)
100     self.CheckArguments()
101
102   def __GetSSH(self):
103     """Returns the SshRunner object
104
105     """
106     if not self.__ssh:
107       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
108     return self.__ssh
109
110   ssh = property(fget=__GetSSH)
111
112   def CheckArguments(self):
113     """Check syntactic validity for the opcode arguments.
114
115     This method is for doing a simple syntactic check and ensure
116     validity of opcode parameters, without any cluster-related
117     checks. While the same can be accomplished in ExpandNames and/or
118     CheckPrereq, doing these separate is better because:
119
120       - ExpandNames is left as as purely a lock-related function
121       - CheckPrereq is run after we have acquired locks (and possible
122         waited for them)
123
124     The function is allowed to change the self.op attribute so that
125     later methods can no longer worry about missing parameters.
126
127     """
128     pass
129
130   def ExpandNames(self):
131     """Expand names for this LU.
132
133     This method is called before starting to execute the opcode, and it should
134     update all the parameters of the opcode to their canonical form (e.g. a
135     short node name must be fully expanded after this method has successfully
136     completed). This way locking, hooks, logging, ecc. can work correctly.
137
138     LUs which implement this method must also populate the self.needed_locks
139     member, as a dict with lock levels as keys, and a list of needed lock names
140     as values. Rules:
141
142       - use an empty dict if you don't need any lock
143       - if you don't need any lock at a particular level omit that level
144       - don't put anything for the BGL level
145       - if you want all locks at a level use locking.ALL_SET as a value
146
147     If you need to share locks (rather than acquire them exclusively) at one
148     level you can modify self.share_locks, setting a true value (usually 1) for
149     that level. By default locks are not shared.
150
151     Examples::
152
153       # Acquire all nodes and one instance
154       self.needed_locks = {
155         locking.LEVEL_NODE: locking.ALL_SET,
156         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
157       }
158       # Acquire just two nodes
159       self.needed_locks = {
160         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
161       }
162       # Acquire no locks
163       self.needed_locks = {} # No, you can't leave it to the default value None
164
165     """
166     # The implementation of this method is mandatory only if the new LU is
167     # concurrent, so that old LUs don't need to be changed all at the same
168     # time.
169     if self.REQ_BGL:
170       self.needed_locks = {} # Exclusive LUs don't need locks.
171     else:
172       raise NotImplementedError
173
174   def DeclareLocks(self, level):
175     """Declare LU locking needs for a level
176
177     While most LUs can just declare their locking needs at ExpandNames time,
178     sometimes there's the need to calculate some locks after having acquired
179     the ones before. This function is called just before acquiring locks at a
180     particular level, but after acquiring the ones at lower levels, and permits
181     such calculations. It can be used to modify self.needed_locks, and by
182     default it does nothing.
183
184     This function is only called if you have something already set in
185     self.needed_locks for the level.
186
187     @param level: Locking level which is going to be locked
188     @type level: member of ganeti.locking.LEVELS
189
190     """
191
192   def CheckPrereq(self):
193     """Check prerequisites for this LU.
194
195     This method should check that the prerequisites for the execution
196     of this LU are fulfilled. It can do internode communication, but
197     it should be idempotent - no cluster or system changes are
198     allowed.
199
200     The method should raise errors.OpPrereqError in case something is
201     not fulfilled. Its return value is ignored.
202
203     This method should also update all the parameters of the opcode to
204     their canonical form if it hasn't been done by ExpandNames before.
205
206     """
207     raise NotImplementedError
208
209   def Exec(self, feedback_fn):
210     """Execute the LU.
211
212     This method should implement the actual work. It should raise
213     errors.OpExecError for failures that are somewhat dealt with in
214     code, or expected.
215
216     """
217     raise NotImplementedError
218
219   def BuildHooksEnv(self):
220     """Build hooks environment for this LU.
221
222     This method should return a three-node tuple consisting of: a dict
223     containing the environment that will be used for running the
224     specific hook for this LU, a list of node names on which the hook
225     should run before the execution, and a list of node names on which
226     the hook should run after the execution.
227
228     The keys of the dict must not have 'GANETI_' prefixed as this will
229     be handled in the hooks runner. Also note additional keys will be
230     added by the hooks runner. If the LU doesn't define any
231     environment, an empty dict (and not None) should be returned.
232
233     No nodes should be returned as an empty list (and not None).
234
235     Note that if the HPATH for a LU class is None, this function will
236     not be called.
237
238     """
239     raise NotImplementedError
240
241   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
242     """Notify the LU about the results of its hooks.
243
244     This method is called every time a hooks phase is executed, and notifies
245     the Logical Unit about the hooks' result. The LU can then use it to alter
246     its result based on the hooks.  By default the method does nothing and the
247     previous result is passed back unchanged but any LU can define it if it
248     wants to use the local cluster hook-scripts somehow.
249
250     @param phase: one of L{constants.HOOKS_PHASE_POST} or
251         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
252     @param hook_results: the results of the multi-node hooks rpc call
253     @param feedback_fn: function used send feedback back to the caller
254     @param lu_result: the previous Exec result this LU had, or None
255         in the PRE phase
256     @return: the new Exec result, based on the previous result
257         and hook results
258
259     """
260     return lu_result
261
262   def _ExpandAndLockInstance(self):
263     """Helper function to expand and lock an instance.
264
265     Many LUs that work on an instance take its name in self.op.instance_name
266     and need to expand it and then declare the expanded name for locking. This
267     function does it, and then updates self.op.instance_name to the expanded
268     name. It also initializes needed_locks as a dict, if this hasn't been done
269     before.
270
271     """
272     if self.needed_locks is None:
273       self.needed_locks = {}
274     else:
275       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
276         "_ExpandAndLockInstance called with instance-level locks set"
277     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
278     if expanded_name is None:
279       raise errors.OpPrereqError("Instance '%s' not known" %
280                                   self.op.instance_name)
281     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
282     self.op.instance_name = expanded_name
283
284   def _LockInstancesNodes(self, primary_only=False):
285     """Helper function to declare instances' nodes for locking.
286
287     This function should be called after locking one or more instances to lock
288     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
289     with all primary or secondary nodes for instances already locked and
290     present in self.needed_locks[locking.LEVEL_INSTANCE].
291
292     It should be called from DeclareLocks, and for safety only works if
293     self.recalculate_locks[locking.LEVEL_NODE] is set.
294
295     In the future it may grow parameters to just lock some instance's nodes, or
296     to just lock primaries or secondary nodes, if needed.
297
298     If should be called in DeclareLocks in a way similar to::
299
300       if level == locking.LEVEL_NODE:
301         self._LockInstancesNodes()
302
303     @type primary_only: boolean
304     @param primary_only: only lock primary nodes of locked instances
305
306     """
307     assert locking.LEVEL_NODE in self.recalculate_locks, \
308       "_LockInstancesNodes helper function called with no nodes to recalculate"
309
310     # TODO: check if we're really been called with the instance locks held
311
312     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
313     # future we might want to have different behaviors depending on the value
314     # of self.recalculate_locks[locking.LEVEL_NODE]
315     wanted_nodes = []
316     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
317       instance = self.context.cfg.GetInstanceInfo(instance_name)
318       wanted_nodes.append(instance.primary_node)
319       if not primary_only:
320         wanted_nodes.extend(instance.secondary_nodes)
321
322     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
323       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
324     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
325       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
326
327     del self.recalculate_locks[locking.LEVEL_NODE]
328
329
330 class NoHooksLU(LogicalUnit):
331   """Simple LU which runs no hooks.
332
333   This LU is intended as a parent for other LogicalUnits which will
334   run no hooks, in order to reduce duplicate code.
335
336   """
337   HPATH = None
338   HTYPE = None
339
340
341 def _GetWantedNodes(lu, nodes):
342   """Returns list of checked and expanded node names.
343
344   @type lu: L{LogicalUnit}
345   @param lu: the logical unit on whose behalf we execute
346   @type nodes: list
347   @param nodes: list of node names or None for all nodes
348   @rtype: list
349   @return: the list of nodes, sorted
350   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
351
352   """
353   if not isinstance(nodes, list):
354     raise errors.OpPrereqError("Invalid argument type 'nodes'")
355
356   if not nodes:
357     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
358       " non-empty list of nodes whose name is to be expanded.")
359
360   wanted = []
361   for name in nodes:
362     node = lu.cfg.ExpandNodeName(name)
363     if node is None:
364       raise errors.OpPrereqError("No such node name '%s'" % name)
365     wanted.append(node)
366
367   return utils.NiceSort(wanted)
368
369
370 def _GetWantedInstances(lu, instances):
371   """Returns list of checked and expanded instance names.
372
373   @type lu: L{LogicalUnit}
374   @param lu: the logical unit on whose behalf we execute
375   @type instances: list
376   @param instances: list of instance names or None for all instances
377   @rtype: list
378   @return: the list of instances, sorted
379   @raise errors.OpPrereqError: if the instances parameter is wrong type
380   @raise errors.OpPrereqError: if any of the passed instances is not found
381
382   """
383   if not isinstance(instances, list):
384     raise errors.OpPrereqError("Invalid argument type 'instances'")
385
386   if instances:
387     wanted = []
388
389     for name in instances:
390       instance = lu.cfg.ExpandInstanceName(name)
391       if instance is None:
392         raise errors.OpPrereqError("No such instance name '%s'" % name)
393       wanted.append(instance)
394
395   else:
396     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
397   return wanted
398
399
400 def _CheckOutputFields(static, dynamic, selected):
401   """Checks whether all selected fields are valid.
402
403   @type static: L{utils.FieldSet}
404   @param static: static fields set
405   @type dynamic: L{utils.FieldSet}
406   @param dynamic: dynamic fields set
407
408   """
409   f = utils.FieldSet()
410   f.Extend(static)
411   f.Extend(dynamic)
412
413   delta = f.NonMatching(selected)
414   if delta:
415     raise errors.OpPrereqError("Unknown output fields selected: %s"
416                                % ",".join(delta))
417
418
419 def _CheckBooleanOpField(op, name):
420   """Validates boolean opcode parameters.
421
422   This will ensure that an opcode parameter is either a boolean value,
423   or None (but that it always exists).
424
425   """
426   val = getattr(op, name, None)
427   if not (val is None or isinstance(val, bool)):
428     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
429                                (name, str(val)))
430   setattr(op, name, val)
431
432
433 def _CheckNodeOnline(lu, node):
434   """Ensure that a given node is online.
435
436   @param lu: the LU on behalf of which we make the check
437   @param node: the node to check
438   @raise errors.OpPrereqError: if the node is offline
439
440   """
441   if lu.cfg.GetNodeInfo(node).offline:
442     raise errors.OpPrereqError("Can't use offline node %s" % node)
443
444
445 def _CheckNodeNotDrained(lu, node):
446   """Ensure that a given node is not drained.
447
448   @param lu: the LU on behalf of which we make the check
449   @param node: the node to check
450   @raise errors.OpPrereqError: if the node is drained
451
452   """
453   if lu.cfg.GetNodeInfo(node).drained:
454     raise errors.OpPrereqError("Can't use drained node %s" % node)
455
456
457 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
458                           memory, vcpus, nics, disk_template, disks,
459                           bep, hvp, hypervisor_name):
460   """Builds instance related env variables for hooks
461
462   This builds the hook environment from individual variables.
463
464   @type name: string
465   @param name: the name of the instance
466   @type primary_node: string
467   @param primary_node: the name of the instance's primary node
468   @type secondary_nodes: list
469   @param secondary_nodes: list of secondary nodes as strings
470   @type os_type: string
471   @param os_type: the name of the instance's OS
472   @type status: boolean
473   @param status: the should_run status of the instance
474   @type memory: string
475   @param memory: the memory size of the instance
476   @type vcpus: string
477   @param vcpus: the count of VCPUs the instance has
478   @type nics: list
479   @param nics: list of tuples (ip, mac, mode, link) representing
480       the NICs the instance has
481   @type disk_template: string
482   @param disk_template: the disk template of the instance
483   @type disks: list
484   @param disks: the list of (size, mode) pairs
485   @type bep: dict
486   @param bep: the backend parameters for the instance
487   @type hvp: dict
488   @param hvp: the hypervisor parameters for the instance
489   @type hypervisor_name: string
490   @param hypervisor_name: the hypervisor for the instance
491   @rtype: dict
492   @return: the hook environment for this instance
493
494   """
495   if status:
496     str_status = "up"
497   else:
498     str_status = "down"
499   env = {
500     "OP_TARGET": name,
501     "INSTANCE_NAME": name,
502     "INSTANCE_PRIMARY": primary_node,
503     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
504     "INSTANCE_OS_TYPE": os_type,
505     "INSTANCE_STATUS": str_status,
506     "INSTANCE_MEMORY": memory,
507     "INSTANCE_VCPUS": vcpus,
508     "INSTANCE_DISK_TEMPLATE": disk_template,
509     "INSTANCE_HYPERVISOR": hypervisor_name,
510   }
511
512   if nics:
513     nic_count = len(nics)
514     for idx, (ip, mac, mode, link) in enumerate(nics):
515       if ip is None:
516         ip = ""
517       env["INSTANCE_NIC%d_IP" % idx] = ip
518       env["INSTANCE_NIC%d_MAC" % idx] = mac
519       env["INSTANCE_NIC%d_MODE" % idx] = mode
520       env["INSTANCE_NIC%d_LINK" % idx] = link
521       if mode == constants.NIC_MODE_BRIDGED:
522         env["INSTANCE_NIC%d_BRIDGE" % idx] = link
523   else:
524     nic_count = 0
525
526   env["INSTANCE_NIC_COUNT"] = nic_count
527
528   if disks:
529     disk_count = len(disks)
530     for idx, (size, mode) in enumerate(disks):
531       env["INSTANCE_DISK%d_SIZE" % idx] = size
532       env["INSTANCE_DISK%d_MODE" % idx] = mode
533   else:
534     disk_count = 0
535
536   env["INSTANCE_DISK_COUNT"] = disk_count
537
538   for source, kind in [(bep, "BE"), (hvp, "HV")]:
539     for key, value in source.items():
540       env["INSTANCE_%s_%s" % (kind, key)] = value
541
542   return env
543
544 def _NICListToTuple(lu, nics):
545   """Build a list of nic information tuples.
546
547   This list is suitable to be passed to _BuildInstanceHookEnv or as a return
548   value in LUQueryInstanceData.
549
550   @type lu:  L{LogicalUnit}
551   @param lu: the logical unit on whose behalf we execute
552   @type nics: list of L{objects.NIC}
553   @param nics: list of nics to convert to hooks tuples
554
555   """
556   hooks_nics = []
557   c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
558   for nic in nics:
559     ip = nic.ip
560     mac = nic.mac
561     filled_params = objects.FillDict(c_nicparams, nic.nicparams)
562     mode = filled_params[constants.NIC_MODE]
563     link = filled_params[constants.NIC_LINK]
564     hooks_nics.append((ip, mac, mode, link))
565   return hooks_nics
566
567 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
568   """Builds instance related env variables for hooks from an object.
569
570   @type lu: L{LogicalUnit}
571   @param lu: the logical unit on whose behalf we execute
572   @type instance: L{objects.Instance}
573   @param instance: the instance for which we should build the
574       environment
575   @type override: dict
576   @param override: dictionary with key/values that will override
577       our values
578   @rtype: dict
579   @return: the hook environment dictionary
580
581   """
582   cluster = lu.cfg.GetClusterInfo()
583   bep = cluster.FillBE(instance)
584   hvp = cluster.FillHV(instance)
585   args = {
586     'name': instance.name,
587     'primary_node': instance.primary_node,
588     'secondary_nodes': instance.secondary_nodes,
589     'os_type': instance.os,
590     'status': instance.admin_up,
591     'memory': bep[constants.BE_MEMORY],
592     'vcpus': bep[constants.BE_VCPUS],
593     'nics': _NICListToTuple(lu, instance.nics),
594     'disk_template': instance.disk_template,
595     'disks': [(disk.size, disk.mode) for disk in instance.disks],
596     'bep': bep,
597     'hvp': hvp,
598     'hypervisor': instance.hypervisor,
599   }
600   if override:
601     args.update(override)
602   return _BuildInstanceHookEnv(**args)
603
604
605 def _AdjustCandidatePool(lu):
606   """Adjust the candidate pool after node operations.
607
608   """
609   mod_list = lu.cfg.MaintainCandidatePool()
610   if mod_list:
611     lu.LogInfo("Promoted nodes to master candidate role: %s",
612                ", ".join(node.name for node in mod_list))
613     for name in mod_list:
614       lu.context.ReaddNode(name)
615   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
616   if mc_now > mc_max:
617     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
618                (mc_now, mc_max))
619
620
621 def _CheckNicsBridgesExist(lu, target_nics, target_node,
622                                profile=constants.PP_DEFAULT):
623   """Check that the brigdes needed by a list of nics exist.
624
625   """
626   c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
627   paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
628                 for nic in target_nics]
629   brlist = [params[constants.NIC_LINK] for params in paramslist
630             if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
631   if brlist:
632     result = lu.rpc.call_bridges_exist(target_node, brlist)
633     result.Raise("Error checking bridges on destination node '%s'" %
634                  target_node, prereq=True)
635
636
637 def _CheckInstanceBridgesExist(lu, instance, node=None):
638   """Check that the brigdes needed by an instance exist.
639
640   """
641   if node is None:
642     node = instance.primary_node
643   _CheckNicsBridgesExist(lu, instance.nics, node)
644
645
646 class LUDestroyCluster(NoHooksLU):
647   """Logical unit for destroying the cluster.
648
649   """
650   _OP_REQP = []
651
652   def CheckPrereq(self):
653     """Check prerequisites.
654
655     This checks whether the cluster is empty.
656
657     Any errors are signaled by raising errors.OpPrereqError.
658
659     """
660     master = self.cfg.GetMasterNode()
661
662     nodelist = self.cfg.GetNodeList()
663     if len(nodelist) != 1 or nodelist[0] != master:
664       raise errors.OpPrereqError("There are still %d node(s) in"
665                                  " this cluster." % (len(nodelist) - 1))
666     instancelist = self.cfg.GetInstanceList()
667     if instancelist:
668       raise errors.OpPrereqError("There are still %d instance(s) in"
669                                  " this cluster." % len(instancelist))
670
671   def Exec(self, feedback_fn):
672     """Destroys the cluster.
673
674     """
675     master = self.cfg.GetMasterNode()
676     result = self.rpc.call_node_stop_master(master, False)
677     result.Raise("Could not disable the master role")
678     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
679     utils.CreateBackup(priv_key)
680     utils.CreateBackup(pub_key)
681     return master
682
683
684 class LUVerifyCluster(LogicalUnit):
685   """Verifies the cluster status.
686
687   """
688   HPATH = "cluster-verify"
689   HTYPE = constants.HTYPE_CLUSTER
690   _OP_REQP = ["skip_checks"]
691   REQ_BGL = False
692
693   def ExpandNames(self):
694     self.needed_locks = {
695       locking.LEVEL_NODE: locking.ALL_SET,
696       locking.LEVEL_INSTANCE: locking.ALL_SET,
697     }
698     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
699
700   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
701                   node_result, feedback_fn, master_files,
702                   drbd_map, vg_name):
703     """Run multiple tests against a node.
704
705     Test list:
706
707       - compares ganeti version
708       - checks vg existence and size > 20G
709       - checks config file checksum
710       - checks ssh to other nodes
711
712     @type nodeinfo: L{objects.Node}
713     @param nodeinfo: the node to check
714     @param file_list: required list of files
715     @param local_cksum: dictionary of local files and their checksums
716     @param node_result: the results from the node
717     @param feedback_fn: function used to accumulate results
718     @param master_files: list of files that only masters should have
719     @param drbd_map: the useddrbd minors for this node, in
720         form of minor: (instance, must_exist) which correspond to instances
721         and their running status
722     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
723
724     """
725     node = nodeinfo.name
726
727     # main result, node_result should be a non-empty dict
728     if not node_result or not isinstance(node_result, dict):
729       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
730       return True
731
732     # compares ganeti version
733     local_version = constants.PROTOCOL_VERSION
734     remote_version = node_result.get('version', None)
735     if not (remote_version and isinstance(remote_version, (list, tuple)) and
736             len(remote_version) == 2):
737       feedback_fn("  - ERROR: connection to %s failed" % (node))
738       return True
739
740     if local_version != remote_version[0]:
741       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
742                   " node %s %s" % (local_version, node, remote_version[0]))
743       return True
744
745     # node seems compatible, we can actually try to look into its results
746
747     bad = False
748
749     # full package version
750     if constants.RELEASE_VERSION != remote_version[1]:
751       feedback_fn("  - WARNING: software version mismatch: master %s,"
752                   " node %s %s" %
753                   (constants.RELEASE_VERSION, node, remote_version[1]))
754
755     # checks vg existence and size > 20G
756     if vg_name is not None:
757       vglist = node_result.get(constants.NV_VGLIST, None)
758       if not vglist:
759         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
760                         (node,))
761         bad = True
762       else:
763         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
764                                               constants.MIN_VG_SIZE)
765         if vgstatus:
766           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
767           bad = True
768
769     # checks config file checksum
770
771     remote_cksum = node_result.get(constants.NV_FILELIST, None)
772     if not isinstance(remote_cksum, dict):
773       bad = True
774       feedback_fn("  - ERROR: node hasn't returned file checksum data")
775     else:
776       for file_name in file_list:
777         node_is_mc = nodeinfo.master_candidate
778         must_have_file = file_name not in master_files
779         if file_name not in remote_cksum:
780           if node_is_mc or must_have_file:
781             bad = True
782             feedback_fn("  - ERROR: file '%s' missing" % file_name)
783         elif remote_cksum[file_name] != local_cksum[file_name]:
784           if node_is_mc or must_have_file:
785             bad = True
786             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
787           else:
788             # not candidate and this is not a must-have file
789             bad = True
790             feedback_fn("  - ERROR: file '%s' should not exist on non master"
791                         " candidates (and the file is outdated)" % file_name)
792         else:
793           # all good, except non-master/non-must have combination
794           if not node_is_mc and not must_have_file:
795             feedback_fn("  - ERROR: file '%s' should not exist on non master"
796                         " candidates" % file_name)
797
798     # checks ssh to any
799
800     if constants.NV_NODELIST not in node_result:
801       bad = True
802       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
803     else:
804       if node_result[constants.NV_NODELIST]:
805         bad = True
806         for node in node_result[constants.NV_NODELIST]:
807           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
808                           (node, node_result[constants.NV_NODELIST][node]))
809
810     if constants.NV_NODENETTEST not in node_result:
811       bad = True
812       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
813     else:
814       if node_result[constants.NV_NODENETTEST]:
815         bad = True
816         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
817         for node in nlist:
818           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
819                           (node, node_result[constants.NV_NODENETTEST][node]))
820
821     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
822     if isinstance(hyp_result, dict):
823       for hv_name, hv_result in hyp_result.iteritems():
824         if hv_result is not None:
825           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
826                       (hv_name, hv_result))
827
828     # check used drbd list
829     if vg_name is not None:
830       used_minors = node_result.get(constants.NV_DRBDLIST, [])
831       if not isinstance(used_minors, (tuple, list)):
832         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
833                     str(used_minors))
834       else:
835         for minor, (iname, must_exist) in drbd_map.items():
836           if minor not in used_minors and must_exist:
837             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
838                         " not active" % (minor, iname))
839             bad = True
840         for minor in used_minors:
841           if minor not in drbd_map:
842             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
843                         minor)
844             bad = True
845
846     return bad
847
848   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
849                       node_instance, feedback_fn, n_offline):
850     """Verify an instance.
851
852     This function checks to see if the required block devices are
853     available on the instance's node.
854
855     """
856     bad = False
857
858     node_current = instanceconfig.primary_node
859
860     node_vol_should = {}
861     instanceconfig.MapLVsByNode(node_vol_should)
862
863     for node in node_vol_should:
864       if node in n_offline:
865         # ignore missing volumes on offline nodes
866         continue
867       for volume in node_vol_should[node]:
868         if node not in node_vol_is or volume not in node_vol_is[node]:
869           feedback_fn("  - ERROR: volume %s missing on node %s" %
870                           (volume, node))
871           bad = True
872
873     if instanceconfig.admin_up:
874       if ((node_current not in node_instance or
875           not instance in node_instance[node_current]) and
876           node_current not in n_offline):
877         feedback_fn("  - ERROR: instance %s not running on node %s" %
878                         (instance, node_current))
879         bad = True
880
881     for node in node_instance:
882       if (not node == node_current):
883         if instance in node_instance[node]:
884           feedback_fn("  - ERROR: instance %s should not run on node %s" %
885                           (instance, node))
886           bad = True
887
888     return bad
889
890   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
891     """Verify if there are any unknown volumes in the cluster.
892
893     The .os, .swap and backup volumes are ignored. All other volumes are
894     reported as unknown.
895
896     """
897     bad = False
898
899     for node in node_vol_is:
900       for volume in node_vol_is[node]:
901         if node not in node_vol_should or volume not in node_vol_should[node]:
902           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
903                       (volume, node))
904           bad = True
905     return bad
906
907   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
908     """Verify the list of running instances.
909
910     This checks what instances are running but unknown to the cluster.
911
912     """
913     bad = False
914     for node in node_instance:
915       for runninginstance in node_instance[node]:
916         if runninginstance not in instancelist:
917           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
918                           (runninginstance, node))
919           bad = True
920     return bad
921
922   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
923     """Verify N+1 Memory Resilience.
924
925     Check that if one single node dies we can still start all the instances it
926     was primary for.
927
928     """
929     bad = False
930
931     for node, nodeinfo in node_info.iteritems():
932       # This code checks that every node which is now listed as secondary has
933       # enough memory to host all instances it is supposed to should a single
934       # other node in the cluster fail.
935       # FIXME: not ready for failover to an arbitrary node
936       # FIXME: does not support file-backed instances
937       # WARNING: we currently take into account down instances as well as up
938       # ones, considering that even if they're down someone might want to start
939       # them even in the event of a node failure.
940       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
941         needed_mem = 0
942         for instance in instances:
943           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
944           if bep[constants.BE_AUTO_BALANCE]:
945             needed_mem += bep[constants.BE_MEMORY]
946         if nodeinfo['mfree'] < needed_mem:
947           feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
948                       " failovers should node %s fail" % (node, prinode))
949           bad = True
950     return bad
951
952   def CheckPrereq(self):
953     """Check prerequisites.
954
955     Transform the list of checks we're going to skip into a set and check that
956     all its members are valid.
957
958     """
959     self.skip_set = frozenset(self.op.skip_checks)
960     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
961       raise errors.OpPrereqError("Invalid checks to be skipped specified")
962
963   def BuildHooksEnv(self):
964     """Build hooks env.
965
966     Cluster-Verify hooks just ran in the post phase and their failure makes
967     the output be logged in the verify output and the verification to fail.
968
969     """
970     all_nodes = self.cfg.GetNodeList()
971     env = {
972       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
973       }
974     for node in self.cfg.GetAllNodesInfo().values():
975       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
976
977     return env, [], all_nodes
978
979   def Exec(self, feedback_fn):
980     """Verify integrity of cluster, performing various test on nodes.
981
982     """
983     bad = False
984     feedback_fn("* Verifying global settings")
985     for msg in self.cfg.VerifyConfig():
986       feedback_fn("  - ERROR: %s" % msg)
987
988     vg_name = self.cfg.GetVGName()
989     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
990     nodelist = utils.NiceSort(self.cfg.GetNodeList())
991     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
992     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
993     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
994                         for iname in instancelist)
995     i_non_redundant = [] # Non redundant instances
996     i_non_a_balanced = [] # Non auto-balanced instances
997     n_offline = [] # List of offline nodes
998     n_drained = [] # List of nodes being drained
999     node_volume = {}
1000     node_instance = {}
1001     node_info = {}
1002     instance_cfg = {}
1003
1004     # FIXME: verify OS list
1005     # do local checksums
1006     master_files = [constants.CLUSTER_CONF_FILE]
1007
1008     file_names = ssconf.SimpleStore().GetFileList()
1009     file_names.append(constants.SSL_CERT_FILE)
1010     file_names.append(constants.RAPI_CERT_FILE)
1011     file_names.extend(master_files)
1012
1013     local_checksums = utils.FingerprintFiles(file_names)
1014
1015     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1016     node_verify_param = {
1017       constants.NV_FILELIST: file_names,
1018       constants.NV_NODELIST: [node.name for node in nodeinfo
1019                               if not node.offline],
1020       constants.NV_HYPERVISOR: hypervisors,
1021       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1022                                   node.secondary_ip) for node in nodeinfo
1023                                  if not node.offline],
1024       constants.NV_INSTANCELIST: hypervisors,
1025       constants.NV_VERSION: None,
1026       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1027       }
1028     if vg_name is not None:
1029       node_verify_param[constants.NV_VGLIST] = None
1030       node_verify_param[constants.NV_LVLIST] = vg_name
1031       node_verify_param[constants.NV_DRBDLIST] = None
1032     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1033                                            self.cfg.GetClusterName())
1034
1035     cluster = self.cfg.GetClusterInfo()
1036     master_node = self.cfg.GetMasterNode()
1037     all_drbd_map = self.cfg.ComputeDRBDMap()
1038
1039     for node_i in nodeinfo:
1040       node = node_i.name
1041
1042       if node_i.offline:
1043         feedback_fn("* Skipping offline node %s" % (node,))
1044         n_offline.append(node)
1045         continue
1046
1047       if node == master_node:
1048         ntype = "master"
1049       elif node_i.master_candidate:
1050         ntype = "master candidate"
1051       elif node_i.drained:
1052         ntype = "drained"
1053         n_drained.append(node)
1054       else:
1055         ntype = "regular"
1056       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1057
1058       msg = all_nvinfo[node].fail_msg
1059       if msg:
1060         feedback_fn("  - ERROR: while contacting node %s: %s" % (node, msg))
1061         bad = True
1062         continue
1063
1064       nresult = all_nvinfo[node].payload
1065       node_drbd = {}
1066       for minor, instance in all_drbd_map[node].items():
1067         if instance not in instanceinfo:
1068           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1069                       instance)
1070           # ghost instance should not be running, but otherwise we
1071           # don't give double warnings (both ghost instance and
1072           # unallocated minor in use)
1073           node_drbd[minor] = (instance, False)
1074         else:
1075           instance = instanceinfo[instance]
1076           node_drbd[minor] = (instance.name, instance.admin_up)
1077       result = self._VerifyNode(node_i, file_names, local_checksums,
1078                                 nresult, feedback_fn, master_files,
1079                                 node_drbd, vg_name)
1080       bad = bad or result
1081
1082       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1083       if vg_name is None:
1084         node_volume[node] = {}
1085       elif isinstance(lvdata, basestring):
1086         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1087                     (node, utils.SafeEncode(lvdata)))
1088         bad = True
1089         node_volume[node] = {}
1090       elif not isinstance(lvdata, dict):
1091         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1092         bad = True
1093         continue
1094       else:
1095         node_volume[node] = lvdata
1096
1097       # node_instance
1098       idata = nresult.get(constants.NV_INSTANCELIST, None)
1099       if not isinstance(idata, list):
1100         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1101                     (node,))
1102         bad = True
1103         continue
1104
1105       node_instance[node] = idata
1106
1107       # node_info
1108       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1109       if not isinstance(nodeinfo, dict):
1110         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1111         bad = True
1112         continue
1113
1114       try:
1115         node_info[node] = {
1116           "mfree": int(nodeinfo['memory_free']),
1117           "pinst": [],
1118           "sinst": [],
1119           # dictionary holding all instances this node is secondary for,
1120           # grouped by their primary node. Each key is a cluster node, and each
1121           # value is a list of instances which have the key as primary and the
1122           # current node as secondary.  this is handy to calculate N+1 memory
1123           # availability if you can only failover from a primary to its
1124           # secondary.
1125           "sinst-by-pnode": {},
1126         }
1127         # FIXME: devise a free space model for file based instances as well
1128         if vg_name is not None:
1129           if (constants.NV_VGLIST not in nresult or
1130               vg_name not in nresult[constants.NV_VGLIST]):
1131             feedback_fn("  - ERROR: node %s didn't return data for the"
1132                         " volume group '%s' - it is either missing or broken" %
1133                         (node, vg_name))
1134             bad = True
1135             continue
1136           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1137       except (ValueError, KeyError):
1138         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1139                     " from node %s" % (node,))
1140         bad = True
1141         continue
1142
1143     node_vol_should = {}
1144
1145     for instance in instancelist:
1146       feedback_fn("* Verifying instance %s" % instance)
1147       inst_config = instanceinfo[instance]
1148       result =  self._VerifyInstance(instance, inst_config, node_volume,
1149                                      node_instance, feedback_fn, n_offline)
1150       bad = bad or result
1151       inst_nodes_offline = []
1152
1153       inst_config.MapLVsByNode(node_vol_should)
1154
1155       instance_cfg[instance] = inst_config
1156
1157       pnode = inst_config.primary_node
1158       if pnode in node_info:
1159         node_info[pnode]['pinst'].append(instance)
1160       elif pnode not in n_offline:
1161         feedback_fn("  - ERROR: instance %s, connection to primary node"
1162                     " %s failed" % (instance, pnode))
1163         bad = True
1164
1165       if pnode in n_offline:
1166         inst_nodes_offline.append(pnode)
1167
1168       # If the instance is non-redundant we cannot survive losing its primary
1169       # node, so we are not N+1 compliant. On the other hand we have no disk
1170       # templates with more than one secondary so that situation is not well
1171       # supported either.
1172       # FIXME: does not support file-backed instances
1173       if len(inst_config.secondary_nodes) == 0:
1174         i_non_redundant.append(instance)
1175       elif len(inst_config.secondary_nodes) > 1:
1176         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1177                     % instance)
1178
1179       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1180         i_non_a_balanced.append(instance)
1181
1182       for snode in inst_config.secondary_nodes:
1183         if snode in node_info:
1184           node_info[snode]['sinst'].append(instance)
1185           if pnode not in node_info[snode]['sinst-by-pnode']:
1186             node_info[snode]['sinst-by-pnode'][pnode] = []
1187           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1188         elif snode not in n_offline:
1189           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1190                       " %s failed" % (instance, snode))
1191           bad = True
1192         if snode in n_offline:
1193           inst_nodes_offline.append(snode)
1194
1195       if inst_nodes_offline:
1196         # warn that the instance lives on offline nodes, and set bad=True
1197         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1198                     ", ".join(inst_nodes_offline))
1199         bad = True
1200
1201     feedback_fn("* Verifying orphan volumes")
1202     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1203                                        feedback_fn)
1204     bad = bad or result
1205
1206     feedback_fn("* Verifying remaining instances")
1207     result = self._VerifyOrphanInstances(instancelist, node_instance,
1208                                          feedback_fn)
1209     bad = bad or result
1210
1211     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1212       feedback_fn("* Verifying N+1 Memory redundancy")
1213       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1214       bad = bad or result
1215
1216     feedback_fn("* Other Notes")
1217     if i_non_redundant:
1218       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1219                   % len(i_non_redundant))
1220
1221     if i_non_a_balanced:
1222       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1223                   % len(i_non_a_balanced))
1224
1225     if n_offline:
1226       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1227
1228     if n_drained:
1229       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1230
1231     return not bad
1232
1233   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1234     """Analyze the post-hooks' result
1235
1236     This method analyses the hook result, handles it, and sends some
1237     nicely-formatted feedback back to the user.
1238
1239     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1240         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1241     @param hooks_results: the results of the multi-node hooks rpc call
1242     @param feedback_fn: function used send feedback back to the caller
1243     @param lu_result: previous Exec result
1244     @return: the new Exec result, based on the previous result
1245         and hook results
1246
1247     """
1248     # We only really run POST phase hooks, and are only interested in
1249     # their results
1250     if phase == constants.HOOKS_PHASE_POST:
1251       # Used to change hooks' output to proper indentation
1252       indent_re = re.compile('^', re.M)
1253       feedback_fn("* Hooks Results")
1254       if not hooks_results:
1255         feedback_fn("  - ERROR: general communication failure")
1256         lu_result = 1
1257       else:
1258         for node_name in hooks_results:
1259           show_node_header = True
1260           res = hooks_results[node_name]
1261           msg = res.fail_msg
1262           if msg:
1263             if res.offline:
1264               # no need to warn or set fail return value
1265               continue
1266             feedback_fn("    Communication failure in hooks execution: %s" %
1267                         msg)
1268             lu_result = 1
1269             continue
1270           for script, hkr, output in res.payload:
1271             if hkr == constants.HKR_FAIL:
1272               # The node header is only shown once, if there are
1273               # failing hooks on that node
1274               if show_node_header:
1275                 feedback_fn("  Node %s:" % node_name)
1276                 show_node_header = False
1277               feedback_fn("    ERROR: Script %s failed, output:" % script)
1278               output = indent_re.sub('      ', output)
1279               feedback_fn("%s" % output)
1280               lu_result = 1
1281
1282       return lu_result
1283
1284
1285 class LUVerifyDisks(NoHooksLU):
1286   """Verifies the cluster disks status.
1287
1288   """
1289   _OP_REQP = []
1290   REQ_BGL = False
1291
1292   def ExpandNames(self):
1293     self.needed_locks = {
1294       locking.LEVEL_NODE: locking.ALL_SET,
1295       locking.LEVEL_INSTANCE: locking.ALL_SET,
1296     }
1297     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1298
1299   def CheckPrereq(self):
1300     """Check prerequisites.
1301
1302     This has no prerequisites.
1303
1304     """
1305     pass
1306
1307   def Exec(self, feedback_fn):
1308     """Verify integrity of cluster disks.
1309
1310     @rtype: tuple of three items
1311     @return: a tuple of (dict of node-to-node_error, list of instances
1312         which need activate-disks, dict of instance: (node, volume) for
1313         missing volumes
1314
1315     """
1316     result = res_nodes, res_instances, res_missing = {}, [], {}
1317
1318     vg_name = self.cfg.GetVGName()
1319     nodes = utils.NiceSort(self.cfg.GetNodeList())
1320     instances = [self.cfg.GetInstanceInfo(name)
1321                  for name in self.cfg.GetInstanceList()]
1322
1323     nv_dict = {}
1324     for inst in instances:
1325       inst_lvs = {}
1326       if (not inst.admin_up or
1327           inst.disk_template not in constants.DTS_NET_MIRROR):
1328         continue
1329       inst.MapLVsByNode(inst_lvs)
1330       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1331       for node, vol_list in inst_lvs.iteritems():
1332         for vol in vol_list:
1333           nv_dict[(node, vol)] = inst
1334
1335     if not nv_dict:
1336       return result
1337
1338     node_lvs = self.rpc.call_lv_list(nodes, vg_name)
1339
1340     for node in nodes:
1341       # node_volume
1342       node_res = node_lvs[node]
1343       if node_res.offline:
1344         continue
1345       msg = node_res.fail_msg
1346       if msg:
1347         logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1348         res_nodes[node] = msg
1349         continue
1350
1351       lvs = node_res.payload
1352       for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1353         inst = nv_dict.pop((node, lv_name), None)
1354         if (not lv_online and inst is not None
1355             and inst.name not in res_instances):
1356           res_instances.append(inst.name)
1357
1358     # any leftover items in nv_dict are missing LVs, let's arrange the
1359     # data better
1360     for key, inst in nv_dict.iteritems():
1361       if inst.name not in res_missing:
1362         res_missing[inst.name] = []
1363       res_missing[inst.name].append(key)
1364
1365     return result
1366
1367
1368 class LURenameCluster(LogicalUnit):
1369   """Rename the cluster.
1370
1371   """
1372   HPATH = "cluster-rename"
1373   HTYPE = constants.HTYPE_CLUSTER
1374   _OP_REQP = ["name"]
1375
1376   def BuildHooksEnv(self):
1377     """Build hooks env.
1378
1379     """
1380     env = {
1381       "OP_TARGET": self.cfg.GetClusterName(),
1382       "NEW_NAME": self.op.name,
1383       }
1384     mn = self.cfg.GetMasterNode()
1385     return env, [mn], [mn]
1386
1387   def CheckPrereq(self):
1388     """Verify that the passed name is a valid one.
1389
1390     """
1391     hostname = utils.HostInfo(self.op.name)
1392
1393     new_name = hostname.name
1394     self.ip = new_ip = hostname.ip
1395     old_name = self.cfg.GetClusterName()
1396     old_ip = self.cfg.GetMasterIP()
1397     if new_name == old_name and new_ip == old_ip:
1398       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1399                                  " cluster has changed")
1400     if new_ip != old_ip:
1401       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1402         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1403                                    " reachable on the network. Aborting." %
1404                                    new_ip)
1405
1406     self.op.name = new_name
1407
1408   def Exec(self, feedback_fn):
1409     """Rename the cluster.
1410
1411     """
1412     clustername = self.op.name
1413     ip = self.ip
1414
1415     # shutdown the master IP
1416     master = self.cfg.GetMasterNode()
1417     result = self.rpc.call_node_stop_master(master, False)
1418     result.Raise("Could not disable the master role")
1419
1420     try:
1421       cluster = self.cfg.GetClusterInfo()
1422       cluster.cluster_name = clustername
1423       cluster.master_ip = ip
1424       self.cfg.Update(cluster)
1425
1426       # update the known hosts file
1427       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1428       node_list = self.cfg.GetNodeList()
1429       try:
1430         node_list.remove(master)
1431       except ValueError:
1432         pass
1433       result = self.rpc.call_upload_file(node_list,
1434                                          constants.SSH_KNOWN_HOSTS_FILE)
1435       for to_node, to_result in result.iteritems():
1436         msg = to_result.fail_msg
1437         if msg:
1438           msg = ("Copy of file %s to node %s failed: %s" %
1439                  (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1440           self.proc.LogWarning(msg)
1441
1442     finally:
1443       result = self.rpc.call_node_start_master(master, False, False)
1444       msg = result.fail_msg
1445       if msg:
1446         self.LogWarning("Could not re-enable the master role on"
1447                         " the master, please restart manually: %s", msg)
1448
1449
1450 def _RecursiveCheckIfLVMBased(disk):
1451   """Check if the given disk or its children are lvm-based.
1452
1453   @type disk: L{objects.Disk}
1454   @param disk: the disk to check
1455   @rtype: boolean
1456   @return: boolean indicating whether a LD_LV dev_type was found or not
1457
1458   """
1459   if disk.children:
1460     for chdisk in disk.children:
1461       if _RecursiveCheckIfLVMBased(chdisk):
1462         return True
1463   return disk.dev_type == constants.LD_LV
1464
1465
1466 class LUSetClusterParams(LogicalUnit):
1467   """Change the parameters of the cluster.
1468
1469   """
1470   HPATH = "cluster-modify"
1471   HTYPE = constants.HTYPE_CLUSTER
1472   _OP_REQP = []
1473   REQ_BGL = False
1474
1475   def CheckArguments(self):
1476     """Check parameters
1477
1478     """
1479     if not hasattr(self.op, "candidate_pool_size"):
1480       self.op.candidate_pool_size = None
1481     if self.op.candidate_pool_size is not None:
1482       try:
1483         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1484       except (ValueError, TypeError), err:
1485         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1486                                    str(err))
1487       if self.op.candidate_pool_size < 1:
1488         raise errors.OpPrereqError("At least one master candidate needed")
1489
1490   def ExpandNames(self):
1491     # FIXME: in the future maybe other cluster params won't require checking on
1492     # all nodes to be modified.
1493     self.needed_locks = {
1494       locking.LEVEL_NODE: locking.ALL_SET,
1495     }
1496     self.share_locks[locking.LEVEL_NODE] = 1
1497
1498   def BuildHooksEnv(self):
1499     """Build hooks env.
1500
1501     """
1502     env = {
1503       "OP_TARGET": self.cfg.GetClusterName(),
1504       "NEW_VG_NAME": self.op.vg_name,
1505       }
1506     mn = self.cfg.GetMasterNode()
1507     return env, [mn], [mn]
1508
1509   def CheckPrereq(self):
1510     """Check prerequisites.
1511
1512     This checks whether the given params don't conflict and
1513     if the given volume group is valid.
1514
1515     """
1516     if self.op.vg_name is not None and not self.op.vg_name:
1517       instances = self.cfg.GetAllInstancesInfo().values()
1518       for inst in instances:
1519         for disk in inst.disks:
1520           if _RecursiveCheckIfLVMBased(disk):
1521             raise errors.OpPrereqError("Cannot disable lvm storage while"
1522                                        " lvm-based instances exist")
1523
1524     node_list = self.acquired_locks[locking.LEVEL_NODE]
1525
1526     # if vg_name not None, checks given volume group on all nodes
1527     if self.op.vg_name:
1528       vglist = self.rpc.call_vg_list(node_list)
1529       for node in node_list:
1530         msg = vglist[node].fail_msg
1531         if msg:
1532           # ignoring down node
1533           self.LogWarning("Error while gathering data on node %s"
1534                           " (ignoring node): %s", node, msg)
1535           continue
1536         vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1537                                               self.op.vg_name,
1538                                               constants.MIN_VG_SIZE)
1539         if vgstatus:
1540           raise errors.OpPrereqError("Error on node '%s': %s" %
1541                                      (node, vgstatus))
1542
1543     self.cluster = cluster = self.cfg.GetClusterInfo()
1544     # validate params changes
1545     if self.op.beparams:
1546       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1547       self.new_beparams = objects.FillDict(
1548         cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1549
1550     if self.op.nicparams:
1551       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1552       self.new_nicparams = objects.FillDict(
1553         cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1554       objects.NIC.CheckParameterSyntax(self.new_nicparams)
1555
1556     # hypervisor list/parameters
1557     self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1558     if self.op.hvparams:
1559       if not isinstance(self.op.hvparams, dict):
1560         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1561       for hv_name, hv_dict in self.op.hvparams.items():
1562         if hv_name not in self.new_hvparams:
1563           self.new_hvparams[hv_name] = hv_dict
1564         else:
1565           self.new_hvparams[hv_name].update(hv_dict)
1566
1567     if self.op.enabled_hypervisors is not None:
1568       self.hv_list = self.op.enabled_hypervisors
1569     else:
1570       self.hv_list = cluster.enabled_hypervisors
1571
1572     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1573       # either the enabled list has changed, or the parameters have, validate
1574       for hv_name, hv_params in self.new_hvparams.items():
1575         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1576             (self.op.enabled_hypervisors and
1577              hv_name in self.op.enabled_hypervisors)):
1578           # either this is a new hypervisor, or its parameters have changed
1579           hv_class = hypervisor.GetHypervisor(hv_name)
1580           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1581           hv_class.CheckParameterSyntax(hv_params)
1582           _CheckHVParams(self, node_list, hv_name, hv_params)
1583
1584   def Exec(self, feedback_fn):
1585     """Change the parameters of the cluster.
1586
1587     """
1588     if self.op.vg_name is not None:
1589       new_volume = self.op.vg_name
1590       if not new_volume:
1591         new_volume = None
1592       if new_volume != self.cfg.GetVGName():
1593         self.cfg.SetVGName(new_volume)
1594       else:
1595         feedback_fn("Cluster LVM configuration already in desired"
1596                     " state, not changing")
1597     if self.op.hvparams:
1598       self.cluster.hvparams = self.new_hvparams
1599     if self.op.enabled_hypervisors is not None:
1600       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1601     if self.op.beparams:
1602       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1603     if self.op.nicparams:
1604       self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1605
1606     if self.op.candidate_pool_size is not None:
1607       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1608       # we need to update the pool size here, otherwise the save will fail
1609       _AdjustCandidatePool(self)
1610
1611     self.cfg.Update(self.cluster)
1612
1613
1614 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1615   """Distribute additional files which are part of the cluster configuration.
1616
1617   ConfigWriter takes care of distributing the config and ssconf files, but
1618   there are more files which should be distributed to all nodes. This function
1619   makes sure those are copied.
1620
1621   @param lu: calling logical unit
1622   @param additional_nodes: list of nodes not in the config to distribute to
1623
1624   """
1625   # 1. Gather target nodes
1626   myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1627   dist_nodes = lu.cfg.GetNodeList()
1628   if additional_nodes is not None:
1629     dist_nodes.extend(additional_nodes)
1630   if myself.name in dist_nodes:
1631     dist_nodes.remove(myself.name)
1632   # 2. Gather files to distribute
1633   dist_files = set([constants.ETC_HOSTS,
1634                     constants.SSH_KNOWN_HOSTS_FILE,
1635                     constants.RAPI_CERT_FILE,
1636                     constants.RAPI_USERS_FILE,
1637                    ])
1638
1639   enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1640   for hv_name in enabled_hypervisors:
1641     hv_class = hypervisor.GetHypervisor(hv_name)
1642     dist_files.update(hv_class.GetAncillaryFiles())
1643
1644   # 3. Perform the files upload
1645   for fname in dist_files:
1646     if os.path.exists(fname):
1647       result = lu.rpc.call_upload_file(dist_nodes, fname)
1648       for to_node, to_result in result.items():
1649         msg = to_result.fail_msg
1650         if msg:
1651           msg = ("Copy of file %s to node %s failed: %s" %
1652                  (fname, to_node, msg))
1653           lu.proc.LogWarning(msg)
1654
1655
1656 class LURedistributeConfig(NoHooksLU):
1657   """Force the redistribution of cluster configuration.
1658
1659   This is a very simple LU.
1660
1661   """
1662   _OP_REQP = []
1663   REQ_BGL = False
1664
1665   def ExpandNames(self):
1666     self.needed_locks = {
1667       locking.LEVEL_NODE: locking.ALL_SET,
1668     }
1669     self.share_locks[locking.LEVEL_NODE] = 1
1670
1671   def CheckPrereq(self):
1672     """Check prerequisites.
1673
1674     """
1675
1676   def Exec(self, feedback_fn):
1677     """Redistribute the configuration.
1678
1679     """
1680     self.cfg.Update(self.cfg.GetClusterInfo())
1681     _RedistributeAncillaryFiles(self)
1682
1683
1684 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1685   """Sleep and poll for an instance's disk to sync.
1686
1687   """
1688   if not instance.disks:
1689     return True
1690
1691   if not oneshot:
1692     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1693
1694   node = instance.primary_node
1695
1696   for dev in instance.disks:
1697     lu.cfg.SetDiskID(dev, node)
1698
1699   retries = 0
1700   degr_retries = 10 # in seconds, as we sleep 1 second each time
1701   while True:
1702     max_time = 0
1703     done = True
1704     cumul_degraded = False
1705     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1706     msg = rstats.fail_msg
1707     if msg:
1708       lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1709       retries += 1
1710       if retries >= 10:
1711         raise errors.RemoteError("Can't contact node %s for mirror data,"
1712                                  " aborting." % node)
1713       time.sleep(6)
1714       continue
1715     rstats = rstats.payload
1716     retries = 0
1717     for i, mstat in enumerate(rstats):
1718       if mstat is None:
1719         lu.LogWarning("Can't compute data for node %s/%s",
1720                            node, instance.disks[i].iv_name)
1721         continue
1722       # we ignore the ldisk parameter
1723       perc_done, est_time, is_degraded, _ = mstat
1724       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1725       if perc_done is not None:
1726         done = False
1727         if est_time is not None:
1728           rem_time = "%d estimated seconds remaining" % est_time
1729           max_time = est_time
1730         else:
1731           rem_time = "no time estimate"
1732         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1733                         (instance.disks[i].iv_name, perc_done, rem_time))
1734
1735     # if we're done but degraded, let's do a few small retries, to
1736     # make sure we see a stable and not transient situation; therefore
1737     # we force restart of the loop
1738     if (done or oneshot) and cumul_degraded and degr_retries > 0:
1739       logging.info("Degraded disks found, %d retries left", degr_retries)
1740       degr_retries -= 1
1741       time.sleep(1)
1742       continue
1743
1744     if done or oneshot:
1745       break
1746
1747     time.sleep(min(60, max_time))
1748
1749   if done:
1750     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1751   return not cumul_degraded
1752
1753
1754 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1755   """Check that mirrors are not degraded.
1756
1757   The ldisk parameter, if True, will change the test from the
1758   is_degraded attribute (which represents overall non-ok status for
1759   the device(s)) to the ldisk (representing the local storage status).
1760
1761   """
1762   lu.cfg.SetDiskID(dev, node)
1763   if ldisk:
1764     idx = 6
1765   else:
1766     idx = 5
1767
1768   result = True
1769   if on_primary or dev.AssembleOnSecondary():
1770     rstats = lu.rpc.call_blockdev_find(node, dev)
1771     msg = rstats.fail_msg
1772     if msg:
1773       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1774       result = False
1775     elif not rstats.payload:
1776       lu.LogWarning("Can't find disk on node %s", node)
1777       result = False
1778     else:
1779       result = result and (not rstats.payload[idx])
1780   if dev.children:
1781     for child in dev.children:
1782       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1783
1784   return result
1785
1786
1787 class LUDiagnoseOS(NoHooksLU):
1788   """Logical unit for OS diagnose/query.
1789
1790   """
1791   _OP_REQP = ["output_fields", "names"]
1792   REQ_BGL = False
1793   _FIELDS_STATIC = utils.FieldSet()
1794   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1795
1796   def ExpandNames(self):
1797     if self.op.names:
1798       raise errors.OpPrereqError("Selective OS query not supported")
1799
1800     _CheckOutputFields(static=self._FIELDS_STATIC,
1801                        dynamic=self._FIELDS_DYNAMIC,
1802                        selected=self.op.output_fields)
1803
1804     # Lock all nodes, in shared mode
1805     # Temporary removal of locks, should be reverted later
1806     # TODO: reintroduce locks when they are lighter-weight
1807     self.needed_locks = {}
1808     #self.share_locks[locking.LEVEL_NODE] = 1
1809     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1810
1811   def CheckPrereq(self):
1812     """Check prerequisites.
1813
1814     """
1815
1816   @staticmethod
1817   def _DiagnoseByOS(node_list, rlist):
1818     """Remaps a per-node return list into an a per-os per-node dictionary
1819
1820     @param node_list: a list with the names of all nodes
1821     @param rlist: a map with node names as keys and OS objects as values
1822
1823     @rtype: dict
1824     @return: a dictionary with osnames as keys and as value another map, with
1825         nodes as keys and tuples of (path, status, diagnose) as values, eg::
1826
1827           {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
1828                                      (/srv/..., False, "invalid api")],
1829                            "node2": [(/srv/..., True, "")]}
1830           }
1831
1832     """
1833     all_os = {}
1834     # we build here the list of nodes that didn't fail the RPC (at RPC
1835     # level), so that nodes with a non-responding node daemon don't
1836     # make all OSes invalid
1837     good_nodes = [node_name for node_name in rlist
1838                   if not rlist[node_name].fail_msg]
1839     for node_name, nr in rlist.items():
1840       if nr.fail_msg or not nr.payload:
1841         continue
1842       for name, path, status, diagnose in nr.payload:
1843         if name not in all_os:
1844           # build a list of nodes for this os containing empty lists
1845           # for each node in node_list
1846           all_os[name] = {}
1847           for nname in good_nodes:
1848             all_os[name][nname] = []
1849         all_os[name][node_name].append((path, status, diagnose))
1850     return all_os
1851
1852   def Exec(self, feedback_fn):
1853     """Compute the list of OSes.
1854
1855     """
1856     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1857     node_data = self.rpc.call_os_diagnose(valid_nodes)
1858     pol = self._DiagnoseByOS(valid_nodes, node_data)
1859     output = []
1860     for os_name, os_data in pol.items():
1861       row = []
1862       for field in self.op.output_fields:
1863         if field == "name":
1864           val = os_name
1865         elif field == "valid":
1866           val = utils.all([osl and osl[0][1] for osl in os_data.values()])
1867         elif field == "node_status":
1868           # this is just a copy of the dict
1869           val = {}
1870           for node_name, nos_list in os_data.items():
1871             val[node_name] = nos_list
1872         else:
1873           raise errors.ParameterError(field)
1874         row.append(val)
1875       output.append(row)
1876
1877     return output
1878
1879
1880 class LURemoveNode(LogicalUnit):
1881   """Logical unit for removing a node.
1882
1883   """
1884   HPATH = "node-remove"
1885   HTYPE = constants.HTYPE_NODE
1886   _OP_REQP = ["node_name"]
1887
1888   def BuildHooksEnv(self):
1889     """Build hooks env.
1890
1891     This doesn't run on the target node in the pre phase as a failed
1892     node would then be impossible to remove.
1893
1894     """
1895     env = {
1896       "OP_TARGET": self.op.node_name,
1897       "NODE_NAME": self.op.node_name,
1898       }
1899     all_nodes = self.cfg.GetNodeList()
1900     all_nodes.remove(self.op.node_name)
1901     return env, all_nodes, all_nodes
1902
1903   def CheckPrereq(self):
1904     """Check prerequisites.
1905
1906     This checks:
1907      - the node exists in the configuration
1908      - it does not have primary or secondary instances
1909      - it's not the master
1910
1911     Any errors are signaled by raising errors.OpPrereqError.
1912
1913     """
1914     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1915     if node is None:
1916       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1917
1918     instance_list = self.cfg.GetInstanceList()
1919
1920     masternode = self.cfg.GetMasterNode()
1921     if node.name == masternode:
1922       raise errors.OpPrereqError("Node is the master node,"
1923                                  " you need to failover first.")
1924
1925     for instance_name in instance_list:
1926       instance = self.cfg.GetInstanceInfo(instance_name)
1927       if node.name in instance.all_nodes:
1928         raise errors.OpPrereqError("Instance %s is still running on the node,"
1929                                    " please remove first." % instance_name)
1930     self.op.node_name = node.name
1931     self.node = node
1932
1933   def Exec(self, feedback_fn):
1934     """Removes the node from the cluster.
1935
1936     """
1937     node = self.node
1938     logging.info("Stopping the node daemon and removing configs from node %s",
1939                  node.name)
1940
1941     self.context.RemoveNode(node.name)
1942
1943     result = self.rpc.call_node_leave_cluster(node.name)
1944     msg = result.fail_msg
1945     if msg:
1946       self.LogWarning("Errors encountered on the remote node while leaving"
1947                       " the cluster: %s", msg)
1948
1949     # Promote nodes to master candidate as needed
1950     _AdjustCandidatePool(self)
1951
1952
1953 class LUQueryNodes(NoHooksLU):
1954   """Logical unit for querying nodes.
1955
1956   """
1957   _OP_REQP = ["output_fields", "names", "use_locking"]
1958   REQ_BGL = False
1959   _FIELDS_DYNAMIC = utils.FieldSet(
1960     "dtotal", "dfree",
1961     "mtotal", "mnode", "mfree",
1962     "bootid",
1963     "ctotal", "cnodes", "csockets",
1964     )
1965
1966   _FIELDS_STATIC = utils.FieldSet(
1967     "name", "pinst_cnt", "sinst_cnt",
1968     "pinst_list", "sinst_list",
1969     "pip", "sip", "tags",
1970     "serial_no",
1971     "master_candidate",
1972     "master",
1973     "offline",
1974     "drained",
1975     "role",
1976     )
1977
1978   def ExpandNames(self):
1979     _CheckOutputFields(static=self._FIELDS_STATIC,
1980                        dynamic=self._FIELDS_DYNAMIC,
1981                        selected=self.op.output_fields)
1982
1983     self.needed_locks = {}
1984     self.share_locks[locking.LEVEL_NODE] = 1
1985
1986     if self.op.names:
1987       self.wanted = _GetWantedNodes(self, self.op.names)
1988     else:
1989       self.wanted = locking.ALL_SET
1990
1991     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1992     self.do_locking = self.do_node_query and self.op.use_locking
1993     if self.do_locking:
1994       # if we don't request only static fields, we need to lock the nodes
1995       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1996
1997
1998   def CheckPrereq(self):
1999     """Check prerequisites.
2000
2001     """
2002     # The validation of the node list is done in the _GetWantedNodes,
2003     # if non empty, and if empty, there's no validation to do
2004     pass
2005
2006   def Exec(self, feedback_fn):
2007     """Computes the list of nodes and their attributes.
2008
2009     """
2010     all_info = self.cfg.GetAllNodesInfo()
2011     if self.do_locking:
2012       nodenames = self.acquired_locks[locking.LEVEL_NODE]
2013     elif self.wanted != locking.ALL_SET:
2014       nodenames = self.wanted
2015       missing = set(nodenames).difference(all_info.keys())
2016       if missing:
2017         raise errors.OpExecError(
2018           "Some nodes were removed before retrieving their data: %s" % missing)
2019     else:
2020       nodenames = all_info.keys()
2021
2022     nodenames = utils.NiceSort(nodenames)
2023     nodelist = [all_info[name] for name in nodenames]
2024
2025     # begin data gathering
2026
2027     if self.do_node_query:
2028       live_data = {}
2029       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2030                                           self.cfg.GetHypervisorType())
2031       for name in nodenames:
2032         nodeinfo = node_data[name]
2033         if not nodeinfo.fail_msg and nodeinfo.payload:
2034           nodeinfo = nodeinfo.payload
2035           fn = utils.TryConvert
2036           live_data[name] = {
2037             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2038             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2039             "mfree": fn(int, nodeinfo.get('memory_free', None)),
2040             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2041             "dfree": fn(int, nodeinfo.get('vg_free', None)),
2042             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2043             "bootid": nodeinfo.get('bootid', None),
2044             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2045             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2046             }
2047         else:
2048           live_data[name] = {}
2049     else:
2050       live_data = dict.fromkeys(nodenames, {})
2051
2052     node_to_primary = dict([(name, set()) for name in nodenames])
2053     node_to_secondary = dict([(name, set()) for name in nodenames])
2054
2055     inst_fields = frozenset(("pinst_cnt", "pinst_list",
2056                              "sinst_cnt", "sinst_list"))
2057     if inst_fields & frozenset(self.op.output_fields):
2058       instancelist = self.cfg.GetInstanceList()
2059
2060       for instance_name in instancelist:
2061         inst = self.cfg.GetInstanceInfo(instance_name)
2062         if inst.primary_node in node_to_primary:
2063           node_to_primary[inst.primary_node].add(inst.name)
2064         for secnode in inst.secondary_nodes:
2065           if secnode in node_to_secondary:
2066             node_to_secondary[secnode].add(inst.name)
2067
2068     master_node = self.cfg.GetMasterNode()
2069
2070     # end data gathering
2071
2072     output = []
2073     for node in nodelist:
2074       node_output = []
2075       for field in self.op.output_fields:
2076         if field == "name":
2077           val = node.name
2078         elif field == "pinst_list":
2079           val = list(node_to_primary[node.name])
2080         elif field == "sinst_list":
2081           val = list(node_to_secondary[node.name])
2082         elif field == "pinst_cnt":
2083           val = len(node_to_primary[node.name])
2084         elif field == "sinst_cnt":
2085           val = len(node_to_secondary[node.name])
2086         elif field == "pip":
2087           val = node.primary_ip
2088         elif field == "sip":
2089           val = node.secondary_ip
2090         elif field == "tags":
2091           val = list(node.GetTags())
2092         elif field == "serial_no":
2093           val = node.serial_no
2094         elif field == "master_candidate":
2095           val = node.master_candidate
2096         elif field == "master":
2097           val = node.name == master_node
2098         elif field == "offline":
2099           val = node.offline
2100         elif field == "drained":
2101           val = node.drained
2102         elif self._FIELDS_DYNAMIC.Matches(field):
2103           val = live_data[node.name].get(field, None)
2104         elif field == "role":
2105           if node.name == master_node:
2106             val = "M"
2107           elif node.master_candidate:
2108             val = "C"
2109           elif node.drained:
2110             val = "D"
2111           elif node.offline:
2112             val = "O"
2113           else:
2114             val = "R"
2115         else:
2116           raise errors.ParameterError(field)
2117         node_output.append(val)
2118       output.append(node_output)
2119
2120     return output
2121
2122
2123 class LUQueryNodeVolumes(NoHooksLU):
2124   """Logical unit for getting volumes on node(s).
2125
2126   """
2127   _OP_REQP = ["nodes", "output_fields"]
2128   REQ_BGL = False
2129   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2130   _FIELDS_STATIC = utils.FieldSet("node")
2131
2132   def ExpandNames(self):
2133     _CheckOutputFields(static=self._FIELDS_STATIC,
2134                        dynamic=self._FIELDS_DYNAMIC,
2135                        selected=self.op.output_fields)
2136
2137     self.needed_locks = {}
2138     self.share_locks[locking.LEVEL_NODE] = 1
2139     if not self.op.nodes:
2140       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2141     else:
2142       self.needed_locks[locking.LEVEL_NODE] = \
2143         _GetWantedNodes(self, self.op.nodes)
2144
2145   def CheckPrereq(self):
2146     """Check prerequisites.
2147
2148     This checks that the fields required are valid output fields.
2149
2150     """
2151     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2152
2153   def Exec(self, feedback_fn):
2154     """Computes the list of nodes and their attributes.
2155
2156     """
2157     nodenames = self.nodes
2158     volumes = self.rpc.call_node_volumes(nodenames)
2159
2160     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2161              in self.cfg.GetInstanceList()]
2162
2163     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2164
2165     output = []
2166     for node in nodenames:
2167       nresult = volumes[node]
2168       if nresult.offline:
2169         continue
2170       msg = nresult.fail_msg
2171       if msg:
2172         self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2173         continue
2174
2175       node_vols = nresult.payload[:]
2176       node_vols.sort(key=lambda vol: vol['dev'])
2177
2178       for vol in node_vols:
2179         node_output = []
2180         for field in self.op.output_fields:
2181           if field == "node":
2182             val = node
2183           elif field == "phys":
2184             val = vol['dev']
2185           elif field == "vg":
2186             val = vol['vg']
2187           elif field == "name":
2188             val = vol['name']
2189           elif field == "size":
2190             val = int(float(vol['size']))
2191           elif field == "instance":
2192             for inst in ilist:
2193               if node not in lv_by_node[inst]:
2194                 continue
2195               if vol['name'] in lv_by_node[inst][node]:
2196                 val = inst.name
2197                 break
2198             else:
2199               val = '-'
2200           else:
2201             raise errors.ParameterError(field)
2202           node_output.append(str(val))
2203
2204         output.append(node_output)
2205
2206     return output
2207
2208
2209 class LUAddNode(LogicalUnit):
2210   """Logical unit for adding node to the cluster.
2211
2212   """
2213   HPATH = "node-add"
2214   HTYPE = constants.HTYPE_NODE
2215   _OP_REQP = ["node_name"]
2216
2217   def BuildHooksEnv(self):
2218     """Build hooks env.
2219
2220     This will run on all nodes before, and on all nodes + the new node after.
2221
2222     """
2223     env = {
2224       "OP_TARGET": self.op.node_name,
2225       "NODE_NAME": self.op.node_name,
2226       "NODE_PIP": self.op.primary_ip,
2227       "NODE_SIP": self.op.secondary_ip,
2228       }
2229     nodes_0 = self.cfg.GetNodeList()
2230     nodes_1 = nodes_0 + [self.op.node_name, ]
2231     return env, nodes_0, nodes_1
2232
2233   def CheckPrereq(self):
2234     """Check prerequisites.
2235
2236     This checks:
2237      - the new node is not already in the config
2238      - it is resolvable
2239      - its parameters (single/dual homed) matches the cluster
2240
2241     Any errors are signaled by raising errors.OpPrereqError.
2242
2243     """
2244     node_name = self.op.node_name
2245     cfg = self.cfg
2246
2247     dns_data = utils.HostInfo(node_name)
2248
2249     node = dns_data.name
2250     primary_ip = self.op.primary_ip = dns_data.ip
2251     secondary_ip = getattr(self.op, "secondary_ip", None)
2252     if secondary_ip is None:
2253       secondary_ip = primary_ip
2254     if not utils.IsValidIP(secondary_ip):
2255       raise errors.OpPrereqError("Invalid secondary IP given")
2256     self.op.secondary_ip = secondary_ip
2257
2258     node_list = cfg.GetNodeList()
2259     if not self.op.readd and node in node_list:
2260       raise errors.OpPrereqError("Node %s is already in the configuration" %
2261                                  node)
2262     elif self.op.readd and node not in node_list:
2263       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2264
2265     for existing_node_name in node_list:
2266       existing_node = cfg.GetNodeInfo(existing_node_name)
2267
2268       if self.op.readd and node == existing_node_name:
2269         if (existing_node.primary_ip != primary_ip or
2270             existing_node.secondary_ip != secondary_ip):
2271           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2272                                      " address configuration as before")
2273         continue
2274
2275       if (existing_node.primary_ip == primary_ip or
2276           existing_node.secondary_ip == primary_ip or
2277           existing_node.primary_ip == secondary_ip or
2278           existing_node.secondary_ip == secondary_ip):
2279         raise errors.OpPrereqError("New node ip address(es) conflict with"
2280                                    " existing node %s" % existing_node.name)
2281
2282     # check that the type of the node (single versus dual homed) is the
2283     # same as for the master
2284     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2285     master_singlehomed = myself.secondary_ip == myself.primary_ip
2286     newbie_singlehomed = secondary_ip == primary_ip
2287     if master_singlehomed != newbie_singlehomed:
2288       if master_singlehomed:
2289         raise errors.OpPrereqError("The master has no private ip but the"
2290                                    " new node has one")
2291       else:
2292         raise errors.OpPrereqError("The master has a private ip but the"
2293                                    " new node doesn't have one")
2294
2295     # checks reachability
2296     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2297       raise errors.OpPrereqError("Node not reachable by ping")
2298
2299     if not newbie_singlehomed:
2300       # check reachability from my secondary ip to newbie's secondary ip
2301       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2302                            source=myself.secondary_ip):
2303         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2304                                    " based ping to noded port")
2305
2306     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2307     if self.op.readd:
2308       exceptions = [node]
2309     else:
2310       exceptions = []
2311     mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
2312     # the new node will increase mc_max with one, so:
2313     mc_max = min(mc_max + 1, cp_size)
2314     self.master_candidate = mc_now < mc_max
2315
2316     if self.op.readd:
2317       self.new_node = self.cfg.GetNodeInfo(node)
2318       assert self.new_node is not None, "Can't retrieve locked node %s" % node
2319     else:
2320       self.new_node = objects.Node(name=node,
2321                                    primary_ip=primary_ip,
2322                                    secondary_ip=secondary_ip,
2323                                    master_candidate=self.master_candidate,
2324                                    offline=False, drained=False)
2325
2326   def Exec(self, feedback_fn):
2327     """Adds the new node to the cluster.
2328
2329     """
2330     new_node = self.new_node
2331     node = new_node.name
2332
2333     # for re-adds, reset the offline/drained/master-candidate flags;
2334     # we need to reset here, otherwise offline would prevent RPC calls
2335     # later in the procedure; this also means that if the re-add
2336     # fails, we are left with a non-offlined, broken node
2337     if self.op.readd:
2338       new_node.drained = new_node.offline = False
2339       self.LogInfo("Readding a node, the offline/drained flags were reset")
2340       # if we demote the node, we do cleanup later in the procedure
2341       new_node.master_candidate = self.master_candidate
2342
2343     # notify the user about any possible mc promotion
2344     if new_node.master_candidate:
2345       self.LogInfo("Node will be a master candidate")
2346
2347     # check connectivity
2348     result = self.rpc.call_version([node])[node]
2349     result.Raise("Can't get version information from node %s" % node)
2350     if constants.PROTOCOL_VERSION == result.payload:
2351       logging.info("Communication to node %s fine, sw version %s match",
2352                    node, result.payload)
2353     else:
2354       raise errors.OpExecError("Version mismatch master version %s,"
2355                                " node version %s" %
2356                                (constants.PROTOCOL_VERSION, result.payload))
2357
2358     # setup ssh on node
2359     logging.info("Copy ssh key to node %s", node)
2360     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2361     keyarray = []
2362     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2363                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2364                 priv_key, pub_key]
2365
2366     for i in keyfiles:
2367       f = open(i, 'r')
2368       try:
2369         keyarray.append(f.read())
2370       finally:
2371         f.close()
2372
2373     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2374                                     keyarray[2],
2375                                     keyarray[3], keyarray[4], keyarray[5])
2376     result.Raise("Cannot transfer ssh keys to the new node")
2377
2378     # Add node to our /etc/hosts, and add key to known_hosts
2379     if self.cfg.GetClusterInfo().modify_etc_hosts:
2380       utils.AddHostToEtcHosts(new_node.name)
2381
2382     if new_node.secondary_ip != new_node.primary_ip:
2383       result = self.rpc.call_node_has_ip_address(new_node.name,
2384                                                  new_node.secondary_ip)
2385       result.Raise("Failure checking secondary ip on node %s" % new_node.name,
2386                    prereq=True)
2387       if not result.payload:
2388         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2389                                  " you gave (%s). Please fix and re-run this"
2390                                  " command." % new_node.secondary_ip)
2391
2392     node_verify_list = [self.cfg.GetMasterNode()]
2393     node_verify_param = {
2394       'nodelist': [node],
2395       # TODO: do a node-net-test as well?
2396     }
2397
2398     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2399                                        self.cfg.GetClusterName())
2400     for verifier in node_verify_list:
2401       result[verifier].Raise("Cannot communicate with node %s" % verifier)
2402       nl_payload = result[verifier].payload['nodelist']
2403       if nl_payload:
2404         for failed in nl_payload:
2405           feedback_fn("ssh/hostname verification failed %s -> %s" %
2406                       (verifier, nl_payload[failed]))
2407         raise errors.OpExecError("ssh/hostname verification failed.")
2408
2409     if self.op.readd:
2410       _RedistributeAncillaryFiles(self)
2411       self.context.ReaddNode(new_node)
2412       # make sure we redistribute the config
2413       self.cfg.Update(new_node)
2414       # and make sure the new node will not have old files around
2415       if not new_node.master_candidate:
2416         result = self.rpc.call_node_demote_from_mc(new_node.name)
2417         msg = result.RemoteFailMsg()
2418         if msg:
2419           self.LogWarning("Node failed to demote itself from master"
2420                           " candidate status: %s" % msg)
2421     else:
2422       _RedistributeAncillaryFiles(self, additional_nodes=[node])
2423       self.context.AddNode(new_node)
2424
2425
2426 class LUSetNodeParams(LogicalUnit):
2427   """Modifies the parameters of a node.
2428
2429   """
2430   HPATH = "node-modify"
2431   HTYPE = constants.HTYPE_NODE
2432   _OP_REQP = ["node_name"]
2433   REQ_BGL = False
2434
2435   def CheckArguments(self):
2436     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2437     if node_name is None:
2438       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2439     self.op.node_name = node_name
2440     _CheckBooleanOpField(self.op, 'master_candidate')
2441     _CheckBooleanOpField(self.op, 'offline')
2442     _CheckBooleanOpField(self.op, 'drained')
2443     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2444     if all_mods.count(None) == 3:
2445       raise errors.OpPrereqError("Please pass at least one modification")
2446     if all_mods.count(True) > 1:
2447       raise errors.OpPrereqError("Can't set the node into more than one"
2448                                  " state at the same time")
2449
2450   def ExpandNames(self):
2451     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2452
2453   def BuildHooksEnv(self):
2454     """Build hooks env.
2455
2456     This runs on the master node.
2457
2458     """
2459     env = {
2460       "OP_TARGET": self.op.node_name,
2461       "MASTER_CANDIDATE": str(self.op.master_candidate),
2462       "OFFLINE": str(self.op.offline),
2463       "DRAINED": str(self.op.drained),
2464       }
2465     nl = [self.cfg.GetMasterNode(),
2466           self.op.node_name]
2467     return env, nl, nl
2468
2469   def CheckPrereq(self):
2470     """Check prerequisites.
2471
2472     This only checks the instance list against the existing names.
2473
2474     """
2475     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2476
2477     if ((self.op.master_candidate == False or self.op.offline == True or
2478          self.op.drained == True) and node.master_candidate):
2479       # we will demote the node from master_candidate
2480       if self.op.node_name == self.cfg.GetMasterNode():
2481         raise errors.OpPrereqError("The master node has to be a"
2482                                    " master candidate, online and not drained")
2483       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2484       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2485       if num_candidates <= cp_size:
2486         msg = ("Not enough master candidates (desired"
2487                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2488         if self.op.force:
2489           self.LogWarning(msg)
2490         else:
2491           raise errors.OpPrereqError(msg)
2492
2493     if (self.op.master_candidate == True and
2494         ((node.offline and not self.op.offline == False) or
2495          (node.drained and not self.op.drained == False))):
2496       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2497                                  " to master_candidate" % node.name)
2498
2499     return
2500
2501   def Exec(self, feedback_fn):
2502     """Modifies a node.
2503
2504     """
2505     node = self.node
2506
2507     result = []
2508     changed_mc = False
2509
2510     if self.op.offline is not None:
2511       node.offline = self.op.offline
2512       result.append(("offline", str(self.op.offline)))
2513       if self.op.offline == True:
2514         if node.master_candidate:
2515           node.master_candidate = False
2516           changed_mc = True
2517           result.append(("master_candidate", "auto-demotion due to offline"))
2518         if node.drained:
2519           node.drained = False
2520           result.append(("drained", "clear drained status due to offline"))
2521
2522     if self.op.master_candidate is not None:
2523       node.master_candidate = self.op.master_candidate
2524       changed_mc = True
2525       result.append(("master_candidate", str(self.op.master_candidate)))
2526       if self.op.master_candidate == False:
2527         rrc = self.rpc.call_node_demote_from_mc(node.name)
2528         msg = rrc.fail_msg
2529         if msg:
2530           self.LogWarning("Node failed to demote itself: %s" % msg)
2531
2532     if self.op.drained is not None:
2533       node.drained = self.op.drained
2534       result.append(("drained", str(self.op.drained)))
2535       if self.op.drained == True:
2536         if node.master_candidate:
2537           node.master_candidate = False
2538           changed_mc = True
2539           result.append(("master_candidate", "auto-demotion due to drain"))
2540           rrc = self.rpc.call_node_demote_from_mc(node.name)
2541           msg = rrc.RemoteFailMsg()
2542           if msg:
2543             self.LogWarning("Node failed to demote itself: %s" % msg)
2544         if node.offline:
2545           node.offline = False
2546           result.append(("offline", "clear offline status due to drain"))
2547
2548     # this will trigger configuration file update, if needed
2549     self.cfg.Update(node)
2550     # this will trigger job queue propagation or cleanup
2551     if changed_mc:
2552       self.context.ReaddNode(node)
2553
2554     return result
2555
2556
2557 class LUPowercycleNode(NoHooksLU):
2558   """Powercycles a node.
2559
2560   """
2561   _OP_REQP = ["node_name", "force"]
2562   REQ_BGL = False
2563
2564   def CheckArguments(self):
2565     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2566     if node_name is None:
2567       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2568     self.op.node_name = node_name
2569     if node_name == self.cfg.GetMasterNode() and not self.op.force:
2570       raise errors.OpPrereqError("The node is the master and the force"
2571                                  " parameter was not set")
2572
2573   def ExpandNames(self):
2574     """Locking for PowercycleNode.
2575
2576     This is a last-resource option and shouldn't block on other
2577     jobs. Therefore, we grab no locks.
2578
2579     """
2580     self.needed_locks = {}
2581
2582   def CheckPrereq(self):
2583     """Check prerequisites.
2584
2585     This LU has no prereqs.
2586
2587     """
2588     pass
2589
2590   def Exec(self, feedback_fn):
2591     """Reboots a node.
2592
2593     """
2594     result = self.rpc.call_node_powercycle(self.op.node_name,
2595                                            self.cfg.GetHypervisorType())
2596     result.Raise("Failed to schedule the reboot")
2597     return result.payload
2598
2599
2600 class LUQueryClusterInfo(NoHooksLU):
2601   """Query cluster configuration.
2602
2603   """
2604   _OP_REQP = []
2605   REQ_BGL = False
2606
2607   def ExpandNames(self):
2608     self.needed_locks = {}
2609
2610   def CheckPrereq(self):
2611     """No prerequsites needed for this LU.
2612
2613     """
2614     pass
2615
2616   def Exec(self, feedback_fn):
2617     """Return cluster config.
2618
2619     """
2620     cluster = self.cfg.GetClusterInfo()
2621     result = {
2622       "software_version": constants.RELEASE_VERSION,
2623       "protocol_version": constants.PROTOCOL_VERSION,
2624       "config_version": constants.CONFIG_VERSION,
2625       "os_api_version": max(constants.OS_API_VERSIONS),
2626       "export_version": constants.EXPORT_VERSION,
2627       "architecture": (platform.architecture()[0], platform.machine()),
2628       "name": cluster.cluster_name,
2629       "master": cluster.master_node,
2630       "default_hypervisor": cluster.default_hypervisor,
2631       "enabled_hypervisors": cluster.enabled_hypervisors,
2632       "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
2633                         for hypervisor_name in cluster.enabled_hypervisors]),
2634       "beparams": cluster.beparams,
2635       "nicparams": cluster.nicparams,
2636       "candidate_pool_size": cluster.candidate_pool_size,
2637       "master_netdev": cluster.master_netdev,
2638       "volume_group_name": cluster.volume_group_name,
2639       "file_storage_dir": cluster.file_storage_dir,
2640       }
2641
2642     return result
2643
2644
2645 class LUQueryConfigValues(NoHooksLU):
2646   """Return configuration values.
2647
2648   """
2649   _OP_REQP = []
2650   REQ_BGL = False
2651   _FIELDS_DYNAMIC = utils.FieldSet()
2652   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2653
2654   def ExpandNames(self):
2655     self.needed_locks = {}
2656
2657     _CheckOutputFields(static=self._FIELDS_STATIC,
2658                        dynamic=self._FIELDS_DYNAMIC,
2659                        selected=self.op.output_fields)
2660
2661   def CheckPrereq(self):
2662     """No prerequisites.
2663
2664     """
2665     pass
2666
2667   def Exec(self, feedback_fn):
2668     """Dump a representation of the cluster config to the standard output.
2669
2670     """
2671     values = []
2672     for field in self.op.output_fields:
2673       if field == "cluster_name":
2674         entry = self.cfg.GetClusterName()
2675       elif field == "master_node":
2676         entry = self.cfg.GetMasterNode()
2677       elif field == "drain_flag":
2678         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2679       else:
2680         raise errors.ParameterError(field)
2681       values.append(entry)
2682     return values
2683
2684
2685 class LUActivateInstanceDisks(NoHooksLU):
2686   """Bring up an instance's disks.
2687
2688   """
2689   _OP_REQP = ["instance_name"]
2690   REQ_BGL = False
2691
2692   def ExpandNames(self):
2693     self._ExpandAndLockInstance()
2694     self.needed_locks[locking.LEVEL_NODE] = []
2695     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2696
2697   def DeclareLocks(self, level):
2698     if level == locking.LEVEL_NODE:
2699       self._LockInstancesNodes()
2700
2701   def CheckPrereq(self):
2702     """Check prerequisites.
2703
2704     This checks that the instance is in the cluster.
2705
2706     """
2707     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2708     assert self.instance is not None, \
2709       "Cannot retrieve locked instance %s" % self.op.instance_name
2710     _CheckNodeOnline(self, self.instance.primary_node)
2711
2712   def Exec(self, feedback_fn):
2713     """Activate the disks.
2714
2715     """
2716     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2717     if not disks_ok:
2718       raise errors.OpExecError("Cannot activate block devices")
2719
2720     return disks_info
2721
2722
2723 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2724   """Prepare the block devices for an instance.
2725
2726   This sets up the block devices on all nodes.
2727
2728   @type lu: L{LogicalUnit}
2729   @param lu: the logical unit on whose behalf we execute
2730   @type instance: L{objects.Instance}
2731   @param instance: the instance for whose disks we assemble
2732   @type ignore_secondaries: boolean
2733   @param ignore_secondaries: if true, errors on secondary nodes
2734       won't result in an error return from the function
2735   @return: False if the operation failed, otherwise a list of
2736       (host, instance_visible_name, node_visible_name)
2737       with the mapping from node devices to instance devices
2738
2739   """
2740   device_info = []
2741   disks_ok = True
2742   iname = instance.name
2743   # With the two passes mechanism we try to reduce the window of
2744   # opportunity for the race condition of switching DRBD to primary
2745   # before handshaking occured, but we do not eliminate it
2746
2747   # The proper fix would be to wait (with some limits) until the
2748   # connection has been made and drbd transitions from WFConnection
2749   # into any other network-connected state (Connected, SyncTarget,
2750   # SyncSource, etc.)
2751
2752   # 1st pass, assemble on all nodes in secondary mode
2753   for inst_disk in instance.disks:
2754     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2755       lu.cfg.SetDiskID(node_disk, node)
2756       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2757       msg = result.fail_msg
2758       if msg:
2759         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2760                            " (is_primary=False, pass=1): %s",
2761                            inst_disk.iv_name, node, msg)
2762         if not ignore_secondaries:
2763           disks_ok = False
2764
2765   # FIXME: race condition on drbd migration to primary
2766
2767   # 2nd pass, do only the primary node
2768   for inst_disk in instance.disks:
2769     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2770       if node != instance.primary_node:
2771         continue
2772       lu.cfg.SetDiskID(node_disk, node)
2773       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2774       msg = result.fail_msg
2775       if msg:
2776         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2777                            " (is_primary=True, pass=2): %s",
2778                            inst_disk.iv_name, node, msg)
2779         disks_ok = False
2780     device_info.append((instance.primary_node, inst_disk.iv_name,
2781                         result.payload))
2782
2783   # leave the disks configured for the primary node
2784   # this is a workaround that would be fixed better by
2785   # improving the logical/physical id handling
2786   for disk in instance.disks:
2787     lu.cfg.SetDiskID(disk, instance.primary_node)
2788
2789   return disks_ok, device_info
2790
2791
2792 def _StartInstanceDisks(lu, instance, force):
2793   """Start the disks of an instance.
2794
2795   """
2796   disks_ok, _ = _AssembleInstanceDisks(lu, instance,
2797                                            ignore_secondaries=force)
2798   if not disks_ok:
2799     _ShutdownInstanceDisks(lu, instance)
2800     if force is not None and not force:
2801       lu.proc.LogWarning("", hint="If the message above refers to a"
2802                          " secondary node,"
2803                          " you can retry the operation using '--force'.")
2804     raise errors.OpExecError("Disk consistency error")
2805
2806
2807 class LUDeactivateInstanceDisks(NoHooksLU):
2808   """Shutdown an instance's disks.
2809
2810   """
2811   _OP_REQP = ["instance_name"]
2812   REQ_BGL = False
2813
2814   def ExpandNames(self):
2815     self._ExpandAndLockInstance()
2816     self.needed_locks[locking.LEVEL_NODE] = []
2817     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2818
2819   def DeclareLocks(self, level):
2820     if level == locking.LEVEL_NODE:
2821       self._LockInstancesNodes()
2822
2823   def CheckPrereq(self):
2824     """Check prerequisites.
2825
2826     This checks that the instance is in the cluster.
2827
2828     """
2829     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2830     assert self.instance is not None, \
2831       "Cannot retrieve locked instance %s" % self.op.instance_name
2832
2833   def Exec(self, feedback_fn):
2834     """Deactivate the disks
2835
2836     """
2837     instance = self.instance
2838     _SafeShutdownInstanceDisks(self, instance)
2839
2840
2841 def _SafeShutdownInstanceDisks(lu, instance):
2842   """Shutdown block devices of an instance.
2843
2844   This function checks if an instance is running, before calling
2845   _ShutdownInstanceDisks.
2846
2847   """
2848   pnode = instance.primary_node
2849   ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
2850   ins_l.Raise("Can't contact node %s" % pnode)
2851
2852   if instance.name in ins_l.payload:
2853     raise errors.OpExecError("Instance is running, can't shutdown"
2854                              " block devices.")
2855
2856   _ShutdownInstanceDisks(lu, instance)
2857
2858
2859 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2860   """Shutdown block devices of an instance.
2861
2862   This does the shutdown on all nodes of the instance.
2863
2864   If the ignore_primary is false, errors on the primary node are
2865   ignored.
2866
2867   """
2868   all_result = True
2869   for disk in instance.disks:
2870     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2871       lu.cfg.SetDiskID(top_disk, node)
2872       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2873       msg = result.fail_msg
2874       if msg:
2875         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2876                       disk.iv_name, node, msg)
2877         if not ignore_primary or node != instance.primary_node:
2878           all_result = False
2879   return all_result
2880
2881
2882 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2883   """Checks if a node has enough free memory.
2884
2885   This function check if a given node has the needed amount of free
2886   memory. In case the node has less memory or we cannot get the
2887   information from the node, this function raise an OpPrereqError
2888   exception.
2889
2890   @type lu: C{LogicalUnit}
2891   @param lu: a logical unit from which we get configuration data
2892   @type node: C{str}
2893   @param node: the node to check
2894   @type reason: C{str}
2895   @param reason: string to use in the error message
2896   @type requested: C{int}
2897   @param requested: the amount of memory in MiB to check for
2898   @type hypervisor_name: C{str}
2899   @param hypervisor_name: the hypervisor to ask for memory stats
2900   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2901       we cannot check the node
2902
2903   """
2904   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2905   nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
2906   free_mem = nodeinfo[node].payload.get('memory_free', None)
2907   if not isinstance(free_mem, int):
2908     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2909                                " was '%s'" % (node, free_mem))
2910   if requested > free_mem:
2911     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2912                                " needed %s MiB, available %s MiB" %
2913                                (node, reason, requested, free_mem))
2914
2915
2916 class LUStartupInstance(LogicalUnit):
2917   """Starts an instance.
2918
2919   """
2920   HPATH = "instance-start"
2921   HTYPE = constants.HTYPE_INSTANCE
2922   _OP_REQP = ["instance_name", "force"]
2923   REQ_BGL = False
2924
2925   def ExpandNames(self):
2926     self._ExpandAndLockInstance()
2927
2928   def BuildHooksEnv(self):
2929     """Build hooks env.
2930
2931     This runs on master, primary and secondary nodes of the instance.
2932
2933     """
2934     env = {
2935       "FORCE": self.op.force,
2936       }
2937     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2938     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2939     return env, nl, nl
2940
2941   def CheckPrereq(self):
2942     """Check prerequisites.
2943
2944     This checks that the instance is in the cluster.
2945
2946     """
2947     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2948     assert self.instance is not None, \
2949       "Cannot retrieve locked instance %s" % self.op.instance_name
2950
2951     # extra beparams
2952     self.beparams = getattr(self.op, "beparams", {})
2953     if self.beparams:
2954       if not isinstance(self.beparams, dict):
2955         raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2956                                    " dict" % (type(self.beparams), ))
2957       # fill the beparams dict
2958       utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2959       self.op.beparams = self.beparams
2960
2961     # extra hvparams
2962     self.hvparams = getattr(self.op, "hvparams", {})
2963     if self.hvparams:
2964       if not isinstance(self.hvparams, dict):
2965         raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2966                                    " dict" % (type(self.hvparams), ))
2967
2968       # check hypervisor parameter syntax (locally)
2969       cluster = self.cfg.GetClusterInfo()
2970       utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2971       filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
2972                                     instance.hvparams)
2973       filled_hvp.update(self.hvparams)
2974       hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2975       hv_type.CheckParameterSyntax(filled_hvp)
2976       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2977       self.op.hvparams = self.hvparams
2978
2979     _CheckNodeOnline(self, instance.primary_node)
2980
2981     bep = self.cfg.GetClusterInfo().FillBE(instance)
2982     # check bridges existence
2983     _CheckInstanceBridgesExist(self, instance)
2984
2985     remote_info = self.rpc.call_instance_info(instance.primary_node,
2986                                               instance.name,
2987                                               instance.hypervisor)
2988     remote_info.Raise("Error checking node %s" % instance.primary_node,
2989                       prereq=True)
2990     if not remote_info.payload: # not running already
2991       _CheckNodeFreeMemory(self, instance.primary_node,
2992                            "starting instance %s" % instance.name,
2993                            bep[constants.BE_MEMORY], instance.hypervisor)
2994
2995   def Exec(self, feedback_fn):
2996     """Start the instance.
2997
2998     """
2999     instance = self.instance
3000     force = self.op.force
3001
3002     self.cfg.MarkInstanceUp(instance.name)
3003
3004     node_current = instance.primary_node
3005
3006     _StartInstanceDisks(self, instance, force)
3007
3008     result = self.rpc.call_instance_start(node_current, instance,
3009                                           self.hvparams, self.beparams)
3010     msg = result.fail_msg
3011     if msg:
3012       _ShutdownInstanceDisks(self, instance)
3013       raise errors.OpExecError("Could not start instance: %s" % msg)
3014
3015
3016 class LURebootInstance(LogicalUnit):
3017   """Reboot an instance.
3018
3019   """
3020   HPATH = "instance-reboot"
3021   HTYPE = constants.HTYPE_INSTANCE
3022   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
3023   REQ_BGL = False
3024
3025   def ExpandNames(self):
3026     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
3027                                    constants.INSTANCE_REBOOT_HARD,
3028                                    constants.INSTANCE_REBOOT_FULL]:
3029       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
3030                                   (constants.INSTANCE_REBOOT_SOFT,
3031                                    constants.INSTANCE_REBOOT_HARD,
3032                                    constants.INSTANCE_REBOOT_FULL))
3033     self._ExpandAndLockInstance()
3034
3035   def BuildHooksEnv(self):
3036     """Build hooks env.
3037
3038     This runs on master, primary and secondary nodes of the instance.
3039
3040     """
3041     env = {
3042       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
3043       "REBOOT_TYPE": self.op.reboot_type,
3044       }
3045     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3046     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3047     return env, nl, nl
3048
3049   def CheckPrereq(self):
3050     """Check prerequisites.
3051
3052     This checks that the instance is in the cluster.
3053
3054     """
3055     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3056     assert self.instance is not None, \
3057       "Cannot retrieve locked instance %s" % self.op.instance_name
3058
3059     _CheckNodeOnline(self, instance.primary_node)
3060
3061     # check bridges existence
3062     _CheckInstanceBridgesExist(self, instance)
3063
3064   def Exec(self, feedback_fn):
3065     """Reboot the instance.
3066
3067     """
3068     instance = self.instance
3069     ignore_secondaries = self.op.ignore_secondaries
3070     reboot_type = self.op.reboot_type
3071
3072     node_current = instance.primary_node
3073
3074     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3075                        constants.INSTANCE_REBOOT_HARD]:
3076       for disk in instance.disks:
3077         self.cfg.SetDiskID(disk, node_current)
3078       result = self.rpc.call_instance_reboot(node_current, instance,
3079                                              reboot_type)
3080       result.Raise("Could not reboot instance")
3081     else:
3082       result = self.rpc.call_instance_shutdown(node_current, instance)
3083       result.Raise("Could not shutdown instance for full reboot")
3084       _ShutdownInstanceDisks(self, instance)
3085       _StartInstanceDisks(self, instance, ignore_secondaries)
3086       result = self.rpc.call_instance_start(node_current, instance, None, None)
3087       msg = result.fail_msg
3088       if msg:
3089         _ShutdownInstanceDisks(self, instance)
3090         raise errors.OpExecError("Could not start instance for"
3091                                  " full reboot: %s" % msg)
3092
3093     self.cfg.MarkInstanceUp(instance.name)
3094
3095
3096 class LUShutdownInstance(LogicalUnit):
3097   """Shutdown an instance.
3098
3099   """
3100   HPATH = "instance-stop"
3101   HTYPE = constants.HTYPE_INSTANCE
3102   _OP_REQP = ["instance_name"]
3103   REQ_BGL = False
3104
3105   def ExpandNames(self):
3106     self._ExpandAndLockInstance()
3107
3108   def BuildHooksEnv(self):
3109     """Build hooks env.
3110
3111     This runs on master, primary and secondary nodes of the instance.
3112
3113     """
3114     env = _BuildInstanceHookEnvByObject(self, self.instance)
3115     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3116     return env, nl, nl
3117
3118   def CheckPrereq(self):
3119     """Check prerequisites.
3120
3121     This checks that the instance is in the cluster.
3122
3123     """
3124     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3125     assert self.instance is not None, \
3126       "Cannot retrieve locked instance %s" % self.op.instance_name
3127     _CheckNodeOnline(self, self.instance.primary_node)
3128
3129   def Exec(self, feedback_fn):
3130     """Shutdown the instance.
3131
3132     """
3133     instance = self.instance
3134     node_current = instance.primary_node
3135     self.cfg.MarkInstanceDown(instance.name)
3136     result = self.rpc.call_instance_shutdown(node_current, instance)
3137     msg = result.fail_msg
3138     if msg:
3139       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3140
3141     _ShutdownInstanceDisks(self, instance)
3142
3143
3144 class LUReinstallInstance(LogicalUnit):
3145   """Reinstall an instance.
3146
3147   """
3148   HPATH = "instance-reinstall"
3149   HTYPE = constants.HTYPE_INSTANCE
3150   _OP_REQP = ["instance_name"]
3151   REQ_BGL = False
3152
3153   def ExpandNames(self):
3154     self._ExpandAndLockInstance()
3155
3156   def BuildHooksEnv(self):
3157     """Build hooks env.
3158
3159     This runs on master, primary and secondary nodes of the instance.
3160
3161     """
3162     env = _BuildInstanceHookEnvByObject(self, self.instance)
3163     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3164     return env, nl, nl
3165
3166   def CheckPrereq(self):
3167     """Check prerequisites.
3168
3169     This checks that the instance is in the cluster and is not running.
3170
3171     """
3172     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3173     assert instance is not None, \
3174       "Cannot retrieve locked instance %s" % self.op.instance_name
3175     _CheckNodeOnline(self, instance.primary_node)
3176
3177     if instance.disk_template == constants.DT_DISKLESS:
3178       raise errors.OpPrereqError("Instance '%s' has no disks" %
3179                                  self.op.instance_name)
3180     if instance.admin_up:
3181       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3182                                  self.op.instance_name)
3183     remote_info = self.rpc.call_instance_info(instance.primary_node,
3184                                               instance.name,
3185                                               instance.hypervisor)
3186     remote_info.Raise("Error checking node %s" % instance.primary_node,
3187                       prereq=True)
3188     if remote_info.payload:
3189       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3190                                  (self.op.instance_name,
3191                                   instance.primary_node))
3192
3193     self.op.os_type = getattr(self.op, "os_type", None)
3194     if self.op.os_type is not None:
3195       # OS verification
3196       pnode = self.cfg.GetNodeInfo(
3197         self.cfg.ExpandNodeName(instance.primary_node))
3198       if pnode is None:
3199         raise errors.OpPrereqError("Primary node '%s' is unknown" %
3200                                    self.op.pnode)
3201       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3202       result.Raise("OS '%s' not in supported OS list for primary node %s" %
3203                    (self.op.os_type, pnode.name), prereq=True)
3204
3205     self.instance = instance
3206
3207   def Exec(self, feedback_fn):
3208     """Reinstall the instance.
3209
3210     """
3211     inst = self.instance
3212
3213     if self.op.os_type is not None:
3214       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3215       inst.os = self.op.os_type
3216       self.cfg.Update(inst)
3217
3218     _StartInstanceDisks(self, inst, None)
3219     try:
3220       feedback_fn("Running the instance OS create scripts...")
3221       result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3222       result.Raise("Could not install OS for instance %s on node %s" %
3223                    (inst.name, inst.primary_node))
3224     finally:
3225       _ShutdownInstanceDisks(self, inst)
3226
3227
3228 class LURenameInstance(LogicalUnit):
3229   """Rename an instance.
3230
3231   """
3232   HPATH = "instance-rename"
3233   HTYPE = constants.HTYPE_INSTANCE
3234   _OP_REQP = ["instance_name", "new_name"]
3235
3236   def BuildHooksEnv(self):
3237     """Build hooks env.
3238
3239     This runs on master, primary and secondary nodes of the instance.
3240
3241     """
3242     env = _BuildInstanceHookEnvByObject(self, self.instance)
3243     env["INSTANCE_NEW_NAME"] = self.op.new_name
3244     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3245     return env, nl, nl
3246
3247   def CheckPrereq(self):
3248     """Check prerequisites.
3249
3250     This checks that the instance is in the cluster and is not running.
3251
3252     """
3253     instance = self.cfg.GetInstanceInfo(
3254       self.cfg.ExpandInstanceName(self.op.instance_name))
3255     if instance is None:
3256       raise errors.OpPrereqError("Instance '%s' not known" %
3257                                  self.op.instance_name)
3258     _CheckNodeOnline(self, instance.primary_node)
3259
3260     if instance.admin_up:
3261       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3262                                  self.op.instance_name)
3263     remote_info = self.rpc.call_instance_info(instance.primary_node,
3264                                               instance.name,
3265                                               instance.hypervisor)
3266     remote_info.Raise("Error checking node %s" % instance.primary_node,
3267                       prereq=True)
3268     if remote_info.payload:
3269       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3270                                  (self.op.instance_name,
3271                                   instance.primary_node))
3272     self.instance = instance
3273
3274     # new name verification
3275     name_info = utils.HostInfo(self.op.new_name)
3276
3277     self.op.new_name = new_name = name_info.name
3278     instance_list = self.cfg.GetInstanceList()
3279     if new_name in instance_list:
3280       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3281                                  new_name)
3282
3283     if not getattr(self.op, "ignore_ip", False):
3284       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3285         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3286                                    (name_info.ip, new_name))
3287
3288
3289   def Exec(self, feedback_fn):
3290     """Reinstall the instance.
3291
3292     """
3293     inst = self.instance
3294     old_name = inst.name
3295
3296     if inst.disk_template == constants.DT_FILE:
3297       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3298
3299     self.cfg.RenameInstance(inst.name, self.op.new_name)
3300     # Change the instance lock. This is definitely safe while we hold the BGL
3301     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3302     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3303
3304     # re-read the instance from the configuration after rename
3305     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3306
3307     if inst.disk_template == constants.DT_FILE:
3308       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3309       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3310                                                      old_file_storage_dir,
3311                                                      new_file_storage_dir)
3312       result.Raise("Could not rename on node %s directory '%s' to '%s'"
3313                    " (but the instance has been renamed in Ganeti)" %
3314                    (inst.primary_node, old_file_storage_dir,
3315                     new_file_storage_dir))
3316
3317     _StartInstanceDisks(self, inst, None)
3318     try:
3319       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3320                                                  old_name)
3321       msg = result.fail_msg
3322       if msg:
3323         msg = ("Could not run OS rename script for instance %s on node %s"
3324                " (but the instance has been renamed in Ganeti): %s" %
3325                (inst.name, inst.primary_node, msg))
3326         self.proc.LogWarning(msg)
3327     finally:
3328       _ShutdownInstanceDisks(self, inst)
3329
3330
3331 class LURemoveInstance(LogicalUnit):
3332   """Remove an instance.
3333
3334   """
3335   HPATH = "instance-remove"
3336   HTYPE = constants.HTYPE_INSTANCE
3337   _OP_REQP = ["instance_name", "ignore_failures"]
3338   REQ_BGL = False
3339
3340   def ExpandNames(self):
3341     self._ExpandAndLockInstance()
3342     self.needed_locks[locking.LEVEL_NODE] = []
3343     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3344
3345   def DeclareLocks(self, level):
3346     if level == locking.LEVEL_NODE:
3347       self._LockInstancesNodes()
3348
3349   def BuildHooksEnv(self):
3350     """Build hooks env.
3351
3352     This runs on master, primary and secondary nodes of the instance.
3353
3354     """
3355     env = _BuildInstanceHookEnvByObject(self, self.instance)
3356     nl = [self.cfg.GetMasterNode()]
3357     return env, nl, nl
3358
3359   def CheckPrereq(self):
3360     """Check prerequisites.
3361
3362     This checks that the instance is in the cluster.
3363
3364     """
3365     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3366     assert self.instance is not None, \
3367       "Cannot retrieve locked instance %s" % self.op.instance_name
3368
3369   def Exec(self, feedback_fn):
3370     """Remove the instance.
3371
3372     """
3373     instance = self.instance
3374     logging.info("Shutting down instance %s on node %s",
3375                  instance.name, instance.primary_node)
3376
3377     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3378     msg = result.fail_msg
3379     if msg:
3380       if self.op.ignore_failures:
3381         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3382       else:
3383         raise errors.OpExecError("Could not shutdown instance %s on"
3384                                  " node %s: %s" %
3385                                  (instance.name, instance.primary_node, msg))
3386
3387     logging.info("Removing block devices for instance %s", instance.name)
3388
3389     if not _RemoveDisks(self, instance):
3390       if self.op.ignore_failures:
3391         feedback_fn("Warning: can't remove instance's disks")
3392       else:
3393         raise errors.OpExecError("Can't remove instance's disks")
3394
3395     logging.info("Removing instance %s out of cluster config", instance.name)
3396
3397     self.cfg.RemoveInstance(instance.name)
3398     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3399
3400
3401 class LUQueryInstances(NoHooksLU):
3402   """Logical unit for querying instances.
3403
3404   """
3405   _OP_REQP = ["output_fields", "names", "use_locking"]
3406   REQ_BGL = False
3407   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3408                                     "admin_state",
3409                                     "disk_template", "ip", "mac", "bridge",
3410                                     "nic_mode", "nic_link",
3411                                     "sda_size", "sdb_size", "vcpus", "tags",
3412                                     "network_port", "beparams",
3413                                     r"(disk)\.(size)/([0-9]+)",
3414                                     r"(disk)\.(sizes)", "disk_usage",
3415                                     r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
3416                                     r"(nic)\.(bridge)/([0-9]+)",
3417                                     r"(nic)\.(macs|ips|modes|links|bridges)",
3418                                     r"(disk|nic)\.(count)",
3419                                     "serial_no", "hypervisor", "hvparams",] +
3420                                   ["hv/%s" % name
3421                                    for name in constants.HVS_PARAMETERS] +
3422                                   ["be/%s" % name
3423                                    for name in constants.BES_PARAMETERS])
3424   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3425
3426
3427   def ExpandNames(self):
3428     _CheckOutputFields(static=self._FIELDS_STATIC,
3429                        dynamic=self._FIELDS_DYNAMIC,
3430                        selected=self.op.output_fields)
3431
3432     self.needed_locks = {}
3433     self.share_locks[locking.LEVEL_INSTANCE] = 1
3434     self.share_locks[locking.LEVEL_NODE] = 1
3435
3436     if self.op.names:
3437       self.wanted = _GetWantedInstances(self, self.op.names)
3438     else:
3439       self.wanted = locking.ALL_SET
3440
3441     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3442     self.do_locking = self.do_node_query and self.op.use_locking
3443     if self.do_locking:
3444       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3445       self.needed_locks[locking.LEVEL_NODE] = []
3446       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3447
3448   def DeclareLocks(self, level):
3449     if level == locking.LEVEL_NODE and self.do_locking:
3450       self._LockInstancesNodes()
3451
3452   def CheckPrereq(self):
3453     """Check prerequisites.
3454
3455     """
3456     pass
3457
3458   def Exec(self, feedback_fn):
3459     """Computes the list of nodes and their attributes.
3460
3461     """
3462     all_info = self.cfg.GetAllInstancesInfo()
3463     if self.wanted == locking.ALL_SET:
3464       # caller didn't specify instance names, so ordering is not important
3465       if self.do_locking:
3466         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3467       else:
3468         instance_names = all_info.keys()
3469       instance_names = utils.NiceSort(instance_names)
3470     else:
3471       # caller did specify names, so we must keep the ordering
3472       if self.do_locking:
3473         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3474       else:
3475         tgt_set = all_info.keys()
3476       missing = set(self.wanted).difference(tgt_set)
3477       if missing:
3478         raise errors.OpExecError("Some instances were removed before"
3479                                  " retrieving their data: %s" % missing)
3480       instance_names = self.wanted
3481
3482     instance_list = [all_info[iname] for iname in instance_names]
3483
3484     # begin data gathering
3485
3486     nodes = frozenset([inst.primary_node for inst in instance_list])
3487     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3488
3489     bad_nodes = []
3490     off_nodes = []
3491     if self.do_node_query:
3492       live_data = {}
3493       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3494       for name in nodes:
3495         result = node_data[name]
3496         if result.offline:
3497           # offline nodes will be in both lists
3498           off_nodes.append(name)
3499         if result.failed or result.fail_msg:
3500           bad_nodes.append(name)
3501         else:
3502           if result.payload:
3503             live_data.update(result.payload)
3504           # else no instance is alive
3505     else:
3506       live_data = dict([(name, {}) for name in instance_names])
3507
3508     # end data gathering
3509
3510     HVPREFIX = "hv/"
3511     BEPREFIX = "be/"
3512     output = []
3513     cluster = self.cfg.GetClusterInfo()
3514     for instance in instance_list:
3515       iout = []
3516       i_hv = cluster.FillHV(instance)
3517       i_be = cluster.FillBE(instance)
3518       i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
3519                                  nic.nicparams) for nic in instance.nics]
3520       for field in self.op.output_fields:
3521         st_match = self._FIELDS_STATIC.Matches(field)
3522         if field == "name":
3523           val = instance.name
3524         elif field == "os":
3525           val = instance.os
3526         elif field == "pnode":
3527           val = instance.primary_node
3528         elif field == "snodes":
3529           val = list(instance.secondary_nodes)
3530         elif field == "admin_state":
3531           val = instance.admin_up
3532         elif field == "oper_state":
3533           if instance.primary_node in bad_nodes:
3534             val = None
3535           else:
3536             val = bool(live_data.get(instance.name))
3537         elif field == "status":
3538           if instance.primary_node in off_nodes:
3539             val = "ERROR_nodeoffline"
3540           elif instance.primary_node in bad_nodes:
3541             val = "ERROR_nodedown"
3542           else:
3543             running = bool(live_data.get(instance.name))
3544             if running:
3545               if instance.admin_up:
3546                 val = "running"
3547               else:
3548                 val = "ERROR_up"
3549             else:
3550               if instance.admin_up:
3551                 val = "ERROR_down"
3552               else:
3553                 val = "ADMIN_down"
3554         elif field == "oper_ram":
3555           if instance.primary_node in bad_nodes:
3556             val = None
3557           elif instance.name in live_data:
3558             val = live_data[instance.name].get("memory", "?")
3559           else:
3560             val = "-"
3561         elif field == "vcpus":
3562           val = i_be[constants.BE_VCPUS]
3563         elif field == "disk_template":
3564           val = instance.disk_template
3565         elif field == "ip":
3566           if instance.nics:
3567             val = instance.nics[0].ip
3568           else:
3569             val = None
3570         elif field == "nic_mode":
3571           if instance.nics:
3572             val = i_nicp[0][constants.NIC_MODE]
3573           else:
3574             val = None
3575         elif field == "nic_link":
3576           if instance.nics:
3577             val = i_nicp[0][constants.NIC_LINK]
3578           else:
3579             val = None
3580         elif field == "bridge":
3581           if (instance.nics and
3582               i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
3583             val = i_nicp[0][constants.NIC_LINK]
3584           else:
3585             val = None
3586         elif field == "mac":
3587           if instance.nics:
3588             val = instance.nics[0].mac
3589           else:
3590             val = None
3591         elif field == "sda_size" or field == "sdb_size":
3592           idx = ord(field[2]) - ord('a')
3593           try:
3594             val = instance.FindDisk(idx).size
3595           except errors.OpPrereqError:
3596             val = None
3597         elif field == "disk_usage": # total disk usage per node
3598           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3599           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3600         elif field == "tags":
3601           val = list(instance.GetTags())
3602         elif field == "serial_no":
3603           val = instance.serial_no
3604         elif field == "network_port":
3605           val = instance.network_port
3606         elif field == "hypervisor":
3607           val = instance.hypervisor
3608         elif field == "hvparams":
3609           val = i_hv
3610         elif (field.startswith(HVPREFIX) and
3611               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3612           val = i_hv.get(field[len(HVPREFIX):], None)
3613         elif field == "beparams":
3614           val = i_be
3615         elif (field.startswith(BEPREFIX) and
3616               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3617           val = i_be.get(field[len(BEPREFIX):], None)
3618         elif st_match and st_match.groups():
3619           # matches a variable list
3620           st_groups = st_match.groups()
3621           if st_groups and st_groups[0] == "disk":
3622             if st_groups[1] == "count":
3623               val = len(instance.disks)
3624             elif st_groups[1] == "sizes":
3625               val = [disk.size for disk in instance.disks]
3626             elif st_groups[1] == "size":
3627               try:
3628                 val = instance.FindDisk(st_groups[2]).size
3629               except errors.OpPrereqError:
3630                 val = None
3631             else:
3632               assert False, "Unhandled disk parameter"
3633           elif st_groups[0] == "nic":
3634             if st_groups[1] == "count":
3635               val = len(instance.nics)
3636             elif st_groups[1] == "macs":
3637               val = [nic.mac for nic in instance.nics]
3638             elif st_groups[1] == "ips":
3639               val = [nic.ip for nic in instance.nics]
3640             elif st_groups[1] == "modes":
3641               val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
3642             elif st_groups[1] == "links":
3643               val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
3644             elif st_groups[1] == "bridges":
3645               val = []
3646               for nicp in i_nicp:
3647                 if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
3648                   val.append(nicp[constants.NIC_LINK])
3649                 else:
3650                   val.append(None)
3651             else:
3652               # index-based item
3653               nic_idx = int(st_groups[2])
3654               if nic_idx >= len(instance.nics):
3655                 val = None
3656               else:
3657                 if st_groups[1] == "mac":
3658                   val = instance.nics[nic_idx].mac
3659                 elif st_groups[1] == "ip":
3660                   val = instance.nics[nic_idx].ip
3661                 elif st_groups[1] == "mode":
3662                   val = i_nicp[nic_idx][constants.NIC_MODE]
3663                 elif st_groups[1] == "link":
3664                   val = i_nicp[nic_idx][constants.NIC_LINK]
3665                 elif st_groups[1] == "bridge":
3666                   nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
3667                   if nic_mode == constants.NIC_MODE_BRIDGED:
3668                     val = i_nicp[nic_idx][constants.NIC_LINK]
3669                   else:
3670                     val = None
3671                 else:
3672                   assert False, "Unhandled NIC parameter"
3673           else:
3674             assert False, ("Declared but unhandled variable parameter '%s'" %
3675                            field)
3676         else:
3677           assert False, "Declared but unhandled parameter '%s'" % field
3678         iout.append(val)
3679       output.append(iout)
3680
3681     return output
3682
3683
3684 class LUFailoverInstance(LogicalUnit):
3685   """Failover an instance.
3686
3687   """
3688   HPATH = "instance-failover"
3689   HTYPE = constants.HTYPE_INSTANCE
3690   _OP_REQP = ["instance_name", "ignore_consistency"]
3691   REQ_BGL = False
3692
3693   def ExpandNames(self):
3694     self._ExpandAndLockInstance()
3695     self.needed_locks[locking.LEVEL_NODE] = []
3696     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3697
3698   def DeclareLocks(self, level):
3699     if level == locking.LEVEL_NODE:
3700       self._LockInstancesNodes()
3701
3702   def BuildHooksEnv(self):
3703     """Build hooks env.
3704
3705     This runs on master, primary and secondary nodes of the instance.
3706
3707     """
3708     env = {
3709       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3710       }
3711     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3712     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3713     return env, nl, nl
3714
3715   def CheckPrereq(self):
3716     """Check prerequisites.
3717
3718     This checks that the instance is in the cluster.
3719
3720     """
3721     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3722     assert self.instance is not None, \
3723       "Cannot retrieve locked instance %s" % self.op.instance_name
3724
3725     bep = self.cfg.GetClusterInfo().FillBE(instance)
3726     if instance.disk_template not in constants.DTS_NET_MIRROR:
3727       raise errors.OpPrereqError("Instance's disk layout is not"
3728                                  " network mirrored, cannot failover.")
3729
3730     secondary_nodes = instance.secondary_nodes
3731     if not secondary_nodes:
3732       raise errors.ProgrammerError("no secondary node but using "
3733                                    "a mirrored disk template")
3734
3735     target_node = secondary_nodes[0]
3736     _CheckNodeOnline(self, target_node)
3737     _CheckNodeNotDrained(self, target_node)
3738     if instance.admin_up:
3739       # check memory requirements on the secondary node
3740       _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3741                            instance.name, bep[constants.BE_MEMORY],
3742                            instance.hypervisor)
3743     else:
3744       self.LogInfo("Not checking memory on the secondary node as"
3745                    " instance will not be started")
3746
3747     # check bridge existance
3748     _CheckInstanceBridgesExist(self, instance, node=target_node)
3749
3750   def Exec(self, feedback_fn):
3751     """Failover an instance.
3752
3753     The failover is done by shutting it down on its present node and
3754     starting it on the secondary.
3755
3756     """
3757     instance = self.instance
3758
3759     source_node = instance.primary_node
3760     target_node = instance.secondary_nodes[0]
3761
3762     feedback_fn("* checking disk consistency between source and target")
3763     for dev in instance.disks:
3764       # for drbd, these are drbd over lvm
3765       if not _CheckDiskConsistency(self, dev, target_node, False):
3766         if instance.admin_up and not self.op.ignore_consistency:
3767           raise errors.OpExecError("Disk %s is degraded on target node,"
3768                                    " aborting failover." % dev.iv_name)
3769
3770     feedback_fn("* shutting down instance on source node")
3771     logging.info("Shutting down instance %s on node %s",
3772                  instance.name, source_node)
3773
3774     result = self.rpc.call_instance_shutdown(source_node, instance)
3775     msg = result.fail_msg
3776     if msg:
3777       if self.op.ignore_consistency:
3778         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3779                              " Proceeding anyway. Please make sure node"
3780                              " %s is down. Error details: %s",
3781                              instance.name, source_node, source_node, msg)
3782       else:
3783         raise errors.OpExecError("Could not shutdown instance %s on"
3784                                  " node %s: %s" %
3785                                  (instance.name, source_node, msg))
3786
3787     feedback_fn("* deactivating the instance's disks on source node")
3788     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3789       raise errors.OpExecError("Can't shut down the instance's disks.")
3790
3791     instance.primary_node = target_node
3792     # distribute new instance config to the other nodes
3793     self.cfg.Update(instance)
3794
3795     # Only start the instance if it's marked as up
3796     if instance.admin_up:
3797       feedback_fn("* activating the instance's disks on target node")
3798       logging.info("Starting instance %s on node %s",
3799                    instance.name, target_node)
3800
3801       disks_ok, _ = _AssembleInstanceDisks(self, instance,
3802                                                ignore_secondaries=True)
3803       if not disks_ok:
3804         _ShutdownInstanceDisks(self, instance)
3805         raise errors.OpExecError("Can't activate the instance's disks")
3806
3807       feedback_fn("* starting the instance on the target node")
3808       result = self.rpc.call_instance_start(target_node, instance, None, None)
3809       msg = result.fail_msg
3810       if msg:
3811         _ShutdownInstanceDisks(self, instance)
3812         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3813                                  (instance.name, target_node, msg))
3814
3815
3816 class LUMigrateInstance(LogicalUnit):
3817   """Migrate an instance.
3818
3819   This is migration without shutting down, compared to the failover,
3820   which is done with shutdown.
3821
3822   """
3823   HPATH = "instance-migrate"
3824   HTYPE = constants.HTYPE_INSTANCE
3825   _OP_REQP = ["instance_name", "live", "cleanup"]
3826
3827   REQ_BGL = False
3828
3829   def ExpandNames(self):
3830     self._ExpandAndLockInstance()
3831     self.needed_locks[locking.LEVEL_NODE] = []
3832     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3833
3834   def DeclareLocks(self, level):
3835     if level == locking.LEVEL_NODE:
3836       self._LockInstancesNodes()
3837
3838   def BuildHooksEnv(self):
3839     """Build hooks env.
3840
3841     This runs on master, primary and secondary nodes of the instance.
3842
3843     """
3844     env = _BuildInstanceHookEnvByObject(self, self.instance)
3845     env["MIGRATE_LIVE"] = self.op.live
3846     env["MIGRATE_CLEANUP"] = self.op.cleanup
3847     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3848     return env, nl, nl
3849
3850   def CheckPrereq(self):
3851     """Check prerequisites.
3852
3853     This checks that the instance is in the cluster.
3854
3855     """
3856     instance = self.cfg.GetInstanceInfo(
3857       self.cfg.ExpandInstanceName(self.op.instance_name))
3858     if instance is None:
3859       raise errors.OpPrereqError("Instance '%s' not known" %
3860                                  self.op.instance_name)
3861
3862     if instance.disk_template != constants.DT_DRBD8:
3863       raise errors.OpPrereqError("Instance's disk layout is not"
3864                                  " drbd8, cannot migrate.")
3865
3866     secondary_nodes = instance.secondary_nodes
3867     if not secondary_nodes:
3868       raise errors.ConfigurationError("No secondary node but using"
3869                                       " drbd8 disk template")
3870
3871     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3872
3873     target_node = secondary_nodes[0]
3874     # check memory requirements on the secondary node
3875     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3876                          instance.name, i_be[constants.BE_MEMORY],
3877                          instance.hypervisor)
3878
3879     # check bridge existance
3880     _CheckInstanceBridgesExist(self, instance, node=target_node)
3881
3882     if not self.op.cleanup:
3883       _CheckNodeNotDrained(self, target_node)
3884       result = self.rpc.call_instance_migratable(instance.primary_node,
3885                                                  instance)
3886       result.Raise("Can't migrate, please use failover", prereq=True)
3887
3888     self.instance = instance
3889
3890   def _WaitUntilSync(self):
3891     """Poll with custom rpc for disk sync.
3892
3893     This uses our own step-based rpc call.
3894
3895     """
3896     self.feedback_fn("* wait until resync is done")
3897     all_done = False
3898     while not all_done:
3899       all_done = True
3900       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3901                                             self.nodes_ip,
3902                                             self.instance.disks)
3903       min_percent = 100
3904       for node, nres in result.items():
3905         nres.Raise("Cannot resync disks on node %s" % node)
3906         node_done, node_percent = nres.payload
3907         all_done = all_done and node_done
3908         if node_percent is not None:
3909           min_percent = min(min_percent, node_percent)
3910       if not all_done:
3911         if min_percent < 100:
3912           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3913         time.sleep(2)
3914
3915   def _EnsureSecondary(self, node):
3916     """Demote a node to secondary.
3917
3918     """
3919     self.feedback_fn("* switching node %s to secondary mode" % node)
3920
3921     for dev in self.instance.disks:
3922       self.cfg.SetDiskID(dev, node)
3923
3924     result = self.rpc.call_blockdev_close(node, self.instance.name,
3925                                           self.instance.disks)
3926     result.Raise("Cannot change disk to secondary on node %s" % node)
3927
3928   def _GoStandalone(self):
3929     """Disconnect from the network.
3930
3931     """
3932     self.feedback_fn("* changing into standalone mode")
3933     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3934                                                self.instance.disks)
3935     for node, nres in result.items():
3936       nres.Raise("Cannot disconnect disks node %s" % node)
3937
3938   def _GoReconnect(self, multimaster):
3939     """Reconnect to the network.
3940
3941     """
3942     if multimaster:
3943       msg = "dual-master"
3944     else:
3945       msg = "single-master"
3946     self.feedback_fn("* changing disks into %s mode" % msg)
3947     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3948                                            self.instance.disks,
3949                                            self.instance.name, multimaster)
3950     for node, nres in result.items():
3951       nres.Raise("Cannot change disks config on node %s" % node)
3952
3953   def _ExecCleanup(self):
3954     """Try to cleanup after a failed migration.
3955
3956     The cleanup is done by:
3957       - check that the instance is running only on one node
3958         (and update the config if needed)
3959       - change disks on its secondary node to secondary
3960       - wait until disks are fully synchronized
3961       - disconnect from the network
3962       - change disks into single-master mode
3963       - wait again until disks are fully synchronized
3964
3965     """
3966     instance = self.instance
3967     target_node = self.target_node
3968     source_node = self.source_node
3969
3970     # check running on only one node
3971     self.feedback_fn("* checking where the instance actually runs"
3972                      " (if this hangs, the hypervisor might be in"
3973                      " a bad state)")
3974     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3975     for node, result in ins_l.items():
3976       result.Raise("Can't contact node %s" % node)
3977
3978     runningon_source = instance.name in ins_l[source_node].payload
3979     runningon_target = instance.name in ins_l[target_node].payload
3980
3981     if runningon_source and runningon_target:
3982       raise errors.OpExecError("Instance seems to be running on two nodes,"
3983                                " or the hypervisor is confused. You will have"
3984                                " to ensure manually that it runs only on one"
3985                                " and restart this operation.")
3986
3987     if not (runningon_source or runningon_target):
3988       raise errors.OpExecError("Instance does not seem to be running at all."
3989                                " In this case, it's safer to repair by"
3990                                " running 'gnt-instance stop' to ensure disk"
3991                                " shutdown, and then restarting it.")
3992
3993     if runningon_target:
3994       # the migration has actually succeeded, we need to update the config
3995       self.feedback_fn("* instance running on secondary node (%s),"
3996                        " updating config" % target_node)
3997       instance.primary_node = target_node
3998       self.cfg.Update(instance)
3999       demoted_node = source_node
4000     else:
4001       self.feedback_fn("* instance confirmed to be running on its"
4002                        " primary node (%s)" % source_node)
4003       demoted_node = target_node
4004
4005     self._EnsureSecondary(demoted_node)
4006     try:
4007       self._WaitUntilSync()
4008     except errors.OpExecError:
4009       # we ignore here errors, since if the device is standalone, it
4010       # won't be able to sync
4011       pass
4012     self._GoStandalone()
4013     self._GoReconnect(False)
4014     self._WaitUntilSync()
4015
4016     self.feedback_fn("* done")
4017
4018   def _RevertDiskStatus(self):
4019     """Try to revert the disk status after a failed migration.
4020
4021     """
4022     target_node = self.target_node
4023     try:
4024       self._EnsureSecondary(target_node)
4025       self._GoStandalone()
4026       self._GoReconnect(False)
4027       self._WaitUntilSync()
4028     except errors.OpExecError, err:
4029       self.LogWarning("Migration failed and I can't reconnect the"
4030                       " drives: error '%s'\n"
4031                       "Please look and recover the instance status" %
4032                       str(err))
4033
4034   def _AbortMigration(self):
4035     """Call the hypervisor code to abort a started migration.
4036
4037     """
4038     instance = self.instance
4039     target_node = self.target_node
4040     migration_info = self.migration_info
4041
4042     abort_result = self.rpc.call_finalize_migration(target_node,
4043                                                     instance,
4044                                                     migration_info,
4045                                                     False)
4046     abort_msg = abort_result.fail_msg
4047     if abort_msg:
4048       logging.error("Aborting migration failed on target node %s: %s" %
4049                     (target_node, abort_msg))
4050       # Don't raise an exception here, as we stil have to try to revert the
4051       # disk status, even if this step failed.
4052
4053   def _ExecMigration(self):
4054     """Migrate an instance.
4055
4056     The migrate is done by:
4057       - change the disks into dual-master mode
4058       - wait until disks are fully synchronized again
4059       - migrate the instance
4060       - change disks on the new secondary node (the old primary) to secondary
4061       - wait until disks are fully synchronized
4062       - change disks into single-master mode
4063
4064     """
4065     instance = self.instance
4066     target_node = self.target_node
4067     source_node = self.source_node
4068
4069     self.feedback_fn("* checking disk consistency between source and target")
4070     for dev in instance.disks:
4071       if not _CheckDiskConsistency(self, dev, target_node, False):
4072         raise errors.OpExecError("Disk %s is degraded or not fully"
4073                                  " synchronized on target node,"
4074                                  " aborting migrate." % dev.iv_name)
4075
4076     # First get the migration information from the remote node
4077     result = self.rpc.call_migration_info(source_node, instance)
4078     msg = result.fail_msg
4079     if msg:
4080       log_err = ("Failed fetching source migration information from %s: %s" %
4081                  (source_node, msg))
4082       logging.error(log_err)
4083       raise errors.OpExecError(log_err)
4084
4085     self.migration_info = migration_info = result.payload
4086
4087     # Then switch the disks to master/master mode
4088     self._EnsureSecondary(target_node)
4089     self._GoStandalone()
4090     self._GoReconnect(True)
4091     self._WaitUntilSync()
4092
4093     self.feedback_fn("* preparing %s to accept the instance" % target_node)
4094     result = self.rpc.call_accept_instance(target_node,
4095                                            instance,
4096                                            migration_info,
4097                                            self.nodes_ip[target_node])
4098
4099     msg = result.fail_msg
4100     if msg:
4101       logging.error("Instance pre-migration failed, trying to revert"
4102                     " disk status: %s", msg)
4103       self._AbortMigration()
4104       self._RevertDiskStatus()
4105       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4106                                (instance.name, msg))
4107
4108     self.feedback_fn("* migrating instance to %s" % target_node)
4109     time.sleep(10)
4110     result = self.rpc.call_instance_migrate(source_node, instance,
4111                                             self.nodes_ip[target_node],
4112                                             self.op.live)
4113     msg = result.fail_msg
4114     if msg:
4115       logging.error("Instance migration failed, trying to revert"
4116                     " disk status: %s", msg)
4117       self._AbortMigration()
4118       self._RevertDiskStatus()
4119       raise errors.OpExecError("Could not migrate instance %s: %s" %
4120                                (instance.name, msg))
4121     time.sleep(10)
4122
4123     instance.primary_node = target_node
4124     # distribute new instance config to the other nodes
4125     self.cfg.Update(instance)
4126
4127     result = self.rpc.call_finalize_migration(target_node,
4128                                               instance,
4129                                               migration_info,
4130                                               True)
4131     msg = result.fail_msg
4132     if msg:
4133       logging.error("Instance migration succeeded, but finalization failed:"
4134                     " %s" % msg)
4135       raise errors.OpExecError("Could not finalize instance migration: %s" %
4136                                msg)
4137
4138     self._EnsureSecondary(source_node)
4139     self._WaitUntilSync()
4140     self._GoStandalone()
4141     self._GoReconnect(False)
4142     self._WaitUntilSync()
4143
4144     self.feedback_fn("* done")
4145
4146   def Exec(self, feedback_fn):
4147     """Perform the migration.
4148
4149     """
4150     self.feedback_fn = feedback_fn
4151
4152     self.source_node = self.instance.primary_node
4153     self.target_node = self.instance.secondary_nodes[0]
4154     self.all_nodes = [self.source_node, self.target_node]
4155     self.nodes_ip = {
4156       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4157       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4158       }
4159     if self.op.cleanup:
4160       return self._ExecCleanup()
4161     else:
4162       return self._ExecMigration()
4163
4164
4165 def _CreateBlockDev(lu, node, instance, device, force_create,
4166                     info, force_open):
4167   """Create a tree of block devices on a given node.
4168
4169   If this device type has to be created on secondaries, create it and
4170   all its children.
4171
4172   If not, just recurse to children keeping the same 'force' value.
4173
4174   @param lu: the lu on whose behalf we execute
4175   @param node: the node on which to create the device
4176   @type instance: L{objects.Instance}
4177   @param instance: the instance which owns the device
4178   @type device: L{objects.Disk}
4179   @param device: the device to create
4180   @type force_create: boolean
4181   @param force_create: whether to force creation of this device; this
4182       will be change to True whenever we find a device which has
4183       CreateOnSecondary() attribute
4184   @param info: the extra 'metadata' we should attach to the device
4185       (this will be represented as a LVM tag)
4186   @type force_open: boolean
4187   @param force_open: this parameter will be passes to the
4188       L{backend.BlockdevCreate} function where it specifies
4189       whether we run on primary or not, and it affects both
4190       the child assembly and the device own Open() execution
4191
4192   """
4193   if device.CreateOnSecondary():
4194     force_create = True
4195
4196   if device.children:
4197     for child in device.children:
4198       _CreateBlockDev(lu, node, instance, child, force_create,
4199                       info, force_open)
4200
4201   if not force_create:
4202     return
4203
4204   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4205
4206
4207 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4208   """Create a single block device on a given node.
4209
4210   This will not recurse over children of the device, so they must be
4211   created in advance.
4212
4213   @param lu: the lu on whose behalf we execute
4214   @param node: the node on which to create the device
4215   @type instance: L{objects.Instance}
4216   @param instance: the instance which owns the device
4217   @type device: L{objects.Disk}
4218   @param device: the device to create
4219   @param info: the extra 'metadata' we should attach to the device
4220       (this will be represented as a LVM tag)
4221   @type force_open: boolean
4222   @param force_open: this parameter will be passes to the
4223       L{backend.BlockdevCreate} function where it specifies
4224       whether we run on primary or not, and it affects both
4225       the child assembly and the device own Open() execution
4226
4227   """
4228   lu.cfg.SetDiskID(device, node)
4229   result = lu.rpc.call_blockdev_create(node, device, device.size,
4230                                        instance.name, force_open, info)
4231   result.Raise("Can't create block device %s on"
4232                " node %s for instance %s" % (device, node, instance.name))
4233   if device.physical_id is None:
4234     device.physical_id = result.payload
4235
4236
4237 def _GenerateUniqueNames(lu, exts):
4238   """Generate a suitable LV name.
4239
4240   This will generate a logical volume name for the given instance.
4241
4242   """
4243   results = []
4244   for val in exts:
4245     new_id = lu.cfg.GenerateUniqueID()
4246     results.append("%s%s" % (new_id, val))
4247   return results
4248
4249
4250 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4251                          p_minor, s_minor):
4252   """Generate a drbd8 device complete with its children.
4253
4254   """
4255   port = lu.cfg.AllocatePort()
4256   vgname = lu.cfg.GetVGName()
4257   shared_secret = lu.cfg.GenerateDRBDSecret()
4258   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4259                           logical_id=(vgname, names[0]))
4260   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4261                           logical_id=(vgname, names[1]))
4262   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4263                           logical_id=(primary, secondary, port,
4264                                       p_minor, s_minor,
4265                                       shared_secret),
4266                           children=[dev_data, dev_meta],
4267                           iv_name=iv_name)
4268   return drbd_dev
4269
4270
4271 def _GenerateDiskTemplate(lu, template_name,
4272                           instance_name, primary_node,
4273                           secondary_nodes, disk_info,
4274                           file_storage_dir, file_driver,
4275                           base_index):
4276   """Generate the entire disk layout for a given template type.
4277
4278   """
4279   #TODO: compute space requirements
4280
4281   vgname = lu.cfg.GetVGName()
4282   disk_count = len(disk_info)
4283   disks = []
4284   if template_name == constants.DT_DISKLESS:
4285     pass
4286   elif template_name == constants.DT_PLAIN:
4287     if len(secondary_nodes) != 0:
4288       raise errors.ProgrammerError("Wrong template configuration")
4289
4290     names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
4291                                       for i in range(disk_count)])
4292     for idx, disk in enumerate(disk_info):
4293       disk_index = idx + base_index
4294       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4295                               logical_id=(vgname, names[idx]),
4296                               iv_name="disk/%d" % disk_index,
4297                               mode=disk["mode"])
4298       disks.append(disk_dev)
4299   elif template_name == constants.DT_DRBD8:
4300     if len(secondary_nodes) != 1:
4301       raise errors.ProgrammerError("Wrong template configuration")
4302     remote_node = secondary_nodes[0]
4303     minors = lu.cfg.AllocateDRBDMinor(
4304       [primary_node, remote_node] * len(disk_info), instance_name)
4305
4306     names = []
4307     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
4308                                                for i in range(disk_count)]):
4309       names.append(lv_prefix + "_data")
4310       names.append(lv_prefix + "_meta")
4311     for idx, disk in enumerate(disk_info):
4312       disk_index = idx + base_index
4313       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4314                                       disk["size"], names[idx*2:idx*2+2],
4315                                       "disk/%d" % disk_index,
4316                                       minors[idx*2], minors[idx*2+1])
4317       disk_dev.mode = disk["mode"]
4318       disks.append(disk_dev)
4319   elif template_name == constants.DT_FILE:
4320     if len(secondary_nodes) != 0:
4321       raise errors.ProgrammerError("Wrong template configuration")
4322
4323     for idx, disk in enumerate(disk_info):
4324       disk_index = idx + base_index
4325       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4326                               iv_name="disk/%d" % disk_index,
4327                               logical_id=(file_driver,
4328                                           "%s/disk%d" % (file_storage_dir,
4329                                                          disk_index)),
4330                               mode=disk["mode"])
4331       disks.append(disk_dev)
4332   else:
4333     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4334   return disks
4335
4336
4337 def _GetInstanceInfoText(instance):
4338   """Compute that text that should be added to the disk's metadata.
4339
4340   """
4341   return "originstname+%s" % instance.name
4342
4343
4344 def _CreateDisks(lu, instance):
4345   """Create all disks for an instance.
4346
4347   This abstracts away some work from AddInstance.
4348
4349   @type lu: L{LogicalUnit}
4350   @param lu: the logical unit on whose behalf we execute
4351   @type instance: L{objects.Instance}
4352   @param instance: the instance whose disks we should create
4353   @rtype: boolean
4354   @return: the success of the creation
4355
4356   """
4357   info = _GetInstanceInfoText(instance)
4358   pnode = instance.primary_node
4359
4360   if instance.disk_template == constants.DT_FILE:
4361     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4362     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4363
4364     result.Raise("Failed to create directory '%s' on"
4365                  " node %s: %s" % (file_storage_dir, pnode))
4366
4367   # Note: this needs to be kept in sync with adding of disks in
4368   # LUSetInstanceParams
4369   for device in instance.disks:
4370     logging.info("Creating volume %s for instance %s",
4371                  device.iv_name, instance.name)
4372     #HARDCODE
4373     for node in instance.all_nodes:
4374       f_create = node == pnode
4375       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4376
4377
4378 def _RemoveDisks(lu, instance):
4379   """Remove all disks for an instance.
4380
4381   This abstracts away some work from `AddInstance()` and
4382   `RemoveInstance()`. Note that in case some of the devices couldn't
4383   be removed, the removal will continue with the other ones (compare
4384   with `_CreateDisks()`).
4385
4386   @type lu: L{LogicalUnit}
4387   @param lu: the logical unit on whose behalf we execute
4388   @type instance: L{objects.Instance}
4389   @param instance: the instance whose disks we should remove
4390   @rtype: boolean
4391   @return: the success of the removal
4392
4393   """
4394   logging.info("Removing block devices for instance %s", instance.name)
4395
4396   all_result = True
4397   for device in instance.disks:
4398     for node, disk in device.ComputeNodeTree(instance.primary_node):
4399       lu.cfg.SetDiskID(disk, node)
4400       msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
4401       if msg:
4402         lu.LogWarning("Could not remove block device %s on node %s,"
4403                       " continuing anyway: %s", device.iv_name, node, msg)
4404         all_result = False
4405
4406   if instance.disk_template == constants.DT_FILE:
4407     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4408     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4409                                                  file_storage_dir)
4410     msg = result.fail_msg
4411     if msg:
4412       lu.LogWarning("Could not remove directory '%s' on node %s: %s",
4413                     file_storage_dir, instance.primary_node, msg)
4414       all_result = False
4415
4416   return all_result
4417
4418
4419 def _ComputeDiskSize(disk_template, disks):
4420   """Compute disk size requirements in the volume group
4421
4422   """
4423   # Required free disk space as a function of disk and swap space
4424   req_size_dict = {
4425     constants.DT_DISKLESS: None,
4426     constants.DT_PLAIN: sum(d["size"] for d in disks),
4427     # 128 MB are added for drbd metadata for each disk
4428     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4429     constants.DT_FILE: None,
4430   }
4431
4432   if disk_template not in req_size_dict:
4433     raise errors.ProgrammerError("Disk template '%s' size requirement"
4434                                  " is unknown" %  disk_template)
4435
4436   return req_size_dict[disk_template]
4437
4438
4439 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4440   """Hypervisor parameter validation.
4441
4442   This function abstract the hypervisor parameter validation to be
4443   used in both instance create and instance modify.
4444
4445   @type lu: L{LogicalUnit}
4446   @param lu: the logical unit for which we check
4447   @type nodenames: list
4448   @param nodenames: the list of nodes on which we should check
4449   @type hvname: string
4450   @param hvname: the name of the hypervisor we should use
4451   @type hvparams: dict
4452   @param hvparams: the parameters which we need to check
4453   @raise errors.OpPrereqError: if the parameters are not valid
4454
4455   """
4456   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4457                                                   hvname,
4458                                                   hvparams)
4459   for node in nodenames:
4460     info = hvinfo[node]
4461     if info.offline:
4462       continue
4463     info.Raise("Hypervisor parameter validation failed on node %s" % node)
4464
4465
4466 class LUCreateInstance(LogicalUnit):
4467   """Create an instance.
4468
4469   """
4470   HPATH = "instance-add"
4471   HTYPE = constants.HTYPE_INSTANCE
4472   _OP_REQP = ["instance_name", "disks", "disk_template",
4473               "mode", "start",
4474               "wait_for_sync", "ip_check", "nics",
4475               "hvparams", "beparams"]
4476   REQ_BGL = False
4477
4478   def _ExpandNode(self, node):
4479     """Expands and checks one node name.
4480
4481     """
4482     node_full = self.cfg.ExpandNodeName(node)
4483     if node_full is None:
4484       raise errors.OpPrereqError("Unknown node %s" % node)
4485     return node_full
4486
4487   def ExpandNames(self):
4488     """ExpandNames for CreateInstance.
4489
4490     Figure out the right locks for instance creation.
4491
4492     """
4493     self.needed_locks = {}
4494
4495     # set optional parameters to none if they don't exist
4496     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4497       if not hasattr(self.op, attr):
4498         setattr(self.op, attr, None)
4499
4500     # cheap checks, mostly valid constants given
4501
4502     # verify creation mode
4503     if self.op.mode not in (constants.INSTANCE_CREATE,
4504                             constants.INSTANCE_IMPORT):
4505       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4506                                  self.op.mode)
4507
4508     # disk template and mirror node verification
4509     if self.op.disk_template not in constants.DISK_TEMPLATES:
4510       raise errors.OpPrereqError("Invalid disk template name")
4511
4512     if self.op.hypervisor is None:
4513       self.op.hypervisor = self.cfg.GetHypervisorType()
4514
4515     cluster = self.cfg.GetClusterInfo()
4516     enabled_hvs = cluster.enabled_hypervisors
4517     if self.op.hypervisor not in enabled_hvs:
4518       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4519                                  " cluster (%s)" % (self.op.hypervisor,
4520                                   ",".join(enabled_hvs)))
4521
4522     # check hypervisor parameter syntax (locally)
4523     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4524     filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
4525                                   self.op.hvparams)
4526     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4527     hv_type.CheckParameterSyntax(filled_hvp)
4528     self.hv_full = filled_hvp
4529
4530     # fill and remember the beparams dict
4531     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4532     self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
4533                                     self.op.beparams)
4534
4535     #### instance parameters check
4536
4537     # instance name verification
4538     hostname1 = utils.HostInfo(self.op.instance_name)
4539     self.op.instance_name = instance_name = hostname1.name
4540
4541     # this is just a preventive check, but someone might still add this
4542     # instance in the meantime, and creation will fail at lock-add time
4543     if instance_name in self.cfg.GetInstanceList():
4544       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4545                                  instance_name)
4546
4547     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4548
4549     # NIC buildup
4550     self.nics = []
4551     for idx, nic in enumerate(self.op.nics):
4552       nic_mode_req = nic.get("mode", None)
4553       nic_mode = nic_mode_req
4554       if nic_mode is None:
4555         nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
4556
4557       # in routed mode, for the first nic, the default ip is 'auto'
4558       if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
4559         default_ip_mode = constants.VALUE_AUTO
4560       else:
4561         default_ip_mode = constants.VALUE_NONE
4562
4563       # ip validity checks
4564       ip = nic.get("ip", default_ip_mode)
4565       if ip is None or ip.lower() == constants.VALUE_NONE:
4566         nic_ip = None
4567       elif ip.lower() == constants.VALUE_AUTO:
4568         nic_ip = hostname1.ip
4569       else:
4570         if not utils.IsValidIP(ip):
4571           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4572                                      " like a valid IP" % ip)
4573         nic_ip = ip
4574
4575       # TODO: check the ip for uniqueness !!
4576       if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
4577         raise errors.OpPrereqError("Routed nic mode requires an ip address")
4578
4579       # MAC address verification
4580       mac = nic.get("mac", constants.VALUE_AUTO)
4581       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4582         if not utils.IsValidMac(mac.lower()):
4583           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4584                                      mac)
4585       # bridge verification
4586       bridge = nic.get("bridge", None)
4587       link = nic.get("link", None)
4588       if bridge and link:
4589         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
4590                                    " at the same time")
4591       elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
4592         raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
4593       elif bridge:
4594         link = bridge
4595
4596       nicparams = {}
4597       if nic_mode_req:
4598         nicparams[constants.NIC_MODE] = nic_mode_req
4599       if link:
4600         nicparams[constants.NIC_LINK] = link
4601
4602       check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4603                                       nicparams)
4604       objects.NIC.CheckParameterSyntax(check_params)
4605       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
4606
4607     # disk checks/pre-build
4608     self.disks = []
4609     for disk in self.op.disks:
4610       mode = disk.get("mode", constants.DISK_RDWR)
4611       if mode not in constants.DISK_ACCESS_SET:
4612         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4613                                    mode)
4614       size = disk.get("size", None)
4615       if size is None:
4616         raise errors.OpPrereqError("Missing disk size")
4617       try:
4618         size = int(size)
4619       except ValueError:
4620         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4621       self.disks.append({"size": size, "mode": mode})
4622
4623     # used in CheckPrereq for ip ping check
4624     self.check_ip = hostname1.ip
4625
4626     # file storage checks
4627     if (self.op.file_driver and
4628         not self.op.file_driver in constants.FILE_DRIVER):
4629       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4630                                  self.op.file_driver)
4631
4632     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4633       raise errors.OpPrereqError("File storage directory path not absolute")
4634
4635     ### Node/iallocator related checks
4636     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4637       raise errors.OpPrereqError("One and only one of iallocator and primary"
4638                                  " node must be given")
4639
4640     if self.op.iallocator:
4641       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4642     else:
4643       self.op.pnode = self._ExpandNode(self.op.pnode)
4644       nodelist = [self.op.pnode]
4645       if self.op.snode is not None:
4646         self.op.snode = self._ExpandNode(self.op.snode)
4647         nodelist.append(self.op.snode)
4648       self.needed_locks[locking.LEVEL_NODE] = nodelist
4649
4650     # in case of import lock the source node too
4651     if self.op.mode == constants.INSTANCE_IMPORT:
4652       src_node = getattr(self.op, "src_node", None)
4653       src_path = getattr(self.op, "src_path", None)
4654
4655       if src_path is None:
4656         self.op.src_path = src_path = self.op.instance_name
4657
4658       if src_node is None:
4659         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4660         self.op.src_node = None
4661         if os.path.isabs(src_path):
4662           raise errors.OpPrereqError("Importing an instance from an absolute"
4663                                      " path requires a source node option.")
4664       else:
4665         self.op.src_node = src_node = self._ExpandNode(src_node)
4666         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4667           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4668         if not os.path.isabs(src_path):
4669           self.op.src_path = src_path = \
4670             os.path.join(constants.EXPORT_DIR, src_path)
4671
4672     else: # INSTANCE_CREATE
4673       if getattr(self.op, "os_type", None) is None:
4674         raise errors.OpPrereqError("No guest OS specified")
4675
4676   def _RunAllocator(self):
4677     """Run the allocator based on input opcode.
4678
4679     """
4680     nics = [n.ToDict() for n in self.nics]
4681     ial = IAllocator(self,
4682                      mode=constants.IALLOCATOR_MODE_ALLOC,
4683                      name=self.op.instance_name,
4684                      disk_template=self.op.disk_template,
4685                      tags=[],
4686                      os=self.op.os_type,
4687                      vcpus=self.be_full[constants.BE_VCPUS],
4688                      mem_size=self.be_full[constants.BE_MEMORY],
4689                      disks=self.disks,
4690                      nics=nics,
4691                      hypervisor=self.op.hypervisor,
4692                      )
4693
4694     ial.Run(self.op.iallocator)
4695
4696     if not ial.success:
4697       raise errors.OpPrereqError("Can't compute nodes using"
4698                                  " iallocator '%s': %s" % (self.op.iallocator,
4699                                                            ial.info))
4700     if len(ial.nodes) != ial.required_nodes:
4701       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4702                                  " of nodes (%s), required %s" %
4703                                  (self.op.iallocator, len(ial.nodes),
4704                                   ial.required_nodes))
4705     self.op.pnode = ial.nodes[0]
4706     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4707                  self.op.instance_name, self.op.iallocator,
4708                  ", ".join(ial.nodes))
4709     if ial.required_nodes == 2:
4710       self.op.snode = ial.nodes[1]
4711
4712   def BuildHooksEnv(self):
4713     """Build hooks env.
4714
4715     This runs on master, primary and secondary nodes of the instance.
4716
4717     """
4718     env = {
4719       "ADD_MODE": self.op.mode,
4720       }
4721     if self.op.mode == constants.INSTANCE_IMPORT:
4722       env["SRC_NODE"] = self.op.src_node
4723       env["SRC_PATH"] = self.op.src_path
4724       env["SRC_IMAGES"] = self.src_images
4725
4726     env.update(_BuildInstanceHookEnv(
4727       name=self.op.instance_name,
4728       primary_node=self.op.pnode,
4729       secondary_nodes=self.secondaries,
4730       status=self.op.start,
4731       os_type=self.op.os_type,
4732       memory=self.be_full[constants.BE_MEMORY],
4733       vcpus=self.be_full[constants.BE_VCPUS],
4734       nics=_NICListToTuple(self, self.nics),
4735       disk_template=self.op.disk_template,
4736       disks=[(d["size"], d["mode"]) for d in self.disks],
4737       bep=self.be_full,
4738       hvp=self.hv_full,
4739       hypervisor=self.op.hypervisor,
4740     ))
4741
4742     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4743           self.secondaries)
4744     return env, nl, nl
4745
4746
4747   def CheckPrereq(self):
4748     """Check prerequisites.
4749
4750     """
4751     if (not self.cfg.GetVGName() and
4752         self.op.disk_template not in constants.DTS_NOT_LVM):
4753       raise errors.OpPrereqError("Cluster does not support lvm-based"
4754                                  " instances")
4755
4756     if self.op.mode == constants.INSTANCE_IMPORT:
4757       src_node = self.op.src_node
4758       src_path = self.op.src_path
4759
4760       if src_node is None:
4761         locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
4762         exp_list = self.rpc.call_export_list(locked_nodes)
4763         found = False
4764         for node in exp_list:
4765           if exp_list[node].fail_msg:
4766             continue
4767           if src_path in exp_list[node].payload:
4768             found = True
4769             self.op.src_node = src_node = node
4770             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4771                                                        src_path)
4772             break
4773         if not found:
4774           raise errors.OpPrereqError("No export found for relative path %s" %
4775                                       src_path)
4776
4777       _CheckNodeOnline(self, src_node)
4778       result = self.rpc.call_export_info(src_node, src_path)
4779       result.Raise("No export or invalid export found in dir %s" % src_path)
4780
4781       export_info = objects.SerializableConfigParser.Loads(str(result.payload))
4782       if not export_info.has_section(constants.INISECT_EXP):
4783         raise errors.ProgrammerError("Corrupted export config")
4784
4785       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4786       if (int(ei_version) != constants.EXPORT_VERSION):
4787         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4788                                    (ei_version, constants.EXPORT_VERSION))
4789
4790       # Check that the new instance doesn't have less disks than the export
4791       instance_disks = len(self.disks)
4792       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4793       if instance_disks < export_disks:
4794         raise errors.OpPrereqError("Not enough disks to import."
4795                                    " (instance: %d, export: %d)" %
4796                                    (instance_disks, export_disks))
4797
4798       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4799       disk_images = []
4800       for idx in range(export_disks):
4801         option = 'disk%d_dump' % idx
4802         if export_info.has_option(constants.INISECT_INS, option):
4803           # FIXME: are the old os-es, disk sizes, etc. useful?
4804           export_name = export_info.get(constants.INISECT_INS, option)
4805           image = os.path.join(src_path, export_name)
4806           disk_images.append(image)
4807         else:
4808           disk_images.append(False)
4809
4810       self.src_images = disk_images
4811
4812       old_name = export_info.get(constants.INISECT_INS, 'name')
4813       # FIXME: int() here could throw a ValueError on broken exports
4814       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4815       if self.op.instance_name == old_name:
4816         for idx, nic in enumerate(self.nics):
4817           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4818             nic_mac_ini = 'nic%d_mac' % idx
4819             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4820
4821     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4822     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4823     if self.op.start and not self.op.ip_check:
4824       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4825                                  " adding an instance in start mode")
4826
4827     if self.op.ip_check:
4828       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4829         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4830                                    (self.check_ip, self.op.instance_name))
4831
4832     #### mac address generation
4833     # By generating here the mac address both the allocator and the hooks get
4834     # the real final mac address rather than the 'auto' or 'generate' value.
4835     # There is a race condition between the generation and the instance object
4836     # creation, which means that we know the mac is valid now, but we're not
4837     # sure it will be when we actually add the instance. If things go bad
4838     # adding the instance will abort because of a duplicate mac, and the
4839     # creation job will fail.
4840     for nic in self.nics:
4841       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4842         nic.mac = self.cfg.GenerateMAC()
4843
4844     #### allocator run
4845
4846     if self.op.iallocator is not None:
4847       self._RunAllocator()
4848
4849     #### node related checks
4850
4851     # check primary node
4852     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4853     assert self.pnode is not None, \
4854       "Cannot retrieve locked node %s" % self.op.pnode
4855     if pnode.offline:
4856       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4857                                  pnode.name)
4858     if pnode.drained:
4859       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4860                                  pnode.name)
4861
4862     self.secondaries = []
4863
4864     # mirror node verification
4865     if self.op.disk_template in constants.DTS_NET_MIRROR:
4866       if self.op.snode is None:
4867         raise errors.OpPrereqError("The networked disk templates need"
4868                                    " a mirror node")
4869       if self.op.snode == pnode.name:
4870         raise errors.OpPrereqError("The secondary node cannot be"
4871                                    " the primary node.")
4872       _CheckNodeOnline(self, self.op.snode)
4873       _CheckNodeNotDrained(self, self.op.snode)
4874       self.secondaries.append(self.op.snode)
4875
4876     nodenames = [pnode.name] + self.secondaries
4877
4878     req_size = _ComputeDiskSize(self.op.disk_template,
4879                                 self.disks)
4880
4881     # Check lv size requirements
4882     if req_size is not None:
4883       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4884                                          self.op.hypervisor)
4885       for node in nodenames:
4886         info = nodeinfo[node]
4887         info.Raise("Cannot get current information from node %s" % node)
4888         info = info.payload
4889         vg_free = info.get('vg_free', None)
4890         if not isinstance(vg_free, int):
4891           raise errors.OpPrereqError("Can't compute free disk space on"
4892                                      " node %s" % node)
4893         if req_size > vg_free:
4894           raise errors.OpPrereqError("Not enough disk space on target node %s."
4895                                      " %d MB available, %d MB required" %
4896                                      (node, vg_free, req_size))
4897
4898     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4899
4900     # os verification
4901     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4902     result.Raise("OS '%s' not in supported os list for primary node %s" %
4903                  (self.op.os_type, pnode.name), prereq=True)
4904
4905     _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
4906
4907     # memory check on primary node
4908     if self.op.start:
4909       _CheckNodeFreeMemory(self, self.pnode.name,
4910                            "creating instance %s" % self.op.instance_name,
4911                            self.be_full[constants.BE_MEMORY],
4912                            self.op.hypervisor)
4913
4914     self.dry_run_result = list(nodenames)
4915
4916   def Exec(self, feedback_fn):
4917     """Create and add the instance to the cluster.
4918
4919     """
4920     instance = self.op.instance_name
4921     pnode_name = self.pnode.name
4922
4923     ht_kind = self.op.hypervisor
4924     if ht_kind in constants.HTS_REQ_PORT:
4925       network_port = self.cfg.AllocatePort()
4926     else:
4927       network_port = None
4928
4929     ##if self.op.vnc_bind_address is None:
4930     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4931
4932     # this is needed because os.path.join does not accept None arguments
4933     if self.op.file_storage_dir is None:
4934       string_file_storage_dir = ""
4935     else:
4936       string_file_storage_dir = self.op.file_storage_dir
4937
4938     # build the full file storage dir path
4939     file_storage_dir = os.path.normpath(os.path.join(
4940                                         self.cfg.GetFileStorageDir(),
4941                                         string_file_storage_dir, instance))
4942
4943
4944     disks = _GenerateDiskTemplate(self,
4945                                   self.op.disk_template,
4946                                   instance, pnode_name,
4947                                   self.secondaries,
4948                                   self.disks,
4949                                   file_storage_dir,
4950                                   self.op.file_driver,
4951                                   0)
4952
4953     iobj = objects.Instance(name=instance, os=self.op.os_type,
4954                             primary_node=pnode_name,
4955                             nics=self.nics, disks=disks,
4956                             disk_template=self.op.disk_template,
4957                             admin_up=False,
4958                             network_port=network_port,
4959                             beparams=self.op.beparams,
4960                             hvparams=self.op.hvparams,
4961                             hypervisor=self.op.hypervisor,
4962                             )
4963
4964     feedback_fn("* creating instance disks...")
4965     try:
4966       _CreateDisks(self, iobj)
4967     except errors.OpExecError:
4968       self.LogWarning("Device creation failed, reverting...")
4969       try:
4970         _RemoveDisks(self, iobj)
4971       finally:
4972         self.cfg.ReleaseDRBDMinors(instance)
4973         raise
4974
4975     feedback_fn("adding instance %s to cluster config" % instance)
4976
4977     self.cfg.AddInstance(iobj)
4978     # Declare that we don't want to remove the instance lock anymore, as we've
4979     # added the instance to the config
4980     del self.remove_locks[locking.LEVEL_INSTANCE]
4981     # Unlock all the nodes
4982     if self.op.mode == constants.INSTANCE_IMPORT:
4983       nodes_keep = [self.op.src_node]
4984       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4985                        if node != self.op.src_node]
4986       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4987       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4988     else:
4989       self.context.glm.release(locking.LEVEL_NODE)
4990       del self.acquired_locks[locking.LEVEL_NODE]
4991
4992     if self.op.wait_for_sync:
4993       disk_abort = not _WaitForSync(self, iobj)
4994     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4995       # make sure the disks are not degraded (still sync-ing is ok)
4996       time.sleep(15)
4997       feedback_fn("* checking mirrors status")
4998       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4999     else:
5000       disk_abort = False
5001
5002     if disk_abort:
5003       _RemoveDisks(self, iobj)
5004       self.cfg.RemoveInstance(iobj.name)
5005       # Make sure the instance lock gets removed
5006       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
5007       raise errors.OpExecError("There are some degraded disks for"
5008                                " this instance")
5009
5010     feedback_fn("creating os for instance %s on node %s" %
5011                 (instance, pnode_name))
5012
5013     if iobj.disk_template != constants.DT_DISKLESS:
5014       if self.op.mode == constants.INSTANCE_CREATE:
5015         feedback_fn("* running the instance OS create scripts...")
5016         result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
5017         result.Raise("Could not add os for instance %s"
5018                      " on node %s" % (instance, pnode_name))
5019
5020       elif self.op.mode == constants.INSTANCE_IMPORT:
5021         feedback_fn("* running the instance OS import scripts...")
5022         src_node = self.op.src_node
5023         src_images = self.src_images
5024         cluster_name = self.cfg.GetClusterName()
5025         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
5026                                                          src_node, src_images,
5027                                                          cluster_name)
5028         msg = import_result.fail_msg
5029         if msg:
5030           self.LogWarning("Error while importing the disk images for instance"
5031                           " %s on node %s: %s" % (instance, pnode_name, msg))
5032       else:
5033         # also checked in the prereq part
5034         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
5035                                      % self.op.mode)
5036
5037     if self.op.start:
5038       iobj.admin_up = True
5039       self.cfg.Update(iobj)
5040       logging.info("Starting instance %s on node %s", instance, pnode_name)
5041       feedback_fn("* starting instance...")
5042       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
5043       result.Raise("Could not start instance")
5044
5045     return list(iobj.all_nodes)
5046
5047
5048 class LUConnectConsole(NoHooksLU):
5049   """Connect to an instance's console.
5050
5051   This is somewhat special in that it returns the command line that
5052   you need to run on the master node in order to connect to the
5053   console.
5054
5055   """
5056   _OP_REQP = ["instance_name"]
5057   REQ_BGL = False
5058
5059   def ExpandNames(self):
5060     self._ExpandAndLockInstance()
5061
5062   def CheckPrereq(self):
5063     """Check prerequisites.
5064
5065     This checks that the instance is in the cluster.
5066
5067     """
5068     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5069     assert self.instance is not None, \
5070       "Cannot retrieve locked instance %s" % self.op.instance_name
5071     _CheckNodeOnline(self, self.instance.primary_node)
5072
5073   def Exec(self, feedback_fn):
5074     """Connect to the console of an instance
5075
5076     """
5077     instance = self.instance
5078     node = instance.primary_node
5079
5080     node_insts = self.rpc.call_instance_list([node],
5081                                              [instance.hypervisor])[node]
5082     node_insts.Raise("Can't get node information from %s" % node)
5083
5084     if instance.name not in node_insts.payload:
5085       raise errors.OpExecError("Instance %s is not running." % instance.name)
5086
5087     logging.debug("Connecting to console of %s on %s", instance.name, node)
5088
5089     hyper = hypervisor.GetHypervisor(instance.hypervisor)
5090     cluster = self.cfg.GetClusterInfo()
5091     # beparams and hvparams are passed separately, to avoid editing the
5092     # instance and then saving the defaults in the instance itself.
5093     hvparams = cluster.FillHV(instance)
5094     beparams = cluster.FillBE(instance)
5095     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5096
5097     # build ssh cmdline
5098     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
5099
5100
5101 class LUReplaceDisks(LogicalUnit):
5102   """Replace the disks of an instance.
5103
5104   """
5105   HPATH = "mirrors-replace"
5106   HTYPE = constants.HTYPE_INSTANCE
5107   _OP_REQP = ["instance_name", "mode", "disks"]
5108   REQ_BGL = False
5109
5110   def CheckArguments(self):
5111     if not hasattr(self.op, "remote_node"):
5112       self.op.remote_node = None
5113     if not hasattr(self.op, "iallocator"):
5114       self.op.iallocator = None
5115
5116     # check for valid parameter combination
5117     cnt = [self.op.remote_node, self.op.iallocator].count(None)
5118     if self.op.mode == constants.REPLACE_DISK_CHG:
5119       if cnt == 2:
5120         raise errors.OpPrereqError("When changing the secondary either an"
5121                                    " iallocator script must be used or the"
5122                                    " new node given")
5123       elif cnt == 0:
5124         raise errors.OpPrereqError("Give either the iallocator or the new"
5125                                    " secondary, not both")
5126     else: # not replacing the secondary
5127       if cnt != 2:
5128         raise errors.OpPrereqError("The iallocator and new node options can"
5129                                    " be used only when changing the"
5130                                    " secondary node")
5131
5132   def ExpandNames(self):
5133     self._ExpandAndLockInstance()
5134
5135     if self.op.iallocator is not None:
5136       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5137     elif self.op.remote_node is not None:
5138       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5139       if remote_node is None:
5140         raise errors.OpPrereqError("Node '%s' not known" %
5141                                    self.op.remote_node)
5142       self.op.remote_node = remote_node
5143       # Warning: do not remove the locking of the new secondary here
5144       # unless DRBD8.AddChildren is changed to work in parallel;
5145       # currently it doesn't since parallel invocations of
5146       # FindUnusedMinor will conflict
5147       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5148       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5149     else:
5150       self.needed_locks[locking.LEVEL_NODE] = []
5151       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5152
5153   def DeclareLocks(self, level):
5154     # If we're not already locking all nodes in the set we have to declare the
5155     # instance's primary/secondary nodes.
5156     if (level == locking.LEVEL_NODE and
5157         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5158       self._LockInstancesNodes()
5159
5160   def _RunAllocator(self):
5161     """Compute a new secondary node using an IAllocator.
5162
5163     """
5164     ial = IAllocator(self,
5165                      mode=constants.IALLOCATOR_MODE_RELOC,
5166                      name=self.op.instance_name,
5167                      relocate_from=[self.sec_node])
5168
5169     ial.Run(self.op.iallocator)
5170
5171     if not ial.success:
5172       raise errors.OpPrereqError("Can't compute nodes using"
5173                                  " iallocator '%s': %s" % (self.op.iallocator,
5174                                                            ial.info))
5175     if len(ial.nodes) != ial.required_nodes:
5176       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5177                                  " of nodes (%s), required %s" %
5178                                  (len(ial.nodes), ial.required_nodes))
5179     self.op.remote_node = ial.nodes[0]
5180     self.LogInfo("Selected new secondary for the instance: %s",
5181                  self.op.remote_node)
5182
5183   def BuildHooksEnv(self):
5184     """Build hooks env.
5185
5186     This runs on the master, the primary and all the secondaries.
5187
5188     """
5189     env = {
5190       "MODE": self.op.mode,
5191       "NEW_SECONDARY": self.op.remote_node,
5192       "OLD_SECONDARY": self.instance.secondary_nodes[0],
5193       }
5194     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5195     nl = [
5196       self.cfg.GetMasterNode(),
5197       self.instance.primary_node,
5198       ]
5199     if self.op.remote_node is not None:
5200       nl.append(self.op.remote_node)
5201     return env, nl, nl
5202
5203   def CheckPrereq(self):
5204     """Check prerequisites.
5205
5206     This checks that the instance is in the cluster.
5207
5208     """
5209     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5210     assert instance is not None, \
5211       "Cannot retrieve locked instance %s" % self.op.instance_name
5212     self.instance = instance
5213
5214     if instance.disk_template != constants.DT_DRBD8:
5215       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5216                                  " instances")
5217
5218     if len(instance.secondary_nodes) != 1:
5219       raise errors.OpPrereqError("The instance has a strange layout,"
5220                                  " expected one secondary but found %d" %
5221                                  len(instance.secondary_nodes))
5222
5223     self.sec_node = instance.secondary_nodes[0]
5224
5225     if self.op.iallocator is not None:
5226       self._RunAllocator()
5227
5228     remote_node = self.op.remote_node
5229     if remote_node is not None:
5230       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5231       assert self.remote_node_info is not None, \
5232         "Cannot retrieve locked node %s" % remote_node
5233     else:
5234       self.remote_node_info = None
5235     if remote_node == instance.primary_node:
5236       raise errors.OpPrereqError("The specified node is the primary node of"
5237                                  " the instance.")
5238     elif remote_node == self.sec_node:
5239       raise errors.OpPrereqError("The specified node is already the"
5240                                  " secondary node of the instance.")
5241
5242     if self.op.mode == constants.REPLACE_DISK_PRI:
5243       n1 = self.tgt_node = instance.primary_node
5244       n2 = self.oth_node = self.sec_node
5245     elif self.op.mode == constants.REPLACE_DISK_SEC:
5246       n1 = self.tgt_node = self.sec_node
5247       n2 = self.oth_node = instance.primary_node
5248     elif self.op.mode == constants.REPLACE_DISK_CHG:
5249       n1 = self.new_node = remote_node
5250       n2 = self.oth_node = instance.primary_node
5251       self.tgt_node = self.sec_node
5252       _CheckNodeNotDrained(self, remote_node)
5253     else:
5254       raise errors.ProgrammerError("Unhandled disk replace mode")
5255
5256     _CheckNodeOnline(self, n1)
5257     _CheckNodeOnline(self, n2)
5258
5259     if not self.op.disks:
5260       self.op.disks = range(len(instance.disks))
5261
5262     for disk_idx in self.op.disks:
5263       instance.FindDisk(disk_idx)
5264
5265   def _ExecD8DiskOnly(self, feedback_fn):
5266     """Replace a disk on the primary or secondary for dbrd8.
5267
5268     The algorithm for replace is quite complicated:
5269
5270       1. for each disk to be replaced:
5271
5272         1. create new LVs on the target node with unique names
5273         1. detach old LVs from the drbd device
5274         1. rename old LVs to name_replaced.<time_t>
5275         1. rename new LVs to old LVs
5276         1. attach the new LVs (with the old names now) to the drbd device
5277
5278       1. wait for sync across all devices
5279
5280       1. for each modified disk:
5281
5282         1. remove old LVs (which have the name name_replaces.<time_t>)
5283
5284     Failures are not very well handled.
5285
5286     """
5287     steps_total = 6
5288     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5289     instance = self.instance
5290     iv_names = {}
5291     vgname = self.cfg.GetVGName()
5292     # start of work
5293     cfg = self.cfg
5294     tgt_node = self.tgt_node
5295     oth_node = self.oth_node
5296
5297     # Step: check device activation
5298     self.proc.LogStep(1, steps_total, "check device existence")
5299     info("checking volume groups")
5300     my_vg = cfg.GetVGName()
5301     results = self.rpc.call_vg_list([oth_node, tgt_node])
5302     if not results:
5303       raise errors.OpExecError("Can't list volume groups on the nodes")
5304     for node in oth_node, tgt_node:
5305       res = results[node]
5306       res.Raise("Error checking node %s" % node)
5307       if my_vg not in res.payload:
5308         raise errors.OpExecError("Volume group '%s' not found on %s" %
5309                                  (my_vg, node))
5310     for idx, dev in enumerate(instance.disks):
5311       if idx not in self.op.disks:
5312         continue
5313       for node in tgt_node, oth_node:
5314         info("checking disk/%d on %s" % (idx, node))
5315         cfg.SetDiskID(dev, node)
5316         result = self.rpc.call_blockdev_find(node, dev)
5317         msg = result.fail_msg
5318         if not msg and not result.payload:
5319           msg = "disk not found"
5320         if msg:
5321           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5322                                    (idx, node, msg))
5323
5324     # Step: check other node consistency
5325     self.proc.LogStep(2, steps_total, "check peer consistency")
5326     for idx, dev in enumerate(instance.disks):
5327       if idx not in self.op.disks:
5328         continue
5329       info("checking disk/%d consistency on %s" % (idx, oth_node))
5330       if not _CheckDiskConsistency(self, dev, oth_node,
5331                                    oth_node==instance.primary_node):
5332         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5333                                  " to replace disks on this node (%s)" %
5334                                  (oth_node, tgt_node))
5335
5336     # Step: create new storage
5337     self.proc.LogStep(3, steps_total, "allocate new storage")
5338     for idx, dev in enumerate(instance.disks):
5339       if idx not in self.op.disks:
5340         continue
5341       size = dev.size
5342       cfg.SetDiskID(dev, tgt_node)
5343       lv_names = [".disk%d_%s" % (idx, suf)
5344                   for suf in ["data", "meta"]]
5345       names = _GenerateUniqueNames(self, lv_names)
5346       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5347                              logical_id=(vgname, names[0]))
5348       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5349                              logical_id=(vgname, names[1]))
5350       new_lvs = [lv_data, lv_meta]
5351       old_lvs = dev.children
5352       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5353       info("creating new local storage on %s for %s" %
5354            (tgt_node, dev.iv_name))
5355       # we pass force_create=True to force the LVM creation
5356       for new_lv in new_lvs:
5357         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5358                         _GetInstanceInfoText(instance), False)
5359
5360     # Step: for each lv, detach+rename*2+attach
5361     self.proc.LogStep(4, steps_total, "change drbd configuration")
5362     for dev, old_lvs, new_lvs in iv_names.itervalues():
5363       info("detaching %s drbd from local storage" % dev.iv_name)
5364       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5365       result.Raise("Can't detach drbd from local storage on node"
5366                    " %s for device %s" % (tgt_node, dev.iv_name))
5367       #dev.children = []
5368       #cfg.Update(instance)
5369
5370       # ok, we created the new LVs, so now we know we have the needed
5371       # storage; as such, we proceed on the target node to rename
5372       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5373       # using the assumption that logical_id == physical_id (which in
5374       # turn is the unique_id on that node)
5375
5376       # FIXME(iustin): use a better name for the replaced LVs
5377       temp_suffix = int(time.time())
5378       ren_fn = lambda d, suff: (d.physical_id[0],
5379                                 d.physical_id[1] + "_replaced-%s" % suff)
5380       # build the rename list based on what LVs exist on the node
5381       rlist = []
5382       for to_ren in old_lvs:
5383         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5384         if not result.fail_msg and result.payload:
5385           # device exists
5386           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5387
5388       info("renaming the old LVs on the target node")
5389       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5390       result.Raise("Can't rename old LVs on node %s" % tgt_node)
5391       # now we rename the new LVs to the old LVs
5392       info("renaming the new LVs on the target node")
5393       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5394       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5395       result.Raise("Can't rename new LVs on node %s" % tgt_node)
5396
5397       for old, new in zip(old_lvs, new_lvs):
5398         new.logical_id = old.logical_id
5399         cfg.SetDiskID(new, tgt_node)
5400
5401       for disk in old_lvs:
5402         disk.logical_id = ren_fn(disk, temp_suffix)
5403         cfg.SetDiskID(disk, tgt_node)
5404
5405       # now that the new lvs have the old name, we can add them to the device
5406       info("adding new mirror component on %s" % tgt_node)
5407       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5408       msg = result.fail_msg
5409       if msg:
5410         for new_lv in new_lvs:
5411           msg2 = self.rpc.call_blockdev_remove(tgt_node, new_lv).fail_msg
5412           if msg2:
5413             warning("Can't rollback device %s: %s", dev, msg2,
5414                     hint="cleanup manually the unused logical volumes")
5415         raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
5416
5417       dev.children = new_lvs
5418       cfg.Update(instance)
5419
5420     # Step: wait for sync
5421
5422     # this can fail as the old devices are degraded and _WaitForSync
5423     # does a combined result over all disks, so we don't check its
5424     # return value
5425     self.proc.LogStep(5, steps_total, "sync devices")
5426     _WaitForSync(self, instance, unlock=True)
5427
5428     # so check manually all the devices
5429     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5430       cfg.SetDiskID(dev, instance.primary_node)
5431       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5432       msg = result.fail_msg
5433       if not msg and not result.payload:
5434         msg = "disk not found"
5435       if msg:
5436         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5437                                  (name, msg))
5438       if result.payload[5]:
5439         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5440
5441     # Step: remove old storage
5442     self.proc.LogStep(6, steps_total, "removing old storage")
5443     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5444       info("remove logical volumes for %s" % name)
5445       for lv in old_lvs:
5446         cfg.SetDiskID(lv, tgt_node)
5447         msg = self.rpc.call_blockdev_remove(tgt_node, lv).fail_msg
5448         if msg:
5449           warning("Can't remove old LV: %s" % msg,
5450                   hint="manually remove unused LVs")
5451           continue
5452
5453   def _ExecD8Secondary(self, feedback_fn):
5454     """Replace the secondary node for drbd8.
5455
5456     The algorithm for replace is quite complicated:
5457       - for all disks of the instance:
5458         - create new LVs on the new node with same names
5459         - shutdown the drbd device on the old secondary
5460         - disconnect the drbd network on the primary
5461         - create the drbd device on the new secondary
5462         - network attach the drbd on the primary, using an artifice:
5463           the drbd code for Attach() will connect to the network if it
5464           finds a device which is connected to the good local disks but
5465           not network enabled
5466       - wait for sync across all devices
5467       - remove all disks from the old secondary
5468
5469     Failures are not very well handled.
5470
5471     """
5472     steps_total = 6
5473     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5474     instance = self.instance
5475     iv_names = {}
5476     # start of work
5477     cfg = self.cfg
5478     old_node = self.tgt_node
5479     new_node = self.new_node
5480     pri_node = instance.primary_node
5481     nodes_ip = {
5482       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5483       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5484       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5485       }
5486
5487     # Step: check device activation
5488     self.proc.LogStep(1, steps_total, "check device existence")
5489     info("checking volume groups")
5490     my_vg = cfg.GetVGName()
5491     results = self.rpc.call_vg_list([pri_node, new_node])
5492     for node in pri_node, new_node:
5493       res = results[node]
5494       res.Raise("Error checking node %s" % node)
5495       if my_vg not in res.payload:
5496         raise errors.OpExecError("Volume group '%s' not found on %s" %
5497                                  (my_vg, node))
5498     for idx, dev in enumerate(instance.disks):
5499       if idx not in self.op.disks:
5500         continue
5501       info("checking disk/%d on %s" % (idx, pri_node))
5502       cfg.SetDiskID(dev, pri_node)
5503       result = self.rpc.call_blockdev_find(pri_node, dev)
5504       msg = result.fail_msg
5505       if not msg and not result.payload:
5506         msg = "disk not found"
5507       if msg:
5508         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5509                                  (idx, pri_node, msg))
5510
5511     # Step: check other node consistency
5512     self.proc.LogStep(2, steps_total, "check peer consistency")
5513     for idx, dev in enumerate(instance.disks):
5514       if idx not in self.op.disks:
5515         continue
5516       info("checking disk/%d consistency on %s" % (idx, pri_node))
5517       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5518         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5519                                  " unsafe to replace the secondary" %
5520                                  pri_node)
5521
5522     # Step: create new storage
5523     self.proc.LogStep(3, steps_total, "allocate new storage")
5524     for idx, dev in enumerate(instance.disks):
5525       info("adding new local storage on %s for disk/%d" %
5526            (new_node, idx))
5527       # we pass force_create=True to force LVM creation
5528       for new_lv in dev.children:
5529         _CreateBlockDev(self, new_node, instance, new_lv, True,
5530                         _GetInstanceInfoText(instance), False)
5531
5532     # Step 4: dbrd minors and drbd setups changes
5533     # after this, we must manually remove the drbd minors on both the
5534     # error and the success paths
5535     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5536                                    instance.name)
5537     logging.debug("Allocated minors %s" % (minors,))
5538     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5539     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5540       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5541       # create new devices on new_node; note that we create two IDs:
5542       # one without port, so the drbd will be activated without
5543       # networking information on the new node at this stage, and one
5544       # with network, for the latter activation in step 4
5545       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5546       if pri_node == o_node1:
5547         p_minor = o_minor1
5548       else:
5549         p_minor = o_minor2
5550
5551       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5552       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5553
5554       iv_names[idx] = (dev, dev.children, new_net_id)
5555       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5556                     new_net_id)
5557       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5558                               logical_id=new_alone_id,
5559                               children=dev.children,
5560                               size=dev.size)
5561       try:
5562         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5563                               _GetInstanceInfoText(instance), False)
5564       except errors.GenericError:
5565         self.cfg.ReleaseDRBDMinors(instance.name)
5566         raise
5567
5568     for idx, dev in enumerate(instance.disks):
5569       # we have new devices, shutdown the drbd on the old secondary
5570       info("shutting down drbd for disk/%d on old node" % idx)
5571       cfg.SetDiskID(dev, old_node)
5572       msg = self.rpc.call_blockdev_shutdown(old_node, dev).fail_msg
5573       if msg:
5574         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5575                 (idx, msg),
5576                 hint="Please cleanup this device manually as soon as possible")
5577
5578     info("detaching primary drbds from the network (=> standalone)")
5579     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5580                                                instance.disks)[pri_node]
5581
5582     msg = result.fail_msg
5583     if msg:
5584       # detaches didn't succeed (unlikely)
5585       self.cfg.ReleaseDRBDMinors(instance.name)
5586       raise errors.OpExecError("Can't detach the disks from the network on"
5587                                " old node: %s" % (msg,))
5588
5589     # if we managed to detach at least one, we update all the disks of
5590     # the instance to point to the new secondary
5591     info("updating instance configuration")
5592     for dev, _, new_logical_id in iv_names.itervalues():
5593       dev.logical_id = new_logical_id
5594       cfg.SetDiskID(dev, pri_node)
5595     cfg.Update(instance)
5596
5597     # and now perform the drbd attach
5598     info("attaching primary drbds to new secondary (standalone => connected)")
5599     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5600                                            instance.disks, instance.name,
5601                                            False)
5602     for to_node, to_result in result.items():
5603       msg = to_result.fail_msg
5604       if msg:
5605         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5606                 hint="please do a gnt-instance info to see the"
5607                 " status of disks")
5608
5609     # this can fail as the old devices are degraded and _WaitForSync
5610     # does a combined result over all disks, so we don't check its
5611     # return value
5612     self.proc.LogStep(5, steps_total, "sync devices")
5613     _WaitForSync(self, instance, unlock=True)
5614
5615     # so check manually all the devices
5616     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5617       cfg.SetDiskID(dev, pri_node)
5618       result = self.rpc.call_blockdev_find(pri_node, dev)
5619       msg = result.fail_msg
5620       if not msg and not result.payload:
5621         msg = "disk not found"
5622       if msg:
5623         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5624                                  (idx, msg))
5625       if result.payload[5]:
5626         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5627
5628     self.proc.LogStep(6, steps_total, "removing old storage")
5629     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5630       info("remove logical volumes for disk/%d" % idx)
5631       for lv in old_lvs:
5632         cfg.SetDiskID(lv, old_node)
5633         msg = self.rpc.call_blockdev_remove(old_node, lv).fail_msg
5634         if msg:
5635           warning("Can't remove LV on old secondary: %s", msg,
5636                   hint="Cleanup stale volumes by hand")
5637
5638   def Exec(self, feedback_fn):
5639     """Execute disk replacement.
5640
5641     This dispatches the disk replacement to the appropriate handler.
5642
5643     """
5644     instance = self.instance
5645
5646     # Activate the instance disks if we're replacing them on a down instance
5647     if not instance.admin_up:
5648       _StartInstanceDisks(self, instance, True)
5649
5650     if self.op.mode == constants.REPLACE_DISK_CHG:
5651       fn = self._ExecD8Secondary
5652     else:
5653       fn = self._ExecD8DiskOnly
5654
5655     ret = fn(feedback_fn)
5656
5657     # Deactivate the instance disks if we're replacing them on a down instance
5658     if not instance.admin_up:
5659       _SafeShutdownInstanceDisks(self, instance)
5660
5661     return ret
5662
5663
5664 class LUGrowDisk(LogicalUnit):
5665   """Grow a disk of an instance.
5666
5667   """
5668   HPATH = "disk-grow"
5669   HTYPE = constants.HTYPE_INSTANCE
5670   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5671   REQ_BGL = False
5672
5673   def ExpandNames(self):
5674     self._ExpandAndLockInstance()
5675     self.needed_locks[locking.LEVEL_NODE] = []
5676     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5677
5678   def DeclareLocks(self, level):
5679     if level == locking.LEVEL_NODE:
5680       self._LockInstancesNodes()
5681
5682   def BuildHooksEnv(self):
5683     """Build hooks env.
5684
5685     This runs on the master, the primary and all the secondaries.
5686
5687     """
5688     env = {
5689       "DISK": self.op.disk,
5690       "AMOUNT": self.op.amount,
5691       }
5692     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5693     nl = [
5694       self.cfg.GetMasterNode(),
5695       self.instance.primary_node,
5696       ]
5697     return env, nl, nl
5698
5699   def CheckPrereq(self):
5700     """Check prerequisites.
5701
5702     This checks that the instance is in the cluster.
5703
5704     """
5705     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5706     assert instance is not None, \
5707       "Cannot retrieve locked instance %s" % self.op.instance_name
5708     nodenames = list(instance.all_nodes)
5709     for node in nodenames:
5710       _CheckNodeOnline(self, node)
5711
5712
5713     self.instance = instance
5714
5715     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5716       raise errors.OpPrereqError("Instance's disk layout does not support"
5717                                  " growing.")
5718
5719     self.disk = instance.FindDisk(self.op.disk)
5720
5721     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5722                                        instance.hypervisor)
5723     for node in nodenames:
5724       info = nodeinfo[node]
5725       info.Raise("Cannot get current information from node %s" % node)
5726       vg_free = info.payload.get('vg_free', None)
5727       if not isinstance(vg_free, int):
5728         raise errors.OpPrereqError("Can't compute free disk space on"
5729                                    " node %s" % node)
5730       if self.op.amount > vg_free:
5731         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5732                                    " %d MiB available, %d MiB required" %
5733                                    (node, vg_free, self.op.amount))
5734
5735   def Exec(self, feedback_fn):
5736     """Execute disk grow.
5737
5738     """
5739     instance = self.instance
5740     disk = self.disk
5741     for node in instance.all_nodes:
5742       self.cfg.SetDiskID(disk, node)
5743       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5744       result.Raise("Grow request failed to node %s" % node)
5745     disk.RecordGrow(self.op.amount)
5746     self.cfg.Update(instance)
5747     if self.op.wait_for_sync:
5748       disk_abort = not _WaitForSync(self, instance)
5749       if disk_abort:
5750         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5751                              " status.\nPlease check the instance.")
5752
5753
5754 class LUQueryInstanceData(NoHooksLU):
5755   """Query runtime instance data.
5756
5757   """
5758   _OP_REQP = ["instances", "static"]
5759   REQ_BGL = False
5760
5761   def ExpandNames(self):
5762     self.needed_locks = {}
5763     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5764
5765     if not isinstance(self.op.instances, list):
5766       raise errors.OpPrereqError("Invalid argument type 'instances'")
5767
5768     if self.op.instances:
5769       self.wanted_names = []
5770       for name in self.op.instances:
5771         full_name = self.cfg.ExpandInstanceName(name)
5772         if full_name is None:
5773           raise errors.OpPrereqError("Instance '%s' not known" % name)
5774         self.wanted_names.append(full_name)
5775       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5776     else:
5777       self.wanted_names = None
5778       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5779
5780     self.needed_locks[locking.LEVEL_NODE] = []
5781     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5782
5783   def DeclareLocks(self, level):
5784     if level == locking.LEVEL_NODE:
5785       self._LockInstancesNodes()
5786
5787   def CheckPrereq(self):
5788     """Check prerequisites.
5789
5790     This only checks the optional instance list against the existing names.
5791
5792     """
5793     if self.wanted_names is None:
5794       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5795
5796     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5797                              in self.wanted_names]
5798     return
5799
5800   def _ComputeDiskStatus(self, instance, snode, dev):
5801     """Compute block device status.
5802
5803     """
5804     static = self.op.static
5805     if not static:
5806       self.cfg.SetDiskID(dev, instance.primary_node)
5807       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5808       if dev_pstatus.offline:
5809         dev_pstatus = None
5810       else:
5811         dev_pstatus.Raise("Can't compute disk status for %s" % instance.name)
5812         dev_pstatus = dev_pstatus.payload
5813     else:
5814       dev_pstatus = None
5815
5816     if dev.dev_type in constants.LDS_DRBD:
5817       # we change the snode then (otherwise we use the one passed in)
5818       if dev.logical_id[0] == instance.primary_node:
5819         snode = dev.logical_id[1]
5820       else:
5821         snode = dev.logical_id[0]
5822
5823     if snode and not static:
5824       self.cfg.SetDiskID(dev, snode)
5825       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5826       if dev_sstatus.offline:
5827         dev_sstatus = None
5828       else:
5829         dev_sstatus.Raise("Can't compute disk status for %s" % instance.name)
5830         dev_sstatus = dev_sstatus.payload
5831     else:
5832       dev_sstatus = None
5833
5834     if dev.children:
5835       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5836                       for child in dev.children]
5837     else:
5838       dev_children = []
5839
5840     data = {
5841       "iv_name": dev.iv_name,
5842       "dev_type": dev.dev_type,
5843       "logical_id": dev.logical_id,
5844       "physical_id": dev.physical_id,
5845       "pstatus": dev_pstatus,
5846       "sstatus": dev_sstatus,
5847       "children": dev_children,
5848       "mode": dev.mode,
5849       "size": dev.size,
5850       }
5851
5852     return data
5853
5854   def Exec(self, feedback_fn):
5855     """Gather and return data"""
5856     result = {}
5857
5858     cluster = self.cfg.GetClusterInfo()
5859
5860     for instance in self.wanted_instances:
5861       if not self.op.static:
5862         remote_info = self.rpc.call_instance_info(instance.primary_node,
5863                                                   instance.name,
5864                                                   instance.hypervisor)
5865         remote_info.Raise("Error checking node %s" % instance.primary_node)
5866         remote_info = remote_info.payload
5867         if remote_info and "state" in remote_info:
5868           remote_state = "up"
5869         else:
5870           remote_state = "down"
5871       else:
5872         remote_state = None
5873       if instance.admin_up:
5874         config_state = "up"
5875       else:
5876         config_state = "down"
5877
5878       disks = [self._ComputeDiskStatus(instance, None, device)
5879                for device in instance.disks]
5880
5881       idict = {
5882         "name": instance.name,
5883         "config_state": config_state,
5884         "run_state": remote_state,
5885         "pnode": instance.primary_node,
5886         "snodes": instance.secondary_nodes,
5887         "os": instance.os,
5888         # this happens to be the same format used for hooks
5889         "nics": _NICListToTuple(self, instance.nics),
5890         "disks": disks,
5891         "hypervisor": instance.hypervisor,
5892         "network_port": instance.network_port,
5893         "hv_instance": instance.hvparams,
5894         "hv_actual": cluster.FillHV(instance),
5895         "be_instance": instance.beparams,
5896         "be_actual": cluster.FillBE(instance),
5897         }
5898
5899       result[instance.name] = idict
5900
5901     return result
5902
5903
5904 class LUSetInstanceParams(LogicalUnit):
5905   """Modifies an instances's parameters.
5906
5907   """
5908   HPATH = "instance-modify"
5909   HTYPE = constants.HTYPE_INSTANCE
5910   _OP_REQP = ["instance_name"]
5911   REQ_BGL = False
5912
5913   def CheckArguments(self):
5914     if not hasattr(self.op, 'nics'):
5915       self.op.nics = []
5916     if not hasattr(self.op, 'disks'):
5917       self.op.disks = []
5918     if not hasattr(self.op, 'beparams'):
5919       self.op.beparams = {}
5920     if not hasattr(self.op, 'hvparams'):
5921       self.op.hvparams = {}
5922     self.op.force = getattr(self.op, "force", False)
5923     if not (self.op.nics or self.op.disks or
5924             self.op.hvparams or self.op.beparams):
5925       raise errors.OpPrereqError("No changes submitted")
5926
5927     # Disk validation
5928     disk_addremove = 0
5929     for disk_op, disk_dict in self.op.disks:
5930       if disk_op == constants.DDM_REMOVE:
5931         disk_addremove += 1
5932         continue
5933       elif disk_op == constants.DDM_ADD:
5934         disk_addremove += 1
5935       else:
5936         if not isinstance(disk_op, int):
5937           raise errors.OpPrereqError("Invalid disk index")
5938         if not isinstance(disk_dict, dict):
5939           msg = "Invalid disk value: expected dict, got '%s'" % disk_dict
5940           raise errors.OpPrereqError(msg)
5941
5942       if disk_op == constants.DDM_ADD:
5943         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5944         if mode not in constants.DISK_ACCESS_SET:
5945           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5946         size = disk_dict.get('size', None)
5947         if size is None:
5948           raise errors.OpPrereqError("Required disk parameter size missing")
5949         try:
5950           size = int(size)
5951         except ValueError, err:
5952           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5953                                      str(err))
5954         disk_dict['size'] = size
5955       else:
5956         # modification of disk
5957         if 'size' in disk_dict:
5958           raise errors.OpPrereqError("Disk size change not possible, use"
5959                                      " grow-disk")
5960
5961     if disk_addremove > 1:
5962       raise errors.OpPrereqError("Only one disk add or remove operation"
5963                                  " supported at a time")
5964
5965     # NIC validation
5966     nic_addremove = 0
5967     for nic_op, nic_dict in self.op.nics:
5968       if nic_op == constants.DDM_REMOVE:
5969         nic_addremove += 1
5970         continue
5971       elif nic_op == constants.DDM_ADD:
5972         nic_addremove += 1
5973       else:
5974         if not isinstance(nic_op, int):
5975           raise errors.OpPrereqError("Invalid nic index")
5976         if not isinstance(nic_dict, dict):
5977           msg = "Invalid nic value: expected dict, got '%s'" % nic_dict
5978           raise errors.OpPrereqError(msg)
5979
5980       # nic_dict should be a dict
5981       nic_ip = nic_dict.get('ip', None)
5982       if nic_ip is not None:
5983         if nic_ip.lower() == constants.VALUE_NONE:
5984           nic_dict['ip'] = None
5985         else:
5986           if not utils.IsValidIP(nic_ip):
5987             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5988
5989       nic_bridge = nic_dict.get('bridge', None)
5990       nic_link = nic_dict.get('link', None)
5991       if nic_bridge and nic_link:
5992         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
5993                                    " at the same time")
5994       elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
5995         nic_dict['bridge'] = None
5996       elif nic_link and nic_link.lower() == constants.VALUE_NONE:
5997         nic_dict['link'] = None
5998
5999       if nic_op == constants.DDM_ADD:
6000         nic_mac = nic_dict.get('mac', None)
6001         if nic_mac is None:
6002           nic_dict['mac'] = constants.VALUE_AUTO
6003
6004       if 'mac' in nic_dict:
6005         nic_mac = nic_dict['mac']
6006         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6007           if not utils.IsValidMac(nic_mac):
6008             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
6009         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
6010           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
6011                                      " modifying an existing nic")
6012
6013     if nic_addremove > 1:
6014       raise errors.OpPrereqError("Only one NIC add or remove operation"
6015                                  " supported at a time")
6016
6017   def ExpandNames(self):
6018     self._ExpandAndLockInstance()
6019     self.needed_locks[locking.LEVEL_NODE] = []
6020     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6021
6022   def DeclareLocks(self, level):
6023     if level == locking.LEVEL_NODE:
6024       self._LockInstancesNodes()
6025
6026   def BuildHooksEnv(self):
6027     """Build hooks env.
6028
6029     This runs on the master, primary and secondaries.
6030
6031     """
6032     args = dict()
6033     if constants.BE_MEMORY in self.be_new:
6034       args['memory'] = self.be_new[constants.BE_MEMORY]
6035     if constants.BE_VCPUS in self.be_new:
6036       args['vcpus'] = self.be_new[constants.BE_VCPUS]
6037     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
6038     # information at all.
6039     if self.op.nics:
6040       args['nics'] = []
6041       nic_override = dict(self.op.nics)
6042       c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
6043       for idx, nic in enumerate(self.instance.nics):
6044         if idx in nic_override:
6045           this_nic_override = nic_override[idx]
6046         else:
6047           this_nic_override = {}
6048         if 'ip' in this_nic_override:
6049           ip = this_nic_override['ip']
6050         else:
6051           ip = nic.ip
6052         if 'mac' in this_nic_override:
6053           mac = this_nic_override['mac']
6054         else:
6055           mac = nic.mac
6056         if idx in self.nic_pnew:
6057           nicparams = self.nic_pnew[idx]
6058         else:
6059           nicparams = objects.FillDict(c_nicparams, nic.nicparams)
6060         mode = nicparams[constants.NIC_MODE]
6061         link = nicparams[constants.NIC_LINK]
6062         args['nics'].append((ip, mac, mode, link))
6063       if constants.DDM_ADD in nic_override:
6064         ip = nic_override[constants.DDM_ADD].get('ip', None)
6065         mac = nic_override[constants.DDM_ADD]['mac']
6066         nicparams = self.nic_pnew[constants.DDM_ADD]
6067         mode = nicparams[constants.NIC_MODE]
6068         link = nicparams[constants.NIC_LINK]
6069         args['nics'].append((ip, mac, mode, link))
6070       elif constants.DDM_REMOVE in nic_override:
6071         del args['nics'][-1]
6072
6073     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
6074     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
6075     return env, nl, nl
6076
6077   def _GetUpdatedParams(self, old_params, update_dict,
6078                         default_values, parameter_types):
6079     """Return the new params dict for the given params.
6080
6081     @type old_params: dict
6082     @param old_params: old parameters
6083     @type update_dict: dict
6084     @param update_dict: dict containing new parameter values,
6085                         or constants.VALUE_DEFAULT to reset the
6086                         parameter to its default value
6087     @type default_values: dict
6088     @param default_values: default values for the filled parameters
6089     @type parameter_types: dict
6090     @param parameter_types: dict mapping target dict keys to types
6091                             in constants.ENFORCEABLE_TYPES
6092     @rtype: (dict, dict)
6093     @return: (new_parameters, filled_parameters)
6094
6095     """
6096     params_copy = copy.deepcopy(old_params)
6097     for key, val in update_dict.iteritems():
6098       if val == constants.VALUE_DEFAULT:
6099         try:
6100           del params_copy[key]
6101         except KeyError:
6102           pass
6103       else:
6104         params_copy[key] = val
6105     utils.ForceDictType(params_copy, parameter_types)
6106     params_filled = objects.FillDict(default_values, params_copy)
6107     return (params_copy, params_filled)
6108
6109   def CheckPrereq(self):
6110     """Check prerequisites.
6111
6112     This only checks the instance list against the existing names.
6113
6114     """
6115     self.force = self.op.force
6116
6117     # checking the new params on the primary/secondary nodes
6118
6119     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6120     cluster = self.cluster = self.cfg.GetClusterInfo()
6121     assert self.instance is not None, \
6122       "Cannot retrieve locked instance %s" % self.op.instance_name
6123     pnode = instance.primary_node
6124     nodelist = list(instance.all_nodes)
6125
6126     # hvparams processing
6127     if self.op.hvparams:
6128       i_hvdict, hv_new = self._GetUpdatedParams(
6129                              instance.hvparams, self.op.hvparams,
6130                              cluster.hvparams[instance.hypervisor],
6131                              constants.HVS_PARAMETER_TYPES)
6132       # local check
6133       hypervisor.GetHypervisor(
6134         instance.hypervisor).CheckParameterSyntax(hv_new)
6135       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6136       self.hv_new = hv_new # the new actual values
6137       self.hv_inst = i_hvdict # the new dict (without defaults)
6138     else:
6139       self.hv_new = self.hv_inst = {}
6140
6141     # beparams processing
6142     if self.op.beparams:
6143       i_bedict, be_new = self._GetUpdatedParams(
6144                              instance.beparams, self.op.beparams,
6145                              cluster.beparams[constants.PP_DEFAULT],
6146                              constants.BES_PARAMETER_TYPES)
6147       self.be_new = be_new # the new actual values
6148       self.be_inst = i_bedict # the new dict (without defaults)
6149     else:
6150       self.be_new = self.be_inst = {}
6151
6152     self.warn = []
6153
6154     if constants.BE_MEMORY in self.op.beparams and not self.force:
6155       mem_check_list = [pnode]
6156       if be_new[constants.BE_AUTO_BALANCE]:
6157         # either we changed auto_balance to yes or it was from before
6158         mem_check_list.extend(instance.secondary_nodes)
6159       instance_info = self.rpc.call_instance_info(pnode, instance.name,
6160                                                   instance.hypervisor)
6161       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6162                                          instance.hypervisor)
6163       pninfo = nodeinfo[pnode]
6164       msg = pninfo.fail_msg
6165       if msg:
6166         # Assume the primary node is unreachable and go ahead
6167         self.warn.append("Can't get info from primary node %s: %s" %
6168                          (pnode,  msg))
6169       elif not isinstance(pninfo.payload.get('memory_free', None), int):
6170         self.warn.append("Node data from primary node %s doesn't contain"
6171                          " free memory information" % pnode)
6172       elif instance_info.fail_msg:
6173         self.warn.append("Can't get instance runtime information: %s" %
6174                         instance_info.fail_msg)
6175       else:
6176         if instance_info.payload:
6177           current_mem = int(instance_info.payload['memory'])
6178         else:
6179           # Assume instance not running
6180           # (there is a slight race condition here, but it's not very probable,
6181           # and we have no other way to check)
6182           current_mem = 0
6183         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6184                     pninfo.payload['memory_free'])
6185         if miss_mem > 0:
6186           raise errors.OpPrereqError("This change will prevent the instance"
6187                                      " from starting, due to %d MB of memory"
6188                                      " missing on its primary node" % miss_mem)
6189
6190       if be_new[constants.BE_AUTO_BALANCE]:
6191         for node, nres in nodeinfo.items():
6192           if node not in instance.secondary_nodes:
6193             continue
6194           msg = nres.fail_msg
6195           if msg:
6196             self.warn.append("Can't get info from secondary node %s: %s" %
6197                              (node, msg))
6198           elif not isinstance(nres.payload.get('memory_free', None), int):
6199             self.warn.append("Secondary node %s didn't return free"
6200                              " memory information" % node)
6201           elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
6202             self.warn.append("Not enough memory to failover instance to"
6203                              " secondary node %s" % node)
6204
6205     # NIC processing
6206     self.nic_pnew = {}
6207     self.nic_pinst = {}
6208     for nic_op, nic_dict in self.op.nics:
6209       if nic_op == constants.DDM_REMOVE:
6210         if not instance.nics:
6211           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6212         continue
6213       if nic_op != constants.DDM_ADD:
6214         # an existing nic
6215         if nic_op < 0 or nic_op >= len(instance.nics):
6216           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6217                                      " are 0 to %d" %
6218                                      (nic_op, len(instance.nics)))
6219         old_nic_params = instance.nics[nic_op].nicparams
6220         old_nic_ip = instance.nics[nic_op].ip
6221       else:
6222         old_nic_params = {}
6223         old_nic_ip = None
6224
6225       update_params_dict = dict([(key, nic_dict[key])
6226                                  for key in constants.NICS_PARAMETERS
6227                                  if key in nic_dict])
6228
6229       if 'bridge' in nic_dict:
6230         update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
6231
6232       new_nic_params, new_filled_nic_params = \
6233           self._GetUpdatedParams(old_nic_params, update_params_dict,
6234                                  cluster.nicparams[constants.PP_DEFAULT],
6235                                  constants.NICS_PARAMETER_TYPES)
6236       objects.NIC.CheckParameterSyntax(new_filled_nic_params)
6237       self.nic_pinst[nic_op] = new_nic_params
6238       self.nic_pnew[nic_op] = new_filled_nic_params
6239       new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
6240
6241       if new_nic_mode == constants.NIC_MODE_BRIDGED:
6242         nic_bridge = new_filled_nic_params[constants.NIC_LINK]
6243         msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
6244         if msg:
6245           msg = "Error checking bridges on node %s: %s" % (pnode, msg)
6246           if self.force:
6247             self.warn.append(msg)
6248           else:
6249             raise errors.OpPrereqError(msg)
6250       if new_nic_mode == constants.NIC_MODE_ROUTED:
6251         if 'ip' in nic_dict:
6252           nic_ip = nic_dict['ip']
6253         else:
6254           nic_ip = old_nic_ip
6255         if nic_ip is None:
6256           raise errors.OpPrereqError('Cannot set the nic ip to None'
6257                                      ' on a routed nic')
6258       if 'mac' in nic_dict:
6259         nic_mac = nic_dict['mac']
6260         if nic_mac is None:
6261           raise errors.OpPrereqError('Cannot set the nic mac to None')
6262         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6263           # otherwise generate the mac
6264           nic_dict['mac'] = self.cfg.GenerateMAC()
6265         else:
6266           # or validate/reserve the current one
6267           if self.cfg.IsMacInUse(nic_mac):
6268             raise errors.OpPrereqError("MAC address %s already in use"
6269                                        " in cluster" % nic_mac)
6270
6271     # DISK processing
6272     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6273       raise errors.OpPrereqError("Disk operations not supported for"
6274                                  " diskless instances")
6275     for disk_op, disk_dict in self.op.disks:
6276       if disk_op == constants.DDM_REMOVE:
6277         if len(instance.disks) == 1:
6278           raise errors.OpPrereqError("Cannot remove the last disk of"
6279                                      " an instance")
6280         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6281         ins_l = ins_l[pnode]
6282         msg = ins_l.fail_msg
6283         if msg:
6284           raise errors.OpPrereqError("Can't contact node %s: %s" %
6285                                      (pnode, msg))
6286         if instance.name in ins_l.payload:
6287           raise errors.OpPrereqError("Instance is running, can't remove"
6288                                      " disks.")
6289
6290       if (disk_op == constants.DDM_ADD and
6291           len(instance.nics) >= constants.MAX_DISKS):
6292         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6293                                    " add more" % constants.MAX_DISKS)
6294       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6295         # an existing disk
6296         if disk_op < 0 or disk_op >= len(instance.disks):
6297           raise errors.OpPrereqError("Invalid disk index %s, valid values"
6298                                      " are 0 to %d" %
6299                                      (disk_op, len(instance.disks)))
6300
6301     return
6302
6303   def Exec(self, feedback_fn):
6304     """Modifies an instance.
6305
6306     All parameters take effect only at the next restart of the instance.
6307
6308     """
6309     # Process here the warnings from CheckPrereq, as we don't have a
6310     # feedback_fn there.
6311     for warn in self.warn:
6312       feedback_fn("WARNING: %s" % warn)
6313
6314     result = []
6315     instance = self.instance
6316     cluster = self.cluster
6317     # disk changes
6318     for disk_op, disk_dict in self.op.disks:
6319       if disk_op == constants.DDM_REMOVE:
6320         # remove the last disk
6321         device = instance.disks.pop()
6322         device_idx = len(instance.disks)
6323         for node, disk in device.ComputeNodeTree(instance.primary_node):
6324           self.cfg.SetDiskID(disk, node)
6325           msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
6326           if msg:
6327             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6328                             " continuing anyway", device_idx, node, msg)
6329         result.append(("disk/%d" % device_idx, "remove"))
6330       elif disk_op == constants.DDM_ADD:
6331         # add a new disk
6332         if instance.disk_template == constants.DT_FILE:
6333           file_driver, file_path = instance.disks[0].logical_id
6334           file_path = os.path.dirname(file_path)
6335         else:
6336           file_driver = file_path = None
6337         disk_idx_base = len(instance.disks)
6338         new_disk = _GenerateDiskTemplate(self,
6339                                          instance.disk_template,
6340                                          instance.name, instance.primary_node,
6341                                          instance.secondary_nodes,
6342                                          [disk_dict],
6343                                          file_path,
6344                                          file_driver,
6345                                          disk_idx_base)[0]
6346         instance.disks.append(new_disk)
6347         info = _GetInstanceInfoText(instance)
6348
6349         logging.info("Creating volume %s for instance %s",
6350                      new_disk.iv_name, instance.name)
6351         # Note: this needs to be kept in sync with _CreateDisks
6352         #HARDCODE
6353         for node in instance.all_nodes:
6354           f_create = node == instance.primary_node
6355           try:
6356             _CreateBlockDev(self, node, instance, new_disk,
6357                             f_create, info, f_create)
6358           except errors.OpExecError, err:
6359             self.LogWarning("Failed to create volume %s (%s) on"
6360                             " node %s: %s",
6361                             new_disk.iv_name, new_disk, node, err)
6362         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6363                        (new_disk.size, new_disk.mode)))
6364       else:
6365         # change a given disk
6366         instance.disks[disk_op].mode = disk_dict['mode']
6367         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6368     # NIC changes
6369     for nic_op, nic_dict in self.op.nics:
6370       if nic_op == constants.DDM_REMOVE:
6371         # remove the last nic
6372         del instance.nics[-1]
6373         result.append(("nic.%d" % len(instance.nics), "remove"))
6374       elif nic_op == constants.DDM_ADD:
6375         # mac and bridge should be set, by now
6376         mac = nic_dict['mac']
6377         ip = nic_dict.get('ip', None)
6378         nicparams = self.nic_pinst[constants.DDM_ADD]
6379         new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
6380         instance.nics.append(new_nic)
6381         result.append(("nic.%d" % (len(instance.nics) - 1),
6382                        "add:mac=%s,ip=%s,mode=%s,link=%s" %
6383                        (new_nic.mac, new_nic.ip,
6384                         self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
6385                         self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
6386                        )))
6387       else:
6388         for key in 'mac', 'ip':
6389           if key in nic_dict:
6390             setattr(instance.nics[nic_op], key, nic_dict[key])
6391         if nic_op in self.nic_pnew:
6392           instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
6393         for key, val in nic_dict.iteritems():
6394           result.append(("nic.%s/%d" % (key, nic_op), val))
6395
6396     # hvparams changes
6397     if self.op.hvparams:
6398       instance.hvparams = self.hv_inst
6399       for key, val in self.op.hvparams.iteritems():
6400         result.append(("hv/%s" % key, val))
6401
6402     # beparams changes
6403     if self.op.beparams:
6404       instance.beparams = self.be_inst
6405       for key, val in self.op.beparams.iteritems():
6406         result.append(("be/%s" % key, val))
6407
6408     self.cfg.Update(instance)
6409
6410     return result
6411
6412
6413 class LUQueryExports(NoHooksLU):
6414   """Query the exports list
6415
6416   """
6417   _OP_REQP = ['nodes']
6418   REQ_BGL = False
6419
6420   def ExpandNames(self):
6421     self.needed_locks = {}
6422     self.share_locks[locking.LEVEL_NODE] = 1
6423     if not self.op.nodes:
6424       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6425     else:
6426       self.needed_locks[locking.LEVEL_NODE] = \
6427         _GetWantedNodes(self, self.op.nodes)
6428
6429   def CheckPrereq(self):
6430     """Check prerequisites.
6431
6432     """
6433     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6434
6435   def Exec(self, feedback_fn):
6436     """Compute the list of all the exported system images.
6437
6438     @rtype: dict
6439     @return: a dictionary with the structure node->(export-list)
6440         where export-list is a list of the instances exported on
6441         that node.
6442
6443     """
6444     rpcresult = self.rpc.call_export_list(self.nodes)
6445     result = {}
6446     for node in rpcresult:
6447       if rpcresult[node].fail_msg:
6448         result[node] = False
6449       else:
6450         result[node] = rpcresult[node].payload
6451
6452     return result
6453
6454
6455 class LUExportInstance(LogicalUnit):
6456   """Export an instance to an image in the cluster.
6457
6458   """
6459   HPATH = "instance-export"
6460   HTYPE = constants.HTYPE_INSTANCE
6461   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6462   REQ_BGL = False
6463
6464   def ExpandNames(self):
6465     self._ExpandAndLockInstance()
6466     # FIXME: lock only instance primary and destination node
6467     #
6468     # Sad but true, for now we have do lock all nodes, as we don't know where
6469     # the previous export might be, and and in this LU we search for it and
6470     # remove it from its current node. In the future we could fix this by:
6471     #  - making a tasklet to search (share-lock all), then create the new one,
6472     #    then one to remove, after
6473     #  - removing the removal operation altogether
6474     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6475
6476   def DeclareLocks(self, level):
6477     """Last minute lock declaration."""
6478     # All nodes are locked anyway, so nothing to do here.
6479
6480   def BuildHooksEnv(self):
6481     """Build hooks env.
6482
6483     This will run on the master, primary node and target node.
6484
6485     """
6486     env = {
6487       "EXPORT_NODE": self.op.target_node,
6488       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6489       }
6490     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6491     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6492           self.op.target_node]
6493     return env, nl, nl
6494
6495   def CheckPrereq(self):
6496     """Check prerequisites.
6497
6498     This checks that the instance and node names are valid.
6499
6500     """
6501     instance_name = self.op.instance_name
6502     self.instance = self.cfg.GetInstanceInfo(instance_name)
6503     assert self.instance is not None, \
6504           "Cannot retrieve locked instance %s" % self.op.instance_name
6505     _CheckNodeOnline(self, self.instance.primary_node)
6506
6507     self.dst_node = self.cfg.GetNodeInfo(
6508       self.cfg.ExpandNodeName(self.op.target_node))
6509
6510     if self.dst_node is None:
6511       # This is wrong node name, not a non-locked node
6512       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6513     _CheckNodeOnline(self, self.dst_node.name)
6514     _CheckNodeNotDrained(self, self.dst_node.name)
6515
6516     # instance disk type verification
6517     for disk in self.instance.disks:
6518       if disk.dev_type == constants.LD_FILE:
6519         raise errors.OpPrereqError("Export not supported for instances with"
6520                                    " file-based disks")
6521
6522   def Exec(self, feedback_fn):
6523     """Export an instance to an image in the cluster.
6524
6525     """
6526     instance = self.instance
6527     dst_node = self.dst_node
6528     src_node = instance.primary_node
6529     if self.op.shutdown:
6530       # shutdown the instance, but not the disks
6531       result = self.rpc.call_instance_shutdown(src_node, instance)
6532       result.Raise("Could not shutdown instance %s on"
6533                    " node %s" % (instance.name, src_node))
6534
6535     vgname = self.cfg.GetVGName()
6536
6537     snap_disks = []
6538
6539     # set the disks ID correctly since call_instance_start needs the
6540     # correct drbd minor to create the symlinks
6541     for disk in instance.disks:
6542       self.cfg.SetDiskID(disk, src_node)
6543
6544     try:
6545       for idx, disk in enumerate(instance.disks):
6546         # result.payload will be a snapshot of an lvm leaf of the one we passed
6547         result = self.rpc.call_blockdev_snapshot(src_node, disk)
6548         msg = result.fail_msg
6549         if msg:
6550           self.LogWarning("Could not snapshot disk/%s on node %s: %s",
6551                           idx, src_node, msg)
6552           snap_disks.append(False)
6553         else:
6554           disk_id = (vgname, result.payload)
6555           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6556                                  logical_id=disk_id, physical_id=disk_id,
6557                                  iv_name=disk.iv_name)
6558           snap_disks.append(new_dev)
6559
6560     finally:
6561       if self.op.shutdown and instance.admin_up:
6562         result = self.rpc.call_instance_start(src_node, instance, None, None)
6563         msg = result.fail_msg
6564         if msg:
6565           _ShutdownInstanceDisks(self, instance)
6566           raise errors.OpExecError("Could not start instance: %s" % msg)
6567
6568     # TODO: check for size
6569
6570     cluster_name = self.cfg.GetClusterName()
6571     for idx, dev in enumerate(snap_disks):
6572       if dev:
6573         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6574                                                instance, cluster_name, idx)
6575         msg = result.fail_msg
6576         if msg:
6577           self.LogWarning("Could not export disk/%s from node %s to"
6578                           " node %s: %s", idx, src_node, dst_node.name, msg)
6579         msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
6580         if msg:
6581           self.LogWarning("Could not remove snapshot for disk/%d from node"
6582                           " %s: %s", idx, src_node, msg)
6583
6584     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6585     msg = result.fail_msg
6586     if msg:
6587       self.LogWarning("Could not finalize export for instance %s"
6588                       " on node %s: %s", instance.name, dst_node.name, msg)
6589
6590     nodelist = self.cfg.GetNodeList()
6591     nodelist.remove(dst_node.name)
6592
6593     # on one-node clusters nodelist will be empty after the removal
6594     # if we proceed the backup would be removed because OpQueryExports
6595     # substitutes an empty list with the full cluster node list.
6596     iname = instance.name
6597     if nodelist:
6598       exportlist = self.rpc.call_export_list(nodelist)
6599       for node in exportlist:
6600         if exportlist[node].fail_msg:
6601           continue
6602         if iname in exportlist[node].payload:
6603           msg = self.rpc.call_export_remove(node, iname).fail_msg
6604           if msg:
6605             self.LogWarning("Could not remove older export for instance %s"
6606                             " on node %s: %s", iname, node, msg)
6607
6608
6609 class LURemoveExport(NoHooksLU):
6610   """Remove exports related to the named instance.
6611
6612   """
6613   _OP_REQP = ["instance_name"]
6614   REQ_BGL = False
6615
6616   def ExpandNames(self):
6617     self.needed_locks = {}
6618     # We need all nodes to be locked in order for RemoveExport to work, but we
6619     # don't need to lock the instance itself, as nothing will happen to it (and
6620     # we can remove exports also for a removed instance)
6621     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6622
6623   def CheckPrereq(self):
6624     """Check prerequisites.
6625     """
6626     pass
6627
6628   def Exec(self, feedback_fn):
6629     """Remove any export.
6630
6631     """
6632     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6633     # If the instance was not found we'll try with the name that was passed in.
6634     # This will only work if it was an FQDN, though.
6635     fqdn_warn = False
6636     if not instance_name:
6637       fqdn_warn = True
6638       instance_name = self.op.instance_name
6639
6640     locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
6641     exportlist = self.rpc.call_export_list(locked_nodes)
6642     found = False
6643     for node in exportlist:
6644       msg = exportlist[node].fail_msg
6645       if msg:
6646         self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
6647         continue
6648       if instance_name in exportlist[node].payload:
6649         found = True
6650         result = self.rpc.call_export_remove(node, instance_name)
6651         msg = result.fail_msg
6652         if msg:
6653           logging.error("Could not remove export for instance %s"
6654                         " on node %s: %s", instance_name, node, msg)
6655
6656     if fqdn_warn and not found:
6657       feedback_fn("Export not found. If trying to remove an export belonging"
6658                   " to a deleted instance please use its Fully Qualified"
6659                   " Domain Name.")
6660
6661
6662 class TagsLU(NoHooksLU):
6663   """Generic tags LU.
6664
6665   This is an abstract class which is the parent of all the other tags LUs.
6666
6667   """
6668
6669   def ExpandNames(self):
6670     self.needed_locks = {}
6671     if self.op.kind == constants.TAG_NODE:
6672       name = self.cfg.ExpandNodeName(self.op.name)
6673       if name is None:
6674         raise errors.OpPrereqError("Invalid node name (%s)" %
6675                                    (self.op.name,))
6676       self.op.name = name
6677       self.needed_locks[locking.LEVEL_NODE] = name
6678     elif self.op.kind == constants.TAG_INSTANCE:
6679       name = self.cfg.ExpandInstanceName(self.op.name)
6680       if name is None:
6681         raise errors.OpPrereqError("Invalid instance name (%s)" %
6682                                    (self.op.name,))
6683       self.op.name = name
6684       self.needed_locks[locking.LEVEL_INSTANCE] = name
6685
6686   def CheckPrereq(self):
6687     """Check prerequisites.
6688
6689     """
6690     if self.op.kind == constants.TAG_CLUSTER:
6691       self.target = self.cfg.GetClusterInfo()
6692     elif self.op.kind == constants.TAG_NODE:
6693       self.target = self.cfg.GetNodeInfo(self.op.name)
6694     elif self.op.kind == constants.TAG_INSTANCE:
6695       self.target = self.cfg.GetInstanceInfo(self.op.name)
6696     else:
6697       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6698                                  str(self.op.kind))
6699
6700
6701 class LUGetTags(TagsLU):
6702   """Returns the tags of a given object.
6703
6704   """
6705   _OP_REQP = ["kind", "name"]
6706   REQ_BGL = False
6707
6708   def Exec(self, feedback_fn):
6709     """Returns the tag list.
6710
6711     """
6712     return list(self.target.GetTags())
6713
6714
6715 class LUSearchTags(NoHooksLU):
6716   """Searches the tags for a given pattern.
6717
6718   """
6719   _OP_REQP = ["pattern"]
6720   REQ_BGL = False
6721
6722   def ExpandNames(self):
6723     self.needed_locks = {}
6724
6725   def CheckPrereq(self):
6726     """Check prerequisites.
6727
6728     This checks the pattern passed for validity by compiling it.
6729
6730     """
6731     try:
6732       self.re = re.compile(self.op.pattern)
6733     except re.error, err:
6734       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6735                                  (self.op.pattern, err))
6736
6737   def Exec(self, feedback_fn):
6738     """Returns the tag list.
6739
6740     """
6741     cfg = self.cfg
6742     tgts = [("/cluster", cfg.GetClusterInfo())]
6743     ilist = cfg.GetAllInstancesInfo().values()
6744     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6745     nlist = cfg.GetAllNodesInfo().values()
6746     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6747     results = []
6748     for path, target in tgts:
6749       for tag in target.GetTags():
6750         if self.re.search(tag):
6751           results.append((path, tag))
6752     return results
6753
6754
6755 class LUAddTags(TagsLU):
6756   """Sets a tag on a given object.
6757
6758   """
6759   _OP_REQP = ["kind", "name", "tags"]
6760   REQ_BGL = False
6761
6762   def CheckPrereq(self):
6763     """Check prerequisites.
6764
6765     This checks the type and length of the tag name and value.
6766
6767     """
6768     TagsLU.CheckPrereq(self)
6769     for tag in self.op.tags:
6770       objects.TaggableObject.ValidateTag(tag)
6771
6772   def Exec(self, feedback_fn):
6773     """Sets the tag.
6774
6775     """
6776     try:
6777       for tag in self.op.tags:
6778         self.target.AddTag(tag)
6779     except errors.TagError, err:
6780       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6781     try:
6782       self.cfg.Update(self.target)
6783     except errors.ConfigurationError:
6784       raise errors.OpRetryError("There has been a modification to the"
6785                                 " config file and the operation has been"
6786                                 " aborted. Please retry.")
6787
6788
6789 class LUDelTags(TagsLU):
6790   """Delete a list of tags from a given object.
6791
6792   """
6793   _OP_REQP = ["kind", "name", "tags"]
6794   REQ_BGL = False
6795
6796   def CheckPrereq(self):
6797     """Check prerequisites.
6798
6799     This checks that we have the given tag.
6800
6801     """
6802     TagsLU.CheckPrereq(self)
6803     for tag in self.op.tags:
6804       objects.TaggableObject.ValidateTag(tag)
6805     del_tags = frozenset(self.op.tags)
6806     cur_tags = self.target.GetTags()
6807     if not del_tags <= cur_tags:
6808       diff_tags = del_tags - cur_tags
6809       diff_names = ["'%s'" % tag for tag in diff_tags]
6810       diff_names.sort()
6811       raise errors.OpPrereqError("Tag(s) %s not found" %
6812                                  (",".join(diff_names)))
6813
6814   def Exec(self, feedback_fn):
6815     """Remove the tag from the object.
6816
6817     """
6818     for tag in self.op.tags:
6819       self.target.RemoveTag(tag)
6820     try:
6821       self.cfg.Update(self.target)
6822     except errors.ConfigurationError:
6823       raise errors.OpRetryError("There has been a modification to the"
6824                                 " config file and the operation has been"
6825                                 " aborted. Please retry.")
6826
6827
6828 class LUTestDelay(NoHooksLU):
6829   """Sleep for a specified amount of time.
6830
6831   This LU sleeps on the master and/or nodes for a specified amount of
6832   time.
6833
6834   """
6835   _OP_REQP = ["duration", "on_master", "on_nodes"]
6836   REQ_BGL = False
6837
6838   def ExpandNames(self):
6839     """Expand names and set required locks.
6840
6841     This expands the node list, if any.
6842
6843     """
6844     self.needed_locks = {}
6845     if self.op.on_nodes:
6846       # _GetWantedNodes can be used here, but is not always appropriate to use
6847       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6848       # more information.
6849       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6850       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6851
6852   def CheckPrereq(self):
6853     """Check prerequisites.
6854
6855     """
6856
6857   def Exec(self, feedback_fn):
6858     """Do the actual sleep.
6859
6860     """
6861     if self.op.on_master:
6862       if not utils.TestDelay(self.op.duration):
6863         raise errors.OpExecError("Error during master delay test")
6864     if self.op.on_nodes:
6865       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6866       for node, node_result in result.items():
6867         node_result.Raise("Failure during rpc call to node %s" % node)
6868
6869
6870 class IAllocator(object):
6871   """IAllocator framework.
6872
6873   An IAllocator instance has three sets of attributes:
6874     - cfg that is needed to query the cluster
6875     - input data (all members of the _KEYS class attribute are required)
6876     - four buffer attributes (in|out_data|text), that represent the
6877       input (to the external script) in text and data structure format,
6878       and the output from it, again in two formats
6879     - the result variables from the script (success, info, nodes) for
6880       easy usage
6881
6882   """
6883   _ALLO_KEYS = [
6884     "mem_size", "disks", "disk_template",
6885     "os", "tags", "nics", "vcpus", "hypervisor",
6886     ]
6887   _RELO_KEYS = [
6888     "relocate_from",
6889     ]
6890
6891   def __init__(self, lu, mode, name, **kwargs):
6892     self.lu = lu
6893     # init buffer variables
6894     self.in_text = self.out_text = self.in_data = self.out_data = None
6895     # init all input fields so that pylint is happy
6896     self.mode = mode
6897     self.name = name
6898     self.mem_size = self.disks = self.disk_template = None
6899     self.os = self.tags = self.nics = self.vcpus = None
6900     self.hypervisor = None
6901     self.relocate_from = None
6902     # computed fields
6903     self.required_nodes = None
6904     # init result fields
6905     self.success = self.info = self.nodes = None
6906     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6907       keyset = self._ALLO_KEYS
6908     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6909       keyset = self._RELO_KEYS
6910     else:
6911       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6912                                    " IAllocator" % self.mode)
6913     for key in kwargs:
6914       if key not in keyset:
6915         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6916                                      " IAllocator" % key)
6917       setattr(self, key, kwargs[key])
6918     for key in keyset:
6919       if key not in kwargs:
6920         raise errors.ProgrammerError("Missing input parameter '%s' to"
6921                                      " IAllocator" % key)
6922     self._BuildInputData()
6923
6924   def _ComputeClusterData(self):
6925     """Compute the generic allocator input data.
6926
6927     This is the data that is independent of the actual operation.
6928
6929     """
6930     cfg = self.lu.cfg
6931     cluster_info = cfg.GetClusterInfo()
6932     # cluster data
6933     data = {
6934       "version": constants.IALLOCATOR_VERSION,
6935       "cluster_name": cfg.GetClusterName(),
6936       "cluster_tags": list(cluster_info.GetTags()),
6937       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6938       # we don't have job IDs
6939       }
6940     iinfo = cfg.GetAllInstancesInfo().values()
6941     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6942
6943     # node data
6944     node_results = {}
6945     node_list = cfg.GetNodeList()
6946
6947     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6948       hypervisor_name = self.hypervisor
6949     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6950       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6951
6952     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6953                                            hypervisor_name)
6954     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6955                        cluster_info.enabled_hypervisors)
6956     for nname, nresult in node_data.items():
6957       # first fill in static (config-based) values
6958       ninfo = cfg.GetNodeInfo(nname)
6959       pnr = {
6960         "tags": list(ninfo.GetTags()),
6961         "primary_ip": ninfo.primary_ip,
6962         "secondary_ip": ninfo.secondary_ip,
6963         "offline": ninfo.offline,
6964         "drained": ninfo.drained,
6965         "master_candidate": ninfo.master_candidate,
6966         }
6967
6968       if not ninfo.offline:
6969         nresult.Raise("Can't get data for node %s" % nname)
6970         node_iinfo[nname].Raise("Can't get node instance info from node %s" %
6971                                 nname)
6972         remote_info = nresult.payload
6973         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6974                      'vg_size', 'vg_free', 'cpu_total']:
6975           if attr not in remote_info:
6976             raise errors.OpExecError("Node '%s' didn't return attribute"
6977                                      " '%s'" % (nname, attr))
6978           if not isinstance(remote_info[attr], int):
6979             raise errors.OpExecError("Node '%s' returned invalid value"
6980                                      " for '%s': %s" %
6981                                      (nname, attr, remote_info[attr]))
6982         # compute memory used by primary instances
6983         i_p_mem = i_p_up_mem = 0
6984         for iinfo, beinfo in i_list:
6985           if iinfo.primary_node == nname:
6986             i_p_mem += beinfo[constants.BE_MEMORY]
6987             if iinfo.name not in node_iinfo[nname].payload:
6988               i_used_mem = 0
6989             else:
6990               i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
6991             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6992             remote_info['memory_free'] -= max(0, i_mem_diff)
6993
6994             if iinfo.admin_up:
6995               i_p_up_mem += beinfo[constants.BE_MEMORY]
6996
6997         # compute memory used by instances
6998         pnr_dyn = {
6999           "total_memory": remote_info['memory_total'],
7000           "reserved_memory": remote_info['memory_dom0'],
7001           "free_memory": remote_info['memory_free'],
7002           "total_disk": remote_info['vg_size'],
7003           "free_disk": remote_info['vg_free'],
7004           "total_cpus": remote_info['cpu_total'],
7005           "i_pri_memory": i_p_mem,
7006           "i_pri_up_memory": i_p_up_mem,
7007           }
7008         pnr.update(pnr_dyn)
7009
7010       node_results[nname] = pnr
7011     data["nodes"] = node_results
7012
7013     # instance data
7014     instance_data = {}
7015     for iinfo, beinfo in i_list:
7016       nic_data = []
7017       for nic in iinfo.nics:
7018         filled_params = objects.FillDict(
7019             cluster_info.nicparams[constants.PP_DEFAULT],
7020             nic.nicparams)
7021         nic_dict = {"mac": nic.mac,
7022                     "ip": nic.ip,
7023                     "mode": filled_params[constants.NIC_MODE],
7024                     "link": filled_params[constants.NIC_LINK],
7025                    }
7026         if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
7027           nic_dict["bridge"] = filled_params[constants.NIC_LINK]
7028         nic_data.append(nic_dict)
7029       pir = {
7030         "tags": list(iinfo.GetTags()),
7031         "admin_up": iinfo.admin_up,
7032         "vcpus": beinfo[constants.BE_VCPUS],
7033         "memory": beinfo[constants.BE_MEMORY],
7034         "os": iinfo.os,
7035         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
7036         "nics": nic_data,
7037         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
7038         "disk_template": iinfo.disk_template,
7039         "hypervisor": iinfo.hypervisor,
7040         }
7041       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
7042                                                  pir["disks"])
7043       instance_data[iinfo.name] = pir
7044
7045     data["instances"] = instance_data
7046
7047     self.in_data = data
7048
7049   def _AddNewInstance(self):
7050     """Add new instance data to allocator structure.
7051
7052     This in combination with _AllocatorGetClusterData will create the
7053     correct structure needed as input for the allocator.
7054
7055     The checks for the completeness of the opcode must have already been
7056     done.
7057
7058     """
7059     data = self.in_data
7060
7061     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
7062
7063     if self.disk_template in constants.DTS_NET_MIRROR:
7064       self.required_nodes = 2
7065     else:
7066       self.required_nodes = 1
7067     request = {
7068       "type": "allocate",
7069       "name": self.name,
7070       "disk_template": self.disk_template,
7071       "tags": self.tags,
7072       "os": self.os,
7073       "vcpus": self.vcpus,
7074       "memory": self.mem_size,
7075       "disks": self.disks,
7076       "disk_space_total": disk_space,
7077       "nics": self.nics,
7078       "required_nodes": self.required_nodes,
7079       }
7080     data["request"] = request
7081
7082   def _AddRelocateInstance(self):
7083     """Add relocate instance data to allocator structure.
7084
7085     This in combination with _IAllocatorGetClusterData will create the
7086     correct structure needed as input for the allocator.
7087
7088     The checks for the completeness of the opcode must have already been
7089     done.
7090
7091     """
7092     instance = self.lu.cfg.GetInstanceInfo(self.name)
7093     if instance is None:
7094       raise errors.ProgrammerError("Unknown instance '%s' passed to"
7095                                    " IAllocator" % self.name)
7096
7097     if instance.disk_template not in constants.DTS_NET_MIRROR:
7098       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
7099
7100     if len(instance.secondary_nodes) != 1:
7101       raise errors.OpPrereqError("Instance has not exactly one secondary node")
7102
7103     self.required_nodes = 1
7104     disk_sizes = [{'size': disk.size} for disk in instance.disks]
7105     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
7106
7107     request = {
7108       "type": "relocate",
7109       "name": self.name,
7110       "disk_space_total": disk_space,
7111       "required_nodes": self.required_nodes,
7112       "relocate_from": self.relocate_from,
7113       }
7114     self.in_data["request"] = request
7115
7116   def _BuildInputData(self):
7117     """Build input data structures.
7118
7119     """
7120     self._ComputeClusterData()
7121
7122     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7123       self._AddNewInstance()
7124     else:
7125       self._AddRelocateInstance()
7126
7127     self.in_text = serializer.Dump(self.in_data)
7128
7129   def Run(self, name, validate=True, call_fn=None):
7130     """Run an instance allocator and return the results.
7131
7132     """
7133     if call_fn is None:
7134       call_fn = self.lu.rpc.call_iallocator_runner
7135
7136     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
7137     result.Raise("Failure while running the iallocator script")
7138
7139     self.out_text = result.payload
7140     if validate:
7141       self._ValidateResult()
7142
7143   def _ValidateResult(self):
7144     """Process the allocator results.
7145
7146     This will process and if successful save the result in
7147     self.out_data and the other parameters.
7148
7149     """
7150     try:
7151       rdict = serializer.Load(self.out_text)
7152     except Exception, err:
7153       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
7154
7155     if not isinstance(rdict, dict):
7156       raise errors.OpExecError("Can't parse iallocator results: not a dict")
7157
7158     for key in "success", "info", "nodes":
7159       if key not in rdict:
7160         raise errors.OpExecError("Can't parse iallocator results:"
7161                                  " missing key '%s'" % key)
7162       setattr(self, key, rdict[key])
7163
7164     if not isinstance(rdict["nodes"], list):
7165       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
7166                                " is not a list")
7167     self.out_data = rdict
7168
7169
7170 class LUTestAllocator(NoHooksLU):
7171   """Run allocator tests.
7172
7173   This LU runs the allocator tests
7174
7175   """
7176   _OP_REQP = ["direction", "mode", "name"]
7177
7178   def CheckPrereq(self):
7179     """Check prerequisites.
7180
7181     This checks the opcode parameters depending on the director and mode test.
7182
7183     """
7184     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7185       for attr in ["name", "mem_size", "disks", "disk_template",
7186                    "os", "tags", "nics", "vcpus"]:
7187         if not hasattr(self.op, attr):
7188           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7189                                      attr)
7190       iname = self.cfg.ExpandInstanceName(self.op.name)
7191       if iname is not None:
7192         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7193                                    iname)
7194       if not isinstance(self.op.nics, list):
7195         raise errors.OpPrereqError("Invalid parameter 'nics'")
7196       for row in self.op.nics:
7197         if (not isinstance(row, dict) or
7198             "mac" not in row or
7199             "ip" not in row or
7200             "bridge" not in row):
7201           raise errors.OpPrereqError("Invalid contents of the"
7202                                      " 'nics' parameter")
7203       if not isinstance(self.op.disks, list):
7204         raise errors.OpPrereqError("Invalid parameter 'disks'")
7205       for row in self.op.disks:
7206         if (not isinstance(row, dict) or
7207             "size" not in row or
7208             not isinstance(row["size"], int) or
7209             "mode" not in row or
7210             row["mode"] not in ['r', 'w']):
7211           raise errors.OpPrereqError("Invalid contents of the"
7212                                      " 'disks' parameter")
7213       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7214         self.op.hypervisor = self.cfg.GetHypervisorType()
7215     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7216       if not hasattr(self.op, "name"):
7217         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7218       fname = self.cfg.ExpandInstanceName(self.op.name)
7219       if fname is None:
7220         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7221                                    self.op.name)
7222       self.op.name = fname
7223       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7224     else:
7225       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7226                                  self.op.mode)
7227
7228     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7229       if not hasattr(self.op, "allocator") or self.op.allocator is None:
7230         raise errors.OpPrereqError("Missing allocator name")
7231     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7232       raise errors.OpPrereqError("Wrong allocator test '%s'" %
7233                                  self.op.direction)
7234
7235   def Exec(self, feedback_fn):
7236     """Run the allocator test.
7237
7238     """
7239     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7240       ial = IAllocator(self,
7241                        mode=self.op.mode,
7242                        name=self.op.name,
7243                        mem_size=self.op.mem_size,
7244                        disks=self.op.disks,
7245                        disk_template=self.op.disk_template,
7246                        os=self.op.os,
7247                        tags=self.op.tags,
7248                        nics=self.op.nics,
7249                        vcpus=self.op.vcpus,
7250                        hypervisor=self.op.hypervisor,
7251                        )
7252     else:
7253       ial = IAllocator(self,
7254                        mode=self.op.mode,
7255                        name=self.op.name,
7256                        relocate_from=list(self.relocate_from),
7257                        )
7258
7259     if self.op.direction == constants.IALLOCATOR_DIR_IN:
7260       result = ial.in_text
7261     else:
7262       ial.Run(self.op.allocator, validate=False)
7263       result = ial.out_text
7264     return result