Small whitespace change
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import tempfile
30 import re
31 import platform
32 import logging
33 import copy
34 import random
35
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45 from ganeti import ssconf
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59
60   Note that all commands require root permissions.
61
62   """
63   HPATH = None
64   HTYPE = None
65   _OP_REQP = []
66   REQ_BGL = True
67
68   def __init__(self, processor, op, context, rpc):
69     """Constructor for LogicalUnit.
70
71     This needs to be overriden in derived classes in order to check op
72     validity.
73
74     """
75     self.proc = processor
76     self.op = op
77     self.cfg = context.cfg
78     self.context = context
79     self.rpc = rpc
80     # Dicts used to declare locking needs to mcpu
81     self.needed_locks = None
82     self.acquired_locks = {}
83     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
84     self.add_locks = {}
85     self.remove_locks = {}
86     # Used to force good behavior when calling helper functions
87     self.recalculate_locks = {}
88     self.__ssh = None
89     # logging
90     self.LogWarning = processor.LogWarning
91     self.LogInfo = processor.LogInfo
92
93     for attr_name in self._OP_REQP:
94       attr_val = getattr(op, attr_name, None)
95       if attr_val is None:
96         raise errors.OpPrereqError("Required parameter '%s' missing" %
97                                    attr_name)
98     self.CheckArguments()
99
100   def __GetSSH(self):
101     """Returns the SshRunner object
102
103     """
104     if not self.__ssh:
105       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
106     return self.__ssh
107
108   ssh = property(fget=__GetSSH)
109
110   def CheckArguments(self):
111     """Check syntactic validity for the opcode arguments.
112
113     This method is for doing a simple syntactic check and ensure
114     validity of opcode parameters, without any cluster-related
115     checks. While the same can be accomplished in ExpandNames and/or
116     CheckPrereq, doing these separate is better because:
117
118       - ExpandNames is left as as purely a lock-related function
119       - CheckPrereq is run after we have aquired locks (and possible
120         waited for them)
121
122     The function is allowed to change the self.op attribute so that
123     later methods can no longer worry about missing parameters.
124
125     """
126     pass
127
128   def ExpandNames(self):
129     """Expand names for this LU.
130
131     This method is called before starting to execute the opcode, and it should
132     update all the parameters of the opcode to their canonical form (e.g. a
133     short node name must be fully expanded after this method has successfully
134     completed). This way locking, hooks, logging, ecc. can work correctly.
135
136     LUs which implement this method must also populate the self.needed_locks
137     member, as a dict with lock levels as keys, and a list of needed lock names
138     as values. Rules:
139
140       - use an empty dict if you don't need any lock
141       - if you don't need any lock at a particular level omit that level
142       - don't put anything for the BGL level
143       - if you want all locks at a level use locking.ALL_SET as a value
144
145     If you need to share locks (rather than acquire them exclusively) at one
146     level you can modify self.share_locks, setting a true value (usually 1) for
147     that level. By default locks are not shared.
148
149     Examples::
150
151       # Acquire all nodes and one instance
152       self.needed_locks = {
153         locking.LEVEL_NODE: locking.ALL_SET,
154         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
155       }
156       # Acquire just two nodes
157       self.needed_locks = {
158         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
159       }
160       # Acquire no locks
161       self.needed_locks = {} # No, you can't leave it to the default value None
162
163     """
164     # The implementation of this method is mandatory only if the new LU is
165     # concurrent, so that old LUs don't need to be changed all at the same
166     # time.
167     if self.REQ_BGL:
168       self.needed_locks = {} # Exclusive LUs don't need locks.
169     else:
170       raise NotImplementedError
171
172   def DeclareLocks(self, level):
173     """Declare LU locking needs for a level
174
175     While most LUs can just declare their locking needs at ExpandNames time,
176     sometimes there's the need to calculate some locks after having acquired
177     the ones before. This function is called just before acquiring locks at a
178     particular level, but after acquiring the ones at lower levels, and permits
179     such calculations. It can be used to modify self.needed_locks, and by
180     default it does nothing.
181
182     This function is only called if you have something already set in
183     self.needed_locks for the level.
184
185     @param level: Locking level which is going to be locked
186     @type level: member of ganeti.locking.LEVELS
187
188     """
189
190   def CheckPrereq(self):
191     """Check prerequisites for this LU.
192
193     This method should check that the prerequisites for the execution
194     of this LU are fulfilled. It can do internode communication, but
195     it should be idempotent - no cluster or system changes are
196     allowed.
197
198     The method should raise errors.OpPrereqError in case something is
199     not fulfilled. Its return value is ignored.
200
201     This method should also update all the parameters of the opcode to
202     their canonical form if it hasn't been done by ExpandNames before.
203
204     """
205     raise NotImplementedError
206
207   def Exec(self, feedback_fn):
208     """Execute the LU.
209
210     This method should implement the actual work. It should raise
211     errors.OpExecError for failures that are somewhat dealt with in
212     code, or expected.
213
214     """
215     raise NotImplementedError
216
217   def BuildHooksEnv(self):
218     """Build hooks environment for this LU.
219
220     This method should return a three-node tuple consisting of: a dict
221     containing the environment that will be used for running the
222     specific hook for this LU, a list of node names on which the hook
223     should run before the execution, and a list of node names on which
224     the hook should run after the execution.
225
226     The keys of the dict must not have 'GANETI_' prefixed as this will
227     be handled in the hooks runner. Also note additional keys will be
228     added by the hooks runner. If the LU doesn't define any
229     environment, an empty dict (and not None) should be returned.
230
231     No nodes should be returned as an empty list (and not None).
232
233     Note that if the HPATH for a LU class is None, this function will
234     not be called.
235
236     """
237     raise NotImplementedError
238
239   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240     """Notify the LU about the results of its hooks.
241
242     This method is called every time a hooks phase is executed, and notifies
243     the Logical Unit about the hooks' result. The LU can then use it to alter
244     its result based on the hooks.  By default the method does nothing and the
245     previous result is passed back unchanged but any LU can define it if it
246     wants to use the local cluster hook-scripts somehow.
247
248     @param phase: one of L{constants.HOOKS_PHASE_POST} or
249         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250     @param hook_results: the results of the multi-node hooks rpc call
251     @param feedback_fn: function used send feedback back to the caller
252     @param lu_result: the previous Exec result this LU had, or None
253         in the PRE phase
254     @return: the new Exec result, based on the previous result
255         and hook results
256
257     """
258     return lu_result
259
260   def _ExpandAndLockInstance(self):
261     """Helper function to expand and lock an instance.
262
263     Many LUs that work on an instance take its name in self.op.instance_name
264     and need to expand it and then declare the expanded name for locking. This
265     function does it, and then updates self.op.instance_name to the expanded
266     name. It also initializes needed_locks as a dict, if this hasn't been done
267     before.
268
269     """
270     if self.needed_locks is None:
271       self.needed_locks = {}
272     else:
273       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274         "_ExpandAndLockInstance called with instance-level locks set"
275     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276     if expanded_name is None:
277       raise errors.OpPrereqError("Instance '%s' not known" %
278                                   self.op.instance_name)
279     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280     self.op.instance_name = expanded_name
281
282   def _LockInstancesNodes(self, primary_only=False):
283     """Helper function to declare instances' nodes for locking.
284
285     This function should be called after locking one or more instances to lock
286     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287     with all primary or secondary nodes for instances already locked and
288     present in self.needed_locks[locking.LEVEL_INSTANCE].
289
290     It should be called from DeclareLocks, and for safety only works if
291     self.recalculate_locks[locking.LEVEL_NODE] is set.
292
293     In the future it may grow parameters to just lock some instance's nodes, or
294     to just lock primaries or secondary nodes, if needed.
295
296     If should be called in DeclareLocks in a way similar to::
297
298       if level == locking.LEVEL_NODE:
299         self._LockInstancesNodes()
300
301     @type primary_only: boolean
302     @param primary_only: only lock primary nodes of locked instances
303
304     """
305     assert locking.LEVEL_NODE in self.recalculate_locks, \
306       "_LockInstancesNodes helper function called with no nodes to recalculate"
307
308     # TODO: check if we're really been called with the instance locks held
309
310     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311     # future we might want to have different behaviors depending on the value
312     # of self.recalculate_locks[locking.LEVEL_NODE]
313     wanted_nodes = []
314     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315       instance = self.context.cfg.GetInstanceInfo(instance_name)
316       wanted_nodes.append(instance.primary_node)
317       if not primary_only:
318         wanted_nodes.extend(instance.secondary_nodes)
319
320     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
324
325     del self.recalculate_locks[locking.LEVEL_NODE]
326
327
328 class NoHooksLU(LogicalUnit):
329   """Simple LU which runs no hooks.
330
331   This LU is intended as a parent for other LogicalUnits which will
332   run no hooks, in order to reduce duplicate code.
333
334   """
335   HPATH = None
336   HTYPE = None
337
338
339 def _GetWantedNodes(lu, nodes):
340   """Returns list of checked and expanded node names.
341
342   @type lu: L{LogicalUnit}
343   @param lu: the logical unit on whose behalf we execute
344   @type nodes: list
345   @param nodes: list of node names or None for all nodes
346   @rtype: list
347   @return: the list of nodes, sorted
348   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
349
350   """
351   if not isinstance(nodes, list):
352     raise errors.OpPrereqError("Invalid argument type 'nodes'")
353
354   if not nodes:
355     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356       " non-empty list of nodes whose name is to be expanded.")
357
358   wanted = []
359   for name in nodes:
360     node = lu.cfg.ExpandNodeName(name)
361     if node is None:
362       raise errors.OpPrereqError("No such node name '%s'" % name)
363     wanted.append(node)
364
365   return utils.NiceSort(wanted)
366
367
368 def _GetWantedInstances(lu, instances):
369   """Returns list of checked and expanded instance names.
370
371   @type lu: L{LogicalUnit}
372   @param lu: the logical unit on whose behalf we execute
373   @type instances: list
374   @param instances: list of instance names or None for all instances
375   @rtype: list
376   @return: the list of instances, sorted
377   @raise errors.OpPrereqError: if the instances parameter is wrong type
378   @raise errors.OpPrereqError: if any of the passed instances is not found
379
380   """
381   if not isinstance(instances, list):
382     raise errors.OpPrereqError("Invalid argument type 'instances'")
383
384   if instances:
385     wanted = []
386
387     for name in instances:
388       instance = lu.cfg.ExpandInstanceName(name)
389       if instance is None:
390         raise errors.OpPrereqError("No such instance name '%s'" % name)
391       wanted.append(instance)
392
393   else:
394     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
395   return wanted
396
397
398 def _CheckOutputFields(static, dynamic, selected):
399   """Checks whether all selected fields are valid.
400
401   @type static: L{utils.FieldSet}
402   @param static: static fields set
403   @type dynamic: L{utils.FieldSet}
404   @param dynamic: dynamic fields set
405
406   """
407   f = utils.FieldSet()
408   f.Extend(static)
409   f.Extend(dynamic)
410
411   delta = f.NonMatching(selected)
412   if delta:
413     raise errors.OpPrereqError("Unknown output fields selected: %s"
414                                % ",".join(delta))
415
416
417 def _CheckBooleanOpField(op, name):
418   """Validates boolean opcode parameters.
419
420   This will ensure that an opcode parameter is either a boolean value,
421   or None (but that it always exists).
422
423   """
424   val = getattr(op, name, None)
425   if not (val is None or isinstance(val, bool)):
426     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
427                                (name, str(val)))
428   setattr(op, name, val)
429
430
431 def _CheckNodeOnline(lu, node):
432   """Ensure that a given node is online.
433
434   @param lu: the LU on behalf of which we make the check
435   @param node: the node to check
436   @raise errors.OpPrereqError: if the node is offline
437
438   """
439   if lu.cfg.GetNodeInfo(node).offline:
440     raise errors.OpPrereqError("Can't use offline node %s" % node)
441
442
443 def _CheckNodeNotDrained(lu, node):
444   """Ensure that a given node is not drained.
445
446   @param lu: the LU on behalf of which we make the check
447   @param node: the node to check
448   @raise errors.OpPrereqError: if the node is drained
449
450   """
451   if lu.cfg.GetNodeInfo(node).drained:
452     raise errors.OpPrereqError("Can't use drained node %s" % node)
453
454
455 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456                           memory, vcpus, nics, disk_template, disks):
457   """Builds instance related env variables for hooks
458
459   This builds the hook environment from individual variables.
460
461   @type name: string
462   @param name: the name of the instance
463   @type primary_node: string
464   @param primary_node: the name of the instance's primary node
465   @type secondary_nodes: list
466   @param secondary_nodes: list of secondary nodes as strings
467   @type os_type: string
468   @param os_type: the name of the instance's OS
469   @type status: boolean
470   @param status: the should_run status of the instance
471   @type memory: string
472   @param memory: the memory size of the instance
473   @type vcpus: string
474   @param vcpus: the count of VCPUs the instance has
475   @type nics: list
476   @param nics: list of tuples (ip, bridge, mac) representing
477       the NICs the instance  has
478   @type disk_template: string
479   @param disk_template: the distk template of the instance
480   @type disks: list
481   @param disks: the list of (size, mode) pairs
482   @rtype: dict
483   @return: the hook environment for this instance
484
485   """
486   if status:
487     str_status = "up"
488   else:
489     str_status = "down"
490   env = {
491     "OP_TARGET": name,
492     "INSTANCE_NAME": name,
493     "INSTANCE_PRIMARY": primary_node,
494     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
495     "INSTANCE_OS_TYPE": os_type,
496     "INSTANCE_STATUS": str_status,
497     "INSTANCE_MEMORY": memory,
498     "INSTANCE_VCPUS": vcpus,
499     "INSTANCE_DISK_TEMPLATE": disk_template,
500   }
501
502   if nics:
503     nic_count = len(nics)
504     for idx, (ip, mac, mode, link) in enumerate(nics):
505       if ip is None:
506         ip = ""
507       env["INSTANCE_NIC%d_IP" % idx] = ip
508       env["INSTANCE_NIC%d_MAC" % idx] = mac
509       env["INSTANCE_NIC%d_MODE" % idx] = mode
510       env["INSTANCE_NIC%d_LINK" % idx] = link
511       if mode == constants.NIC_MODE_BRIDGED:
512         env["INSTANCE_NIC%d_BRIDGE" % idx] = link
513   else:
514     nic_count = 0
515
516   env["INSTANCE_NIC_COUNT"] = nic_count
517
518   if disks:
519     disk_count = len(disks)
520     for idx, (size, mode) in enumerate(disks):
521       env["INSTANCE_DISK%d_SIZE" % idx] = size
522       env["INSTANCE_DISK%d_MODE" % idx] = mode
523   else:
524     disk_count = 0
525
526   env["INSTANCE_DISK_COUNT"] = disk_count
527
528   return env
529
530 def _PreBuildNICHooksList(lu, nics):
531   """Build a list of nic information tuples.
532
533   This list is suitable to be passed to _BuildInstanceHookEnv.
534
535   @type lu:  L{LogicalUnit}
536   @param lu: the logical unit on whose behalf we execute
537   @type nics: list of L{objects.NIC}
538   @param nics: list of nics to convert to hooks tuples
539
540   """
541   hooks_nics = []
542   c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
543   for nic in nics:
544     ip = nic.ip
545     mac = nic.mac
546     filled_params = objects.FillDict(c_nicparams, nic.nicparams)
547     mode = filled_params[constants.NIC_MODE]
548     link = filled_params[constants.NIC_LINK]
549     hooks_nics.append((ip, mac, mode, link))
550   return hooks_nics
551
552 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
553   """Builds instance related env variables for hooks from an object.
554
555   @type lu: L{LogicalUnit}
556   @param lu: the logical unit on whose behalf we execute
557   @type instance: L{objects.Instance}
558   @param instance: the instance for which we should build the
559       environment
560   @type override: dict
561   @param override: dictionary with key/values that will override
562       our values
563   @rtype: dict
564   @return: the hook environment dictionary
565
566   """
567   bep = lu.cfg.GetClusterInfo().FillBE(instance)
568   args = {
569     'name': instance.name,
570     'primary_node': instance.primary_node,
571     'secondary_nodes': instance.secondary_nodes,
572     'os_type': instance.os,
573     'status': instance.admin_up,
574     'memory': bep[constants.BE_MEMORY],
575     'vcpus': bep[constants.BE_VCPUS],
576     'nics': _PreBuildNICHooksList(lu, instance.nics),
577     'disk_template': instance.disk_template,
578     'disks': [(disk.size, disk.mode) for disk in instance.disks],
579   }
580   if override:
581     args.update(override)
582   return _BuildInstanceHookEnv(**args)
583
584
585 def _AdjustCandidatePool(lu):
586   """Adjust the candidate pool after node operations.
587
588   """
589   mod_list = lu.cfg.MaintainCandidatePool()
590   if mod_list:
591     lu.LogInfo("Promoted nodes to master candidate role: %s",
592                ", ".join(node.name for node in mod_list))
593     for name in mod_list:
594       lu.context.ReaddNode(name)
595   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
596   if mc_now > mc_max:
597     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
598                (mc_now, mc_max))
599
600
601 def _CheckNicsBridgesExist(lu, target_nics, target_node,
602                                profile=constants.PP_DEFAULT):
603   """Check that the brigdes needed by a list of nics exist.
604
605   """
606   c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
607   paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
608                 for nic in target_nics]
609   brlist = [params[constants.NIC_LINK] for params in paramslist
610             if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
611   if brlist:
612     result = lu.rpc.call_bridges_exist(target_node, brlist)
613     result.Raise("Error checking bridges on destination node '%s'" %
614                  target_node, prereq=True)
615
616
617 def _CheckInstanceBridgesExist(lu, instance, node=None):
618   """Check that the brigdes needed by an instance exist.
619
620   """
621   if node is None:
622     node=instance.primary_node
623   _CheckNicsBridgesExist(lu, instance.nics, node)
624
625
626 class LUDestroyCluster(NoHooksLU):
627   """Logical unit for destroying the cluster.
628
629   """
630   _OP_REQP = []
631
632   def CheckPrereq(self):
633     """Check prerequisites.
634
635     This checks whether the cluster is empty.
636
637     Any errors are signalled by raising errors.OpPrereqError.
638
639     """
640     master = self.cfg.GetMasterNode()
641
642     nodelist = self.cfg.GetNodeList()
643     if len(nodelist) != 1 or nodelist[0] != master:
644       raise errors.OpPrereqError("There are still %d node(s) in"
645                                  " this cluster." % (len(nodelist) - 1))
646     instancelist = self.cfg.GetInstanceList()
647     if instancelist:
648       raise errors.OpPrereqError("There are still %d instance(s) in"
649                                  " this cluster." % len(instancelist))
650
651   def Exec(self, feedback_fn):
652     """Destroys the cluster.
653
654     """
655     master = self.cfg.GetMasterNode()
656     result = self.rpc.call_node_stop_master(master, False)
657     result.Raise("Could not disable the master role")
658     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
659     utils.CreateBackup(priv_key)
660     utils.CreateBackup(pub_key)
661     return master
662
663
664 class LUVerifyCluster(LogicalUnit):
665   """Verifies the cluster status.
666
667   """
668   HPATH = "cluster-verify"
669   HTYPE = constants.HTYPE_CLUSTER
670   _OP_REQP = ["skip_checks"]
671   REQ_BGL = False
672
673   def ExpandNames(self):
674     self.needed_locks = {
675       locking.LEVEL_NODE: locking.ALL_SET,
676       locking.LEVEL_INSTANCE: locking.ALL_SET,
677     }
678     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
679
680   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
681                   node_result, feedback_fn, master_files,
682                   drbd_map, vg_name):
683     """Run multiple tests against a node.
684
685     Test list:
686
687       - compares ganeti version
688       - checks vg existance and size > 20G
689       - checks config file checksum
690       - checks ssh to other nodes
691
692     @type nodeinfo: L{objects.Node}
693     @param nodeinfo: the node to check
694     @param file_list: required list of files
695     @param local_cksum: dictionary of local files and their checksums
696     @param node_result: the results from the node
697     @param feedback_fn: function used to accumulate results
698     @param master_files: list of files that only masters should have
699     @param drbd_map: the useddrbd minors for this node, in
700         form of minor: (instance, must_exist) which correspond to instances
701         and their running status
702     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
703
704     """
705     node = nodeinfo.name
706
707     # main result, node_result should be a non-empty dict
708     if not node_result or not isinstance(node_result, dict):
709       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
710       return True
711
712     # compares ganeti version
713     local_version = constants.PROTOCOL_VERSION
714     remote_version = node_result.get('version', None)
715     if not (remote_version and isinstance(remote_version, (list, tuple)) and
716             len(remote_version) == 2):
717       feedback_fn("  - ERROR: connection to %s failed" % (node))
718       return True
719
720     if local_version != remote_version[0]:
721       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
722                   " node %s %s" % (local_version, node, remote_version[0]))
723       return True
724
725     # node seems compatible, we can actually try to look into its results
726
727     bad = False
728
729     # full package version
730     if constants.RELEASE_VERSION != remote_version[1]:
731       feedback_fn("  - WARNING: software version mismatch: master %s,"
732                   " node %s %s" %
733                   (constants.RELEASE_VERSION, node, remote_version[1]))
734
735     # checks vg existence and size > 20G
736     if vg_name is not None:
737       vglist = node_result.get(constants.NV_VGLIST, None)
738       if not vglist:
739         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
740                         (node,))
741         bad = True
742       else:
743         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
744                                               constants.MIN_VG_SIZE)
745         if vgstatus:
746           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
747           bad = True
748
749     # checks config file checksum
750
751     remote_cksum = node_result.get(constants.NV_FILELIST, None)
752     if not isinstance(remote_cksum, dict):
753       bad = True
754       feedback_fn("  - ERROR: node hasn't returned file checksum data")
755     else:
756       for file_name in file_list:
757         node_is_mc = nodeinfo.master_candidate
758         must_have_file = file_name not in master_files
759         if file_name not in remote_cksum:
760           if node_is_mc or must_have_file:
761             bad = True
762             feedback_fn("  - ERROR: file '%s' missing" % file_name)
763         elif remote_cksum[file_name] != local_cksum[file_name]:
764           if node_is_mc or must_have_file:
765             bad = True
766             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
767           else:
768             # not candidate and this is not a must-have file
769             bad = True
770             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
771                         " '%s'" % file_name)
772         else:
773           # all good, except non-master/non-must have combination
774           if not node_is_mc and not must_have_file:
775             feedback_fn("  - ERROR: file '%s' should not exist on non master"
776                         " candidates" % file_name)
777
778     # checks ssh to any
779
780     if constants.NV_NODELIST not in node_result:
781       bad = True
782       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
783     else:
784       if node_result[constants.NV_NODELIST]:
785         bad = True
786         for node in node_result[constants.NV_NODELIST]:
787           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
788                           (node, node_result[constants.NV_NODELIST][node]))
789
790     if constants.NV_NODENETTEST not in node_result:
791       bad = True
792       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
793     else:
794       if node_result[constants.NV_NODENETTEST]:
795         bad = True
796         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
797         for node in nlist:
798           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
799                           (node, node_result[constants.NV_NODENETTEST][node]))
800
801     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
802     if isinstance(hyp_result, dict):
803       for hv_name, hv_result in hyp_result.iteritems():
804         if hv_result is not None:
805           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
806                       (hv_name, hv_result))
807
808     # check used drbd list
809     if vg_name is not None:
810       used_minors = node_result.get(constants.NV_DRBDLIST, [])
811       if not isinstance(used_minors, (tuple, list)):
812         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
813                     str(used_minors))
814       else:
815         for minor, (iname, must_exist) in drbd_map.items():
816           if minor not in used_minors and must_exist:
817             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
818                         " not active" % (minor, iname))
819             bad = True
820         for minor in used_minors:
821           if minor not in drbd_map:
822             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
823                         minor)
824             bad = True
825
826     return bad
827
828   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
829                       node_instance, feedback_fn, n_offline):
830     """Verify an instance.
831
832     This function checks to see if the required block devices are
833     available on the instance's node.
834
835     """
836     bad = False
837
838     node_current = instanceconfig.primary_node
839
840     node_vol_should = {}
841     instanceconfig.MapLVsByNode(node_vol_should)
842
843     for node in node_vol_should:
844       if node in n_offline:
845         # ignore missing volumes on offline nodes
846         continue
847       for volume in node_vol_should[node]:
848         if node not in node_vol_is or volume not in node_vol_is[node]:
849           feedback_fn("  - ERROR: volume %s missing on node %s" %
850                           (volume, node))
851           bad = True
852
853     if instanceconfig.admin_up:
854       if ((node_current not in node_instance or
855           not instance in node_instance[node_current]) and
856           node_current not in n_offline):
857         feedback_fn("  - ERROR: instance %s not running on node %s" %
858                         (instance, node_current))
859         bad = True
860
861     for node in node_instance:
862       if (not node == node_current):
863         if instance in node_instance[node]:
864           feedback_fn("  - ERROR: instance %s should not run on node %s" %
865                           (instance, node))
866           bad = True
867
868     return bad
869
870   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
871     """Verify if there are any unknown volumes in the cluster.
872
873     The .os, .swap and backup volumes are ignored. All other volumes are
874     reported as unknown.
875
876     """
877     bad = False
878
879     for node in node_vol_is:
880       for volume in node_vol_is[node]:
881         if node not in node_vol_should or volume not in node_vol_should[node]:
882           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
883                       (volume, node))
884           bad = True
885     return bad
886
887   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
888     """Verify the list of running instances.
889
890     This checks what instances are running but unknown to the cluster.
891
892     """
893     bad = False
894     for node in node_instance:
895       for runninginstance in node_instance[node]:
896         if runninginstance not in instancelist:
897           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
898                           (runninginstance, node))
899           bad = True
900     return bad
901
902   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
903     """Verify N+1 Memory Resilience.
904
905     Check that if one single node dies we can still start all the instances it
906     was primary for.
907
908     """
909     bad = False
910
911     for node, nodeinfo in node_info.iteritems():
912       # This code checks that every node which is now listed as secondary has
913       # enough memory to host all instances it is supposed to should a single
914       # other node in the cluster fail.
915       # FIXME: not ready for failover to an arbitrary node
916       # FIXME: does not support file-backed instances
917       # WARNING: we currently take into account down instances as well as up
918       # ones, considering that even if they're down someone might want to start
919       # them even in the event of a node failure.
920       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
921         needed_mem = 0
922         for instance in instances:
923           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
924           if bep[constants.BE_AUTO_BALANCE]:
925             needed_mem += bep[constants.BE_MEMORY]
926         if nodeinfo['mfree'] < needed_mem:
927           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
928                       " failovers should node %s fail" % (node, prinode))
929           bad = True
930     return bad
931
932   def CheckPrereq(self):
933     """Check prerequisites.
934
935     Transform the list of checks we're going to skip into a set and check that
936     all its members are valid.
937
938     """
939     self.skip_set = frozenset(self.op.skip_checks)
940     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
941       raise errors.OpPrereqError("Invalid checks to be skipped specified")
942
943   def BuildHooksEnv(self):
944     """Build hooks env.
945
946     Cluster-Verify hooks just rone in the post phase and their failure makes
947     the output be logged in the verify output and the verification to fail.
948
949     """
950     all_nodes = self.cfg.GetNodeList()
951     env = {
952       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
953       }
954     for node in self.cfg.GetAllNodesInfo().values():
955       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
956
957     return env, [], all_nodes
958
959   def Exec(self, feedback_fn):
960     """Verify integrity of cluster, performing various test on nodes.
961
962     """
963     bad = False
964     feedback_fn("* Verifying global settings")
965     for msg in self.cfg.VerifyConfig():
966       feedback_fn("  - ERROR: %s" % msg)
967
968     vg_name = self.cfg.GetVGName()
969     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
970     nodelist = utils.NiceSort(self.cfg.GetNodeList())
971     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
972     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
973     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
974                         for iname in instancelist)
975     i_non_redundant = [] # Non redundant instances
976     i_non_a_balanced = [] # Non auto-balanced instances
977     n_offline = [] # List of offline nodes
978     n_drained = [] # List of nodes being drained
979     node_volume = {}
980     node_instance = {}
981     node_info = {}
982     instance_cfg = {}
983
984     # FIXME: verify OS list
985     # do local checksums
986     master_files = [constants.CLUSTER_CONF_FILE]
987
988     file_names = ssconf.SimpleStore().GetFileList()
989     file_names.append(constants.SSL_CERT_FILE)
990     file_names.append(constants.RAPI_CERT_FILE)
991     file_names.extend(master_files)
992
993     local_checksums = utils.FingerprintFiles(file_names)
994
995     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
996     node_verify_param = {
997       constants.NV_FILELIST: file_names,
998       constants.NV_NODELIST: [node.name for node in nodeinfo
999                               if not node.offline],
1000       constants.NV_HYPERVISOR: hypervisors,
1001       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1002                                   node.secondary_ip) for node in nodeinfo
1003                                  if not node.offline],
1004       constants.NV_INSTANCELIST: hypervisors,
1005       constants.NV_VERSION: None,
1006       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1007       }
1008     if vg_name is not None:
1009       node_verify_param[constants.NV_VGLIST] = None
1010       node_verify_param[constants.NV_LVLIST] = vg_name
1011       node_verify_param[constants.NV_DRBDLIST] = None
1012     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1013                                            self.cfg.GetClusterName())
1014
1015     cluster = self.cfg.GetClusterInfo()
1016     master_node = self.cfg.GetMasterNode()
1017     all_drbd_map = self.cfg.ComputeDRBDMap()
1018
1019     for node_i in nodeinfo:
1020       node = node_i.name
1021
1022       if node_i.offline:
1023         feedback_fn("* Skipping offline node %s" % (node,))
1024         n_offline.append(node)
1025         continue
1026
1027       if node == master_node:
1028         ntype = "master"
1029       elif node_i.master_candidate:
1030         ntype = "master candidate"
1031       elif node_i.drained:
1032         ntype = "drained"
1033         n_drained.append(node)
1034       else:
1035         ntype = "regular"
1036       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1037
1038       msg = all_nvinfo[node].fail_msg
1039       if msg:
1040         feedback_fn("  - ERROR: while contacting node %s: %s" % (node, msg))
1041         bad = True
1042         continue
1043
1044       nresult = all_nvinfo[node].payload
1045       node_drbd = {}
1046       for minor, instance in all_drbd_map[node].items():
1047         if instance not in instanceinfo:
1048           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1049                       instance)
1050           # ghost instance should not be running, but otherwise we
1051           # don't give double warnings (both ghost instance and
1052           # unallocated minor in use)
1053           node_drbd[minor] = (instance, False)
1054         else:
1055           instance = instanceinfo[instance]
1056           node_drbd[minor] = (instance.name, instance.admin_up)
1057       result = self._VerifyNode(node_i, file_names, local_checksums,
1058                                 nresult, feedback_fn, master_files,
1059                                 node_drbd, vg_name)
1060       bad = bad or result
1061
1062       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1063       if vg_name is None:
1064         node_volume[node] = {}
1065       elif isinstance(lvdata, basestring):
1066         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1067                     (node, utils.SafeEncode(lvdata)))
1068         bad = True
1069         node_volume[node] = {}
1070       elif not isinstance(lvdata, dict):
1071         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1072         bad = True
1073         continue
1074       else:
1075         node_volume[node] = lvdata
1076
1077       # node_instance
1078       idata = nresult.get(constants.NV_INSTANCELIST, None)
1079       if not isinstance(idata, list):
1080         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1081                     (node,))
1082         bad = True
1083         continue
1084
1085       node_instance[node] = idata
1086
1087       # node_info
1088       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1089       if not isinstance(nodeinfo, dict):
1090         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1091         bad = True
1092         continue
1093
1094       try:
1095         node_info[node] = {
1096           "mfree": int(nodeinfo['memory_free']),
1097           "pinst": [],
1098           "sinst": [],
1099           # dictionary holding all instances this node is secondary for,
1100           # grouped by their primary node. Each key is a cluster node, and each
1101           # value is a list of instances which have the key as primary and the
1102           # current node as secondary.  this is handy to calculate N+1 memory
1103           # availability if you can only failover from a primary to its
1104           # secondary.
1105           "sinst-by-pnode": {},
1106         }
1107         # FIXME: devise a free space model for file based instances as well
1108         if vg_name is not None:
1109           if (constants.NV_VGLIST not in nresult or
1110               vg_name not in nresult[constants.NV_VGLIST]):
1111             feedback_fn("  - ERROR: node %s didn't return data for the"
1112                         " volume group '%s' - it is either missing or broken" %
1113                         (node, vg_name))
1114             bad = True
1115             continue
1116           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1117       except (ValueError, KeyError):
1118         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1119                     " from node %s" % (node,))
1120         bad = True
1121         continue
1122
1123     node_vol_should = {}
1124
1125     for instance in instancelist:
1126       feedback_fn("* Verifying instance %s" % instance)
1127       inst_config = instanceinfo[instance]
1128       result =  self._VerifyInstance(instance, inst_config, node_volume,
1129                                      node_instance, feedback_fn, n_offline)
1130       bad = bad or result
1131       inst_nodes_offline = []
1132
1133       inst_config.MapLVsByNode(node_vol_should)
1134
1135       instance_cfg[instance] = inst_config
1136
1137       pnode = inst_config.primary_node
1138       if pnode in node_info:
1139         node_info[pnode]['pinst'].append(instance)
1140       elif pnode not in n_offline:
1141         feedback_fn("  - ERROR: instance %s, connection to primary node"
1142                     " %s failed" % (instance, pnode))
1143         bad = True
1144
1145       if pnode in n_offline:
1146         inst_nodes_offline.append(pnode)
1147
1148       # If the instance is non-redundant we cannot survive losing its primary
1149       # node, so we are not N+1 compliant. On the other hand we have no disk
1150       # templates with more than one secondary so that situation is not well
1151       # supported either.
1152       # FIXME: does not support file-backed instances
1153       if len(inst_config.secondary_nodes) == 0:
1154         i_non_redundant.append(instance)
1155       elif len(inst_config.secondary_nodes) > 1:
1156         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1157                     % instance)
1158
1159       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1160         i_non_a_balanced.append(instance)
1161
1162       for snode in inst_config.secondary_nodes:
1163         if snode in node_info:
1164           node_info[snode]['sinst'].append(instance)
1165           if pnode not in node_info[snode]['sinst-by-pnode']:
1166             node_info[snode]['sinst-by-pnode'][pnode] = []
1167           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1168         elif snode not in n_offline:
1169           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1170                       " %s failed" % (instance, snode))
1171           bad = True
1172         if snode in n_offline:
1173           inst_nodes_offline.append(snode)
1174
1175       if inst_nodes_offline:
1176         # warn that the instance lives on offline nodes, and set bad=True
1177         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1178                     ", ".join(inst_nodes_offline))
1179         bad = True
1180
1181     feedback_fn("* Verifying orphan volumes")
1182     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1183                                        feedback_fn)
1184     bad = bad or result
1185
1186     feedback_fn("* Verifying remaining instances")
1187     result = self._VerifyOrphanInstances(instancelist, node_instance,
1188                                          feedback_fn)
1189     bad = bad or result
1190
1191     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1192       feedback_fn("* Verifying N+1 Memory redundancy")
1193       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1194       bad = bad or result
1195
1196     feedback_fn("* Other Notes")
1197     if i_non_redundant:
1198       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1199                   % len(i_non_redundant))
1200
1201     if i_non_a_balanced:
1202       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1203                   % len(i_non_a_balanced))
1204
1205     if n_offline:
1206       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1207
1208     if n_drained:
1209       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1210
1211     return not bad
1212
1213   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1214     """Analize the post-hooks' result
1215
1216     This method analyses the hook result, handles it, and sends some
1217     nicely-formatted feedback back to the user.
1218
1219     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1220         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1221     @param hooks_results: the results of the multi-node hooks rpc call
1222     @param feedback_fn: function used send feedback back to the caller
1223     @param lu_result: previous Exec result
1224     @return: the new Exec result, based on the previous result
1225         and hook results
1226
1227     """
1228     # We only really run POST phase hooks, and are only interested in
1229     # their results
1230     if phase == constants.HOOKS_PHASE_POST:
1231       # Used to change hooks' output to proper indentation
1232       indent_re = re.compile('^', re.M)
1233       feedback_fn("* Hooks Results")
1234       if not hooks_results:
1235         feedback_fn("  - ERROR: general communication failure")
1236         lu_result = 1
1237       else:
1238         for node_name in hooks_results:
1239           show_node_header = True
1240           res = hooks_results[node_name]
1241           msg = res.fail_msg
1242           if msg:
1243             if res.offline:
1244               # no need to warn or set fail return value
1245               continue
1246             feedback_fn("    Communication failure in hooks execution: %s" %
1247                         msg)
1248             lu_result = 1
1249             continue
1250           for script, hkr, output in res.payload:
1251             if hkr == constants.HKR_FAIL:
1252               # The node header is only shown once, if there are
1253               # failing hooks on that node
1254               if show_node_header:
1255                 feedback_fn("  Node %s:" % node_name)
1256                 show_node_header = False
1257               feedback_fn("    ERROR: Script %s failed, output:" % script)
1258               output = indent_re.sub('      ', output)
1259               feedback_fn("%s" % output)
1260               lu_result = 1
1261
1262       return lu_result
1263
1264
1265 class LUVerifyDisks(NoHooksLU):
1266   """Verifies the cluster disks status.
1267
1268   """
1269   _OP_REQP = []
1270   REQ_BGL = False
1271
1272   def ExpandNames(self):
1273     self.needed_locks = {
1274       locking.LEVEL_NODE: locking.ALL_SET,
1275       locking.LEVEL_INSTANCE: locking.ALL_SET,
1276     }
1277     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1278
1279   def CheckPrereq(self):
1280     """Check prerequisites.
1281
1282     This has no prerequisites.
1283
1284     """
1285     pass
1286
1287   def Exec(self, feedback_fn):
1288     """Verify integrity of cluster disks.
1289
1290     @rtype: tuple of three items
1291     @return: a tuple of (dict of node-to-node_error, list of instances
1292         which need activate-disks, dict of instance: (node, volume) for
1293         missing volumes
1294
1295     """
1296     result = res_nodes, res_instances, res_missing = {}, [], {}
1297
1298     vg_name = self.cfg.GetVGName()
1299     nodes = utils.NiceSort(self.cfg.GetNodeList())
1300     instances = [self.cfg.GetInstanceInfo(name)
1301                  for name in self.cfg.GetInstanceList()]
1302
1303     nv_dict = {}
1304     for inst in instances:
1305       inst_lvs = {}
1306       if (not inst.admin_up or
1307           inst.disk_template not in constants.DTS_NET_MIRROR):
1308         continue
1309       inst.MapLVsByNode(inst_lvs)
1310       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1311       for node, vol_list in inst_lvs.iteritems():
1312         for vol in vol_list:
1313           nv_dict[(node, vol)] = inst
1314
1315     if not nv_dict:
1316       return result
1317
1318     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1319
1320     to_act = set()
1321     for node in nodes:
1322       # node_volume
1323       node_res = node_lvs[node]
1324       if node_res.offline:
1325         continue
1326       msg = node_res.fail_msg
1327       if msg:
1328         logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1329         res_nodes[node] = msg
1330         continue
1331
1332       lvs = node_res.payload
1333       for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1334         inst = nv_dict.pop((node, lv_name), None)
1335         if (not lv_online and inst is not None
1336             and inst.name not in res_instances):
1337           res_instances.append(inst.name)
1338
1339     # any leftover items in nv_dict are missing LVs, let's arrange the
1340     # data better
1341     for key, inst in nv_dict.iteritems():
1342       if inst.name not in res_missing:
1343         res_missing[inst.name] = []
1344       res_missing[inst.name].append(key)
1345
1346     return result
1347
1348
1349 class LURenameCluster(LogicalUnit):
1350   """Rename the cluster.
1351
1352   """
1353   HPATH = "cluster-rename"
1354   HTYPE = constants.HTYPE_CLUSTER
1355   _OP_REQP = ["name"]
1356
1357   def BuildHooksEnv(self):
1358     """Build hooks env.
1359
1360     """
1361     env = {
1362       "OP_TARGET": self.cfg.GetClusterName(),
1363       "NEW_NAME": self.op.name,
1364       }
1365     mn = self.cfg.GetMasterNode()
1366     return env, [mn], [mn]
1367
1368   def CheckPrereq(self):
1369     """Verify that the passed name is a valid one.
1370
1371     """
1372     hostname = utils.HostInfo(self.op.name)
1373
1374     new_name = hostname.name
1375     self.ip = new_ip = hostname.ip
1376     old_name = self.cfg.GetClusterName()
1377     old_ip = self.cfg.GetMasterIP()
1378     if new_name == old_name and new_ip == old_ip:
1379       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1380                                  " cluster has changed")
1381     if new_ip != old_ip:
1382       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1383         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1384                                    " reachable on the network. Aborting." %
1385                                    new_ip)
1386
1387     self.op.name = new_name
1388
1389   def Exec(self, feedback_fn):
1390     """Rename the cluster.
1391
1392     """
1393     clustername = self.op.name
1394     ip = self.ip
1395
1396     # shutdown the master IP
1397     master = self.cfg.GetMasterNode()
1398     result = self.rpc.call_node_stop_master(master, False)
1399     result.Raise("Could not disable the master role")
1400
1401     try:
1402       cluster = self.cfg.GetClusterInfo()
1403       cluster.cluster_name = clustername
1404       cluster.master_ip = ip
1405       self.cfg.Update(cluster)
1406
1407       # update the known hosts file
1408       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1409       node_list = self.cfg.GetNodeList()
1410       try:
1411         node_list.remove(master)
1412       except ValueError:
1413         pass
1414       result = self.rpc.call_upload_file(node_list,
1415                                          constants.SSH_KNOWN_HOSTS_FILE)
1416       for to_node, to_result in result.iteritems():
1417         msg = to_result.fail_msg
1418         if msg:
1419           msg = ("Copy of file %s to node %s failed: %s" %
1420                  (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1421           self.proc.LogWarning(msg)
1422
1423     finally:
1424       result = self.rpc.call_node_start_master(master, False)
1425       msg = result.fail_msg
1426       if msg:
1427         self.LogWarning("Could not re-enable the master role on"
1428                         " the master, please restart manually: %s", msg)
1429
1430
1431 def _RecursiveCheckIfLVMBased(disk):
1432   """Check if the given disk or its children are lvm-based.
1433
1434   @type disk: L{objects.Disk}
1435   @param disk: the disk to check
1436   @rtype: booleean
1437   @return: boolean indicating whether a LD_LV dev_type was found or not
1438
1439   """
1440   if disk.children:
1441     for chdisk in disk.children:
1442       if _RecursiveCheckIfLVMBased(chdisk):
1443         return True
1444   return disk.dev_type == constants.LD_LV
1445
1446
1447 class LUSetClusterParams(LogicalUnit):
1448   """Change the parameters of the cluster.
1449
1450   """
1451   HPATH = "cluster-modify"
1452   HTYPE = constants.HTYPE_CLUSTER
1453   _OP_REQP = []
1454   REQ_BGL = False
1455
1456   def CheckArguments(self):
1457     """Check parameters
1458
1459     """
1460     if not hasattr(self.op, "candidate_pool_size"):
1461       self.op.candidate_pool_size = None
1462     if self.op.candidate_pool_size is not None:
1463       try:
1464         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1465       except (ValueError, TypeError), err:
1466         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1467                                    str(err))
1468       if self.op.candidate_pool_size < 1:
1469         raise errors.OpPrereqError("At least one master candidate needed")
1470
1471   def ExpandNames(self):
1472     # FIXME: in the future maybe other cluster params won't require checking on
1473     # all nodes to be modified.
1474     self.needed_locks = {
1475       locking.LEVEL_NODE: locking.ALL_SET,
1476     }
1477     self.share_locks[locking.LEVEL_NODE] = 1
1478
1479   def BuildHooksEnv(self):
1480     """Build hooks env.
1481
1482     """
1483     env = {
1484       "OP_TARGET": self.cfg.GetClusterName(),
1485       "NEW_VG_NAME": self.op.vg_name,
1486       }
1487     mn = self.cfg.GetMasterNode()
1488     return env, [mn], [mn]
1489
1490   def CheckPrereq(self):
1491     """Check prerequisites.
1492
1493     This checks whether the given params don't conflict and
1494     if the given volume group is valid.
1495
1496     """
1497     if self.op.vg_name is not None and not self.op.vg_name:
1498       instances = self.cfg.GetAllInstancesInfo().values()
1499       for inst in instances:
1500         for disk in inst.disks:
1501           if _RecursiveCheckIfLVMBased(disk):
1502             raise errors.OpPrereqError("Cannot disable lvm storage while"
1503                                        " lvm-based instances exist")
1504
1505     node_list = self.acquired_locks[locking.LEVEL_NODE]
1506
1507     # if vg_name not None, checks given volume group on all nodes
1508     if self.op.vg_name:
1509       vglist = self.rpc.call_vg_list(node_list)
1510       for node in node_list:
1511         msg = vglist[node].fail_msg
1512         if msg:
1513           # ignoring down node
1514           self.LogWarning("Error while gathering data on node %s"
1515                           " (ignoring node): %s", node, msg)
1516           continue
1517         vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1518                                               self.op.vg_name,
1519                                               constants.MIN_VG_SIZE)
1520         if vgstatus:
1521           raise errors.OpPrereqError("Error on node '%s': %s" %
1522                                      (node, vgstatus))
1523
1524     self.cluster = cluster = self.cfg.GetClusterInfo()
1525     # validate params changes
1526     if self.op.beparams:
1527       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1528       self.new_beparams = objects.FillDict(
1529         cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1530
1531     if self.op.nicparams:
1532       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1533       self.new_nicparams = objects.FillDict(
1534         cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1535       objects.NIC.CheckParameterSyntax(self.new_nicparams)
1536
1537     # hypervisor list/parameters
1538     self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1539     if self.op.hvparams:
1540       if not isinstance(self.op.hvparams, dict):
1541         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1542       for hv_name, hv_dict in self.op.hvparams.items():
1543         if hv_name not in self.new_hvparams:
1544           self.new_hvparams[hv_name] = hv_dict
1545         else:
1546           self.new_hvparams[hv_name].update(hv_dict)
1547
1548     if self.op.enabled_hypervisors is not None:
1549       self.hv_list = self.op.enabled_hypervisors
1550     else:
1551       self.hv_list = cluster.enabled_hypervisors
1552
1553     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1554       # either the enabled list has changed, or the parameters have, validate
1555       for hv_name, hv_params in self.new_hvparams.items():
1556         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1557             (self.op.enabled_hypervisors and
1558              hv_name in self.op.enabled_hypervisors)):
1559           # either this is a new hypervisor, or its parameters have changed
1560           hv_class = hypervisor.GetHypervisor(hv_name)
1561           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1562           hv_class.CheckParameterSyntax(hv_params)
1563           _CheckHVParams(self, node_list, hv_name, hv_params)
1564
1565   def Exec(self, feedback_fn):
1566     """Change the parameters of the cluster.
1567
1568     """
1569     if self.op.vg_name is not None:
1570       new_volume = self.op.vg_name
1571       if not new_volume:
1572         new_volume = None
1573       if new_volume != self.cfg.GetVGName():
1574         self.cfg.SetVGName(new_volume)
1575       else:
1576         feedback_fn("Cluster LVM configuration already in desired"
1577                     " state, not changing")
1578     if self.op.hvparams:
1579       self.cluster.hvparams = self.new_hvparams
1580     if self.op.enabled_hypervisors is not None:
1581       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1582     if self.op.beparams:
1583       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1584     if self.op.nicparams:
1585       self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1586
1587     if self.op.candidate_pool_size is not None:
1588       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1589
1590     self.cfg.Update(self.cluster)
1591
1592     # we want to update nodes after the cluster so that if any errors
1593     # happen, we have recorded and saved the cluster info
1594     if self.op.candidate_pool_size is not None:
1595       _AdjustCandidatePool(self)
1596
1597
1598 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1599   """Distribute additional files which are part of the cluster configuration.
1600
1601   ConfigWriter takes care of distributing the config and ssconf files, but
1602   there are more files which should be distributed to all nodes. This function
1603   makes sure those are copied.
1604
1605   @param lu: calling logical unit
1606   @param additional_nodes: list of nodes not in the config to distribute to
1607
1608   """
1609   # 1. Gather target nodes
1610   myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1611   dist_nodes = lu.cfg.GetNodeList()
1612   if additional_nodes is not None:
1613     dist_nodes.extend(additional_nodes)
1614   if myself.name in dist_nodes:
1615     dist_nodes.remove(myself.name)
1616   # 2. Gather files to distribute
1617   dist_files = set([constants.ETC_HOSTS,
1618                     constants.SSH_KNOWN_HOSTS_FILE,
1619                     constants.RAPI_CERT_FILE,
1620                     constants.RAPI_USERS_FILE,
1621                    ])
1622
1623   enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1624   for hv_name in enabled_hypervisors:
1625     hv_class = hypervisor.GetHypervisor(hv_name)
1626     dist_files.update(hv_class.GetAncillaryFiles())
1627
1628   # 3. Perform the files upload
1629   for fname in dist_files:
1630     if os.path.exists(fname):
1631       result = lu.rpc.call_upload_file(dist_nodes, fname)
1632       for to_node, to_result in result.items():
1633         msg = to_result.fail_msg
1634         if msg:
1635           msg = ("Copy of file %s to node %s failed: %s" %
1636                  (fname, to_node, msg))
1637           lu.proc.LogWarning(msg)
1638
1639
1640 class LURedistributeConfig(NoHooksLU):
1641   """Force the redistribution of cluster configuration.
1642
1643   This is a very simple LU.
1644
1645   """
1646   _OP_REQP = []
1647   REQ_BGL = False
1648
1649   def ExpandNames(self):
1650     self.needed_locks = {
1651       locking.LEVEL_NODE: locking.ALL_SET,
1652     }
1653     self.share_locks[locking.LEVEL_NODE] = 1
1654
1655   def CheckPrereq(self):
1656     """Check prerequisites.
1657
1658     """
1659
1660   def Exec(self, feedback_fn):
1661     """Redistribute the configuration.
1662
1663     """
1664     self.cfg.Update(self.cfg.GetClusterInfo())
1665     _RedistributeAncillaryFiles(self)
1666
1667
1668 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1669   """Sleep and poll for an instance's disk to sync.
1670
1671   """
1672   if not instance.disks:
1673     return True
1674
1675   if not oneshot:
1676     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1677
1678   node = instance.primary_node
1679
1680   for dev in instance.disks:
1681     lu.cfg.SetDiskID(dev, node)
1682
1683   retries = 0
1684   while True:
1685     max_time = 0
1686     done = True
1687     cumul_degraded = False
1688     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1689     msg = rstats.fail_msg
1690     if msg:
1691       lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1692       retries += 1
1693       if retries >= 10:
1694         raise errors.RemoteError("Can't contact node %s for mirror data,"
1695                                  " aborting." % node)
1696       time.sleep(6)
1697       continue
1698     rstats = rstats.payload
1699     retries = 0
1700     for i, mstat in enumerate(rstats):
1701       if mstat is None:
1702         lu.LogWarning("Can't compute data for node %s/%s",
1703                            node, instance.disks[i].iv_name)
1704         continue
1705       # we ignore the ldisk parameter
1706       perc_done, est_time, is_degraded, _ = mstat
1707       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1708       if perc_done is not None:
1709         done = False
1710         if est_time is not None:
1711           rem_time = "%d estimated seconds remaining" % est_time
1712           max_time = est_time
1713         else:
1714           rem_time = "no time estimate"
1715         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1716                         (instance.disks[i].iv_name, perc_done, rem_time))
1717     if done or oneshot:
1718       break
1719
1720     time.sleep(min(60, max_time))
1721
1722   if done:
1723     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1724   return not cumul_degraded
1725
1726
1727 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1728   """Check that mirrors are not degraded.
1729
1730   The ldisk parameter, if True, will change the test from the
1731   is_degraded attribute (which represents overall non-ok status for
1732   the device(s)) to the ldisk (representing the local storage status).
1733
1734   """
1735   lu.cfg.SetDiskID(dev, node)
1736   if ldisk:
1737     idx = 6
1738   else:
1739     idx = 5
1740
1741   result = True
1742   if on_primary or dev.AssembleOnSecondary():
1743     rstats = lu.rpc.call_blockdev_find(node, dev)
1744     msg = rstats.fail_msg
1745     if msg:
1746       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1747       result = False
1748     elif not rstats.payload:
1749       lu.LogWarning("Can't find disk on node %s", node)
1750       result = False
1751     else:
1752       result = result and (not rstats.payload[idx])
1753   if dev.children:
1754     for child in dev.children:
1755       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1756
1757   return result
1758
1759
1760 class LUDiagnoseOS(NoHooksLU):
1761   """Logical unit for OS diagnose/query.
1762
1763   """
1764   _OP_REQP = ["output_fields", "names"]
1765   REQ_BGL = False
1766   _FIELDS_STATIC = utils.FieldSet()
1767   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1768
1769   def ExpandNames(self):
1770     if self.op.names:
1771       raise errors.OpPrereqError("Selective OS query not supported")
1772
1773     _CheckOutputFields(static=self._FIELDS_STATIC,
1774                        dynamic=self._FIELDS_DYNAMIC,
1775                        selected=self.op.output_fields)
1776
1777     # Lock all nodes, in shared mode
1778     # Temporary removal of locks, should be reverted later
1779     # TODO: reintroduce locks when they are lighter-weight
1780     self.needed_locks = {}
1781     #self.share_locks[locking.LEVEL_NODE] = 1
1782     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1783
1784   def CheckPrereq(self):
1785     """Check prerequisites.
1786
1787     """
1788
1789   @staticmethod
1790   def _DiagnoseByOS(node_list, rlist):
1791     """Remaps a per-node return list into an a per-os per-node dictionary
1792
1793     @param node_list: a list with the names of all nodes
1794     @param rlist: a map with node names as keys and OS objects as values
1795
1796     @rtype: dict
1797     @return: a dictionary with osnames as keys and as value another map, with
1798         nodes as keys and tuples of (path, status, diagnose) as values, eg::
1799
1800           {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
1801                                      (/srv/..., False, "invalid api")],
1802                            "node2": [(/srv/..., True, "")]}
1803           }
1804
1805     """
1806     all_os = {}
1807     # we build here the list of nodes that didn't fail the RPC (at RPC
1808     # level), so that nodes with a non-responding node daemon don't
1809     # make all OSes invalid
1810     good_nodes = [node_name for node_name in rlist
1811                   if not rlist[node_name].fail_msg]
1812     for node_name, nr in rlist.items():
1813       if nr.fail_msg or not nr.payload:
1814         continue
1815       for name, path, status, diagnose in nr.payload:
1816         if name not in all_os:
1817           # build a list of nodes for this os containing empty lists
1818           # for each node in node_list
1819           all_os[name] = {}
1820           for nname in good_nodes:
1821             all_os[name][nname] = []
1822         all_os[name][node_name].append((path, status, diagnose))
1823     return all_os
1824
1825   def Exec(self, feedback_fn):
1826     """Compute the list of OSes.
1827
1828     """
1829     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1830     node_data = self.rpc.call_os_diagnose(valid_nodes)
1831     pol = self._DiagnoseByOS(valid_nodes, node_data)
1832     output = []
1833     for os_name, os_data in pol.items():
1834       row = []
1835       for field in self.op.output_fields:
1836         if field == "name":
1837           val = os_name
1838         elif field == "valid":
1839           val = utils.all([osl and osl[0][1] for osl in os_data.values()])
1840         elif field == "node_status":
1841           # this is just a copy of the dict
1842           val = {}
1843           for node_name, nos_list in os_data.items():
1844             val[node_name] = nos_list
1845         else:
1846           raise errors.ParameterError(field)
1847         row.append(val)
1848       output.append(row)
1849
1850     return output
1851
1852
1853 class LURemoveNode(LogicalUnit):
1854   """Logical unit for removing a node.
1855
1856   """
1857   HPATH = "node-remove"
1858   HTYPE = constants.HTYPE_NODE
1859   _OP_REQP = ["node_name"]
1860
1861   def BuildHooksEnv(self):
1862     """Build hooks env.
1863
1864     This doesn't run on the target node in the pre phase as a failed
1865     node would then be impossible to remove.
1866
1867     """
1868     env = {
1869       "OP_TARGET": self.op.node_name,
1870       "NODE_NAME": self.op.node_name,
1871       }
1872     all_nodes = self.cfg.GetNodeList()
1873     all_nodes.remove(self.op.node_name)
1874     return env, all_nodes, all_nodes
1875
1876   def CheckPrereq(self):
1877     """Check prerequisites.
1878
1879     This checks:
1880      - the node exists in the configuration
1881      - it does not have primary or secondary instances
1882      - it's not the master
1883
1884     Any errors are signalled by raising errors.OpPrereqError.
1885
1886     """
1887     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1888     if node is None:
1889       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1890
1891     instance_list = self.cfg.GetInstanceList()
1892
1893     masternode = self.cfg.GetMasterNode()
1894     if node.name == masternode:
1895       raise errors.OpPrereqError("Node is the master node,"
1896                                  " you need to failover first.")
1897
1898     for instance_name in instance_list:
1899       instance = self.cfg.GetInstanceInfo(instance_name)
1900       if node.name in instance.all_nodes:
1901         raise errors.OpPrereqError("Instance %s is still running on the node,"
1902                                    " please remove first." % instance_name)
1903     self.op.node_name = node.name
1904     self.node = node
1905
1906   def Exec(self, feedback_fn):
1907     """Removes the node from the cluster.
1908
1909     """
1910     node = self.node
1911     logging.info("Stopping the node daemon and removing configs from node %s",
1912                  node.name)
1913
1914     self.context.RemoveNode(node.name)
1915
1916     result = self.rpc.call_node_leave_cluster(node.name)
1917     msg = result.fail_msg
1918     if msg:
1919       self.LogWarning("Errors encountered on the remote node while leaving"
1920                       " the cluster: %s", msg)
1921
1922     # Promote nodes to master candidate as needed
1923     _AdjustCandidatePool(self)
1924
1925
1926 class LUQueryNodes(NoHooksLU):
1927   """Logical unit for querying nodes.
1928
1929   """
1930   _OP_REQP = ["output_fields", "names", "use_locking"]
1931   REQ_BGL = False
1932   _FIELDS_DYNAMIC = utils.FieldSet(
1933     "dtotal", "dfree",
1934     "mtotal", "mnode", "mfree",
1935     "bootid",
1936     "ctotal", "cnodes", "csockets",
1937     )
1938
1939   _FIELDS_STATIC = utils.FieldSet(
1940     "name", "pinst_cnt", "sinst_cnt",
1941     "pinst_list", "sinst_list",
1942     "pip", "sip", "tags",
1943     "serial_no",
1944     "master_candidate",
1945     "master",
1946     "offline",
1947     "drained",
1948     )
1949
1950   def ExpandNames(self):
1951     _CheckOutputFields(static=self._FIELDS_STATIC,
1952                        dynamic=self._FIELDS_DYNAMIC,
1953                        selected=self.op.output_fields)
1954
1955     self.needed_locks = {}
1956     self.share_locks[locking.LEVEL_NODE] = 1
1957
1958     if self.op.names:
1959       self.wanted = _GetWantedNodes(self, self.op.names)
1960     else:
1961       self.wanted = locking.ALL_SET
1962
1963     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1964     self.do_locking = self.do_node_query and self.op.use_locking
1965     if self.do_locking:
1966       # if we don't request only static fields, we need to lock the nodes
1967       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1968
1969
1970   def CheckPrereq(self):
1971     """Check prerequisites.
1972
1973     """
1974     # The validation of the node list is done in the _GetWantedNodes,
1975     # if non empty, and if empty, there's no validation to do
1976     pass
1977
1978   def Exec(self, feedback_fn):
1979     """Computes the list of nodes and their attributes.
1980
1981     """
1982     all_info = self.cfg.GetAllNodesInfo()
1983     if self.do_locking:
1984       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1985     elif self.wanted != locking.ALL_SET:
1986       nodenames = self.wanted
1987       missing = set(nodenames).difference(all_info.keys())
1988       if missing:
1989         raise errors.OpExecError(
1990           "Some nodes were removed before retrieving their data: %s" % missing)
1991     else:
1992       nodenames = all_info.keys()
1993
1994     nodenames = utils.NiceSort(nodenames)
1995     nodelist = [all_info[name] for name in nodenames]
1996
1997     # begin data gathering
1998
1999     if self.do_node_query:
2000       live_data = {}
2001       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2002                                           self.cfg.GetHypervisorType())
2003       for name in nodenames:
2004         nodeinfo = node_data[name]
2005         if not nodeinfo.fail_msg and nodeinfo.payload:
2006           nodeinfo = nodeinfo.payload
2007           fn = utils.TryConvert
2008           live_data[name] = {
2009             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2010             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2011             "mfree": fn(int, nodeinfo.get('memory_free', None)),
2012             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2013             "dfree": fn(int, nodeinfo.get('vg_free', None)),
2014             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2015             "bootid": nodeinfo.get('bootid', None),
2016             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2017             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2018             }
2019         else:
2020           live_data[name] = {}
2021     else:
2022       live_data = dict.fromkeys(nodenames, {})
2023
2024     node_to_primary = dict([(name, set()) for name in nodenames])
2025     node_to_secondary = dict([(name, set()) for name in nodenames])
2026
2027     inst_fields = frozenset(("pinst_cnt", "pinst_list",
2028                              "sinst_cnt", "sinst_list"))
2029     if inst_fields & frozenset(self.op.output_fields):
2030       instancelist = self.cfg.GetInstanceList()
2031
2032       for instance_name in instancelist:
2033         inst = self.cfg.GetInstanceInfo(instance_name)
2034         if inst.primary_node in node_to_primary:
2035           node_to_primary[inst.primary_node].add(inst.name)
2036         for secnode in inst.secondary_nodes:
2037           if secnode in node_to_secondary:
2038             node_to_secondary[secnode].add(inst.name)
2039
2040     master_node = self.cfg.GetMasterNode()
2041
2042     # end data gathering
2043
2044     output = []
2045     for node in nodelist:
2046       node_output = []
2047       for field in self.op.output_fields:
2048         if field == "name":
2049           val = node.name
2050         elif field == "pinst_list":
2051           val = list(node_to_primary[node.name])
2052         elif field == "sinst_list":
2053           val = list(node_to_secondary[node.name])
2054         elif field == "pinst_cnt":
2055           val = len(node_to_primary[node.name])
2056         elif field == "sinst_cnt":
2057           val = len(node_to_secondary[node.name])
2058         elif field == "pip":
2059           val = node.primary_ip
2060         elif field == "sip":
2061           val = node.secondary_ip
2062         elif field == "tags":
2063           val = list(node.GetTags())
2064         elif field == "serial_no":
2065           val = node.serial_no
2066         elif field == "master_candidate":
2067           val = node.master_candidate
2068         elif field == "master":
2069           val = node.name == master_node
2070         elif field == "offline":
2071           val = node.offline
2072         elif field == "drained":
2073           val = node.drained
2074         elif self._FIELDS_DYNAMIC.Matches(field):
2075           val = live_data[node.name].get(field, None)
2076         else:
2077           raise errors.ParameterError(field)
2078         node_output.append(val)
2079       output.append(node_output)
2080
2081     return output
2082
2083
2084 class LUQueryNodeVolumes(NoHooksLU):
2085   """Logical unit for getting volumes on node(s).
2086
2087   """
2088   _OP_REQP = ["nodes", "output_fields"]
2089   REQ_BGL = False
2090   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2091   _FIELDS_STATIC = utils.FieldSet("node")
2092
2093   def ExpandNames(self):
2094     _CheckOutputFields(static=self._FIELDS_STATIC,
2095                        dynamic=self._FIELDS_DYNAMIC,
2096                        selected=self.op.output_fields)
2097
2098     self.needed_locks = {}
2099     self.share_locks[locking.LEVEL_NODE] = 1
2100     if not self.op.nodes:
2101       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2102     else:
2103       self.needed_locks[locking.LEVEL_NODE] = \
2104         _GetWantedNodes(self, self.op.nodes)
2105
2106   def CheckPrereq(self):
2107     """Check prerequisites.
2108
2109     This checks that the fields required are valid output fields.
2110
2111     """
2112     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2113
2114   def Exec(self, feedback_fn):
2115     """Computes the list of nodes and their attributes.
2116
2117     """
2118     nodenames = self.nodes
2119     volumes = self.rpc.call_node_volumes(nodenames)
2120
2121     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2122              in self.cfg.GetInstanceList()]
2123
2124     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2125
2126     output = []
2127     for node in nodenames:
2128       nresult = volumes[node]
2129       if nresult.offline:
2130         continue
2131       msg = nresult.fail_msg
2132       if msg:
2133         self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2134         continue
2135
2136       node_vols = nresult.payload[:]
2137       node_vols.sort(key=lambda vol: vol['dev'])
2138
2139       for vol in node_vols:
2140         node_output = []
2141         for field in self.op.output_fields:
2142           if field == "node":
2143             val = node
2144           elif field == "phys":
2145             val = vol['dev']
2146           elif field == "vg":
2147             val = vol['vg']
2148           elif field == "name":
2149             val = vol['name']
2150           elif field == "size":
2151             val = int(float(vol['size']))
2152           elif field == "instance":
2153             for inst in ilist:
2154               if node not in lv_by_node[inst]:
2155                 continue
2156               if vol['name'] in lv_by_node[inst][node]:
2157                 val = inst.name
2158                 break
2159             else:
2160               val = '-'
2161           else:
2162             raise errors.ParameterError(field)
2163           node_output.append(str(val))
2164
2165         output.append(node_output)
2166
2167     return output
2168
2169
2170 class LUAddNode(LogicalUnit):
2171   """Logical unit for adding node to the cluster.
2172
2173   """
2174   HPATH = "node-add"
2175   HTYPE = constants.HTYPE_NODE
2176   _OP_REQP = ["node_name"]
2177
2178   def BuildHooksEnv(self):
2179     """Build hooks env.
2180
2181     This will run on all nodes before, and on all nodes + the new node after.
2182
2183     """
2184     env = {
2185       "OP_TARGET": self.op.node_name,
2186       "NODE_NAME": self.op.node_name,
2187       "NODE_PIP": self.op.primary_ip,
2188       "NODE_SIP": self.op.secondary_ip,
2189       }
2190     nodes_0 = self.cfg.GetNodeList()
2191     nodes_1 = nodes_0 + [self.op.node_name, ]
2192     return env, nodes_0, nodes_1
2193
2194   def CheckPrereq(self):
2195     """Check prerequisites.
2196
2197     This checks:
2198      - the new node is not already in the config
2199      - it is resolvable
2200      - its parameters (single/dual homed) matches the cluster
2201
2202     Any errors are signalled by raising errors.OpPrereqError.
2203
2204     """
2205     node_name = self.op.node_name
2206     cfg = self.cfg
2207
2208     dns_data = utils.HostInfo(node_name)
2209
2210     node = dns_data.name
2211     primary_ip = self.op.primary_ip = dns_data.ip
2212     secondary_ip = getattr(self.op, "secondary_ip", None)
2213     if secondary_ip is None:
2214       secondary_ip = primary_ip
2215     if not utils.IsValidIP(secondary_ip):
2216       raise errors.OpPrereqError("Invalid secondary IP given")
2217     self.op.secondary_ip = secondary_ip
2218
2219     node_list = cfg.GetNodeList()
2220     if not self.op.readd and node in node_list:
2221       raise errors.OpPrereqError("Node %s is already in the configuration" %
2222                                  node)
2223     elif self.op.readd and node not in node_list:
2224       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2225
2226     for existing_node_name in node_list:
2227       existing_node = cfg.GetNodeInfo(existing_node_name)
2228
2229       if self.op.readd and node == existing_node_name:
2230         if (existing_node.primary_ip != primary_ip or
2231             existing_node.secondary_ip != secondary_ip):
2232           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2233                                      " address configuration as before")
2234         continue
2235
2236       if (existing_node.primary_ip == primary_ip or
2237           existing_node.secondary_ip == primary_ip or
2238           existing_node.primary_ip == secondary_ip or
2239           existing_node.secondary_ip == secondary_ip):
2240         raise errors.OpPrereqError("New node ip address(es) conflict with"
2241                                    " existing node %s" % existing_node.name)
2242
2243     # check that the type of the node (single versus dual homed) is the
2244     # same as for the master
2245     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2246     master_singlehomed = myself.secondary_ip == myself.primary_ip
2247     newbie_singlehomed = secondary_ip == primary_ip
2248     if master_singlehomed != newbie_singlehomed:
2249       if master_singlehomed:
2250         raise errors.OpPrereqError("The master has no private ip but the"
2251                                    " new node has one")
2252       else:
2253         raise errors.OpPrereqError("The master has a private ip but the"
2254                                    " new node doesn't have one")
2255
2256     # checks reachablity
2257     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2258       raise errors.OpPrereqError("Node not reachable by ping")
2259
2260     if not newbie_singlehomed:
2261       # check reachability from my secondary ip to newbie's secondary ip
2262       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2263                            source=myself.secondary_ip):
2264         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2265                                    " based ping to noded port")
2266
2267     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2268     mc_now, _ = self.cfg.GetMasterCandidateStats()
2269     master_candidate = mc_now < cp_size
2270
2271     self.new_node = objects.Node(name=node,
2272                                  primary_ip=primary_ip,
2273                                  secondary_ip=secondary_ip,
2274                                  master_candidate=master_candidate,
2275                                  offline=False, drained=False)
2276
2277   def Exec(self, feedback_fn):
2278     """Adds the new node to the cluster.
2279
2280     """
2281     new_node = self.new_node
2282     node = new_node.name
2283
2284     # check connectivity
2285     result = self.rpc.call_version([node])[node]
2286     result.Raise("Can't get version information from node %s" % node)
2287     if constants.PROTOCOL_VERSION == result.payload:
2288       logging.info("Communication to node %s fine, sw version %s match",
2289                    node, result.payload)
2290     else:
2291       raise errors.OpExecError("Version mismatch master version %s,"
2292                                " node version %s" %
2293                                (constants.PROTOCOL_VERSION, result.payload))
2294
2295     # setup ssh on node
2296     logging.info("Copy ssh key to node %s", node)
2297     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2298     keyarray = []
2299     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2300                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2301                 priv_key, pub_key]
2302
2303     for i in keyfiles:
2304       f = open(i, 'r')
2305       try:
2306         keyarray.append(f.read())
2307       finally:
2308         f.close()
2309
2310     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2311                                     keyarray[2],
2312                                     keyarray[3], keyarray[4], keyarray[5])
2313     result.Raise("Cannot transfer ssh keys to the new node")
2314
2315     # Add node to our /etc/hosts, and add key to known_hosts
2316     if self.cfg.GetClusterInfo().modify_etc_hosts:
2317       utils.AddHostToEtcHosts(new_node.name)
2318
2319     if new_node.secondary_ip != new_node.primary_ip:
2320       result = self.rpc.call_node_has_ip_address(new_node.name,
2321                                                  new_node.secondary_ip)
2322       result.Raise("Failure checking secondary ip on node %s" % new_node.name,
2323                    prereq=True)
2324       if not result.payload:
2325         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2326                                  " you gave (%s). Please fix and re-run this"
2327                                  " command." % new_node.secondary_ip)
2328
2329     node_verify_list = [self.cfg.GetMasterNode()]
2330     node_verify_param = {
2331       'nodelist': [node],
2332       # TODO: do a node-net-test as well?
2333     }
2334
2335     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2336                                        self.cfg.GetClusterName())
2337     for verifier in node_verify_list:
2338       result[verifier].Raise("Cannot communicate with node %s" % verifier)
2339       nl_payload = result[verifier].payload['nodelist']
2340       if nl_payload:
2341         for failed in nl_payload:
2342           feedback_fn("ssh/hostname verification failed %s -> %s" %
2343                       (verifier, nl_payload[failed]))
2344         raise errors.OpExecError("ssh/hostname verification failed.")
2345
2346     if self.op.readd:
2347       _RedistributeAncillaryFiles(self)
2348       self.context.ReaddNode(new_node)
2349     else:
2350       _RedistributeAncillaryFiles(self, additional_nodes=[node])
2351       self.context.AddNode(new_node)
2352
2353
2354 class LUSetNodeParams(LogicalUnit):
2355   """Modifies the parameters of a node.
2356
2357   """
2358   HPATH = "node-modify"
2359   HTYPE = constants.HTYPE_NODE
2360   _OP_REQP = ["node_name"]
2361   REQ_BGL = False
2362
2363   def CheckArguments(self):
2364     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2365     if node_name is None:
2366       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2367     self.op.node_name = node_name
2368     _CheckBooleanOpField(self.op, 'master_candidate')
2369     _CheckBooleanOpField(self.op, 'offline')
2370     _CheckBooleanOpField(self.op, 'drained')
2371     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2372     if all_mods.count(None) == 3:
2373       raise errors.OpPrereqError("Please pass at least one modification")
2374     if all_mods.count(True) > 1:
2375       raise errors.OpPrereqError("Can't set the node into more than one"
2376                                  " state at the same time")
2377
2378   def ExpandNames(self):
2379     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2380
2381   def BuildHooksEnv(self):
2382     """Build hooks env.
2383
2384     This runs on the master node.
2385
2386     """
2387     env = {
2388       "OP_TARGET": self.op.node_name,
2389       "MASTER_CANDIDATE": str(self.op.master_candidate),
2390       "OFFLINE": str(self.op.offline),
2391       "DRAINED": str(self.op.drained),
2392       }
2393     nl = [self.cfg.GetMasterNode(),
2394           self.op.node_name]
2395     return env, nl, nl
2396
2397   def CheckPrereq(self):
2398     """Check prerequisites.
2399
2400     This only checks the instance list against the existing names.
2401
2402     """
2403     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2404
2405     if ((self.op.master_candidate == False or self.op.offline == True or
2406          self.op.drained == True) and node.master_candidate):
2407       # we will demote the node from master_candidate
2408       if self.op.node_name == self.cfg.GetMasterNode():
2409         raise errors.OpPrereqError("The master node has to be a"
2410                                    " master candidate, online and not drained")
2411       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2412       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2413       if num_candidates <= cp_size:
2414         msg = ("Not enough master candidates (desired"
2415                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2416         if self.op.force:
2417           self.LogWarning(msg)
2418         else:
2419           raise errors.OpPrereqError(msg)
2420
2421     if (self.op.master_candidate == True and
2422         ((node.offline and not self.op.offline == False) or
2423          (node.drained and not self.op.drained == False))):
2424       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2425                                  " to master_candidate" % node.name)
2426
2427     return
2428
2429   def Exec(self, feedback_fn):
2430     """Modifies a node.
2431
2432     """
2433     node = self.node
2434
2435     result = []
2436     changed_mc = False
2437
2438     if self.op.offline is not None:
2439       node.offline = self.op.offline
2440       result.append(("offline", str(self.op.offline)))
2441       if self.op.offline == True:
2442         if node.master_candidate:
2443           node.master_candidate = False
2444           changed_mc = True
2445           result.append(("master_candidate", "auto-demotion due to offline"))
2446         if node.drained:
2447           node.drained = False
2448           result.append(("drained", "clear drained status due to offline"))
2449
2450     if self.op.master_candidate is not None:
2451       node.master_candidate = self.op.master_candidate
2452       changed_mc = True
2453       result.append(("master_candidate", str(self.op.master_candidate)))
2454       if self.op.master_candidate == False:
2455         rrc = self.rpc.call_node_demote_from_mc(node.name)
2456         msg = rrc.fail_msg
2457         if msg:
2458           self.LogWarning("Node failed to demote itself: %s" % msg)
2459
2460     if self.op.drained is not None:
2461       node.drained = self.op.drained
2462       result.append(("drained", str(self.op.drained)))
2463       if self.op.drained == True:
2464         if node.master_candidate:
2465           node.master_candidate = False
2466           changed_mc = True
2467           result.append(("master_candidate", "auto-demotion due to drain"))
2468         if node.offline:
2469           node.offline = False
2470           result.append(("offline", "clear offline status due to drain"))
2471
2472     # this will trigger configuration file update, if needed
2473     self.cfg.Update(node)
2474     # this will trigger job queue propagation or cleanup
2475     if changed_mc:
2476       self.context.ReaddNode(node)
2477
2478     return result
2479
2480
2481 class LUPowercycleNode(NoHooksLU):
2482   """Powercycles a node.
2483
2484   """
2485   _OP_REQP = ["node_name", "force"]
2486   REQ_BGL = False
2487
2488   def CheckArguments(self):
2489     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2490     if node_name is None:
2491       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2492     self.op.node_name = node_name
2493     if node_name == self.cfg.GetMasterNode() and not self.op.force:
2494       raise errors.OpPrereqError("The node is the master and the force"
2495                                  " parameter was not set")
2496
2497   def ExpandNames(self):
2498     """Locking for PowercycleNode.
2499
2500     This is a last-resource option and shouldn't block on other
2501     jobs. Therefore, we grab no locks.
2502
2503     """
2504     self.needed_locks = {}
2505
2506   def CheckPrereq(self):
2507     """Check prerequisites.
2508
2509     This LU has no prereqs.
2510
2511     """
2512     pass
2513
2514   def Exec(self, feedback_fn):
2515     """Reboots a node.
2516
2517     """
2518     result = self.rpc.call_node_powercycle(self.op.node_name,
2519                                            self.cfg.GetHypervisorType())
2520     result.Raise("Failed to schedule the reboot")
2521     return result.payload
2522
2523
2524 class LUQueryClusterInfo(NoHooksLU):
2525   """Query cluster configuration.
2526
2527   """
2528   _OP_REQP = []
2529   REQ_BGL = False
2530
2531   def ExpandNames(self):
2532     self.needed_locks = {}
2533
2534   def CheckPrereq(self):
2535     """No prerequsites needed for this LU.
2536
2537     """
2538     pass
2539
2540   def Exec(self, feedback_fn):
2541     """Return cluster config.
2542
2543     """
2544     cluster = self.cfg.GetClusterInfo()
2545     result = {
2546       "software_version": constants.RELEASE_VERSION,
2547       "protocol_version": constants.PROTOCOL_VERSION,
2548       "config_version": constants.CONFIG_VERSION,
2549       "os_api_version": constants.OS_API_VERSION,
2550       "export_version": constants.EXPORT_VERSION,
2551       "architecture": (platform.architecture()[0], platform.machine()),
2552       "name": cluster.cluster_name,
2553       "master": cluster.master_node,
2554       "default_hypervisor": cluster.default_hypervisor,
2555       "enabled_hypervisors": cluster.enabled_hypervisors,
2556       "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2557                         for hypervisor in cluster.enabled_hypervisors]),
2558       "beparams": cluster.beparams,
2559       "nicparams": cluster.nicparams,
2560       "candidate_pool_size": cluster.candidate_pool_size,
2561       "master_netdev": cluster.master_netdev,
2562       "volume_group_name": cluster.volume_group_name,
2563       "file_storage_dir": cluster.file_storage_dir,
2564       }
2565
2566     return result
2567
2568
2569 class LUQueryConfigValues(NoHooksLU):
2570   """Return configuration values.
2571
2572   """
2573   _OP_REQP = []
2574   REQ_BGL = False
2575   _FIELDS_DYNAMIC = utils.FieldSet()
2576   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2577
2578   def ExpandNames(self):
2579     self.needed_locks = {}
2580
2581     _CheckOutputFields(static=self._FIELDS_STATIC,
2582                        dynamic=self._FIELDS_DYNAMIC,
2583                        selected=self.op.output_fields)
2584
2585   def CheckPrereq(self):
2586     """No prerequisites.
2587
2588     """
2589     pass
2590
2591   def Exec(self, feedback_fn):
2592     """Dump a representation of the cluster config to the standard output.
2593
2594     """
2595     values = []
2596     for field in self.op.output_fields:
2597       if field == "cluster_name":
2598         entry = self.cfg.GetClusterName()
2599       elif field == "master_node":
2600         entry = self.cfg.GetMasterNode()
2601       elif field == "drain_flag":
2602         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2603       else:
2604         raise errors.ParameterError(field)
2605       values.append(entry)
2606     return values
2607
2608
2609 class LUActivateInstanceDisks(NoHooksLU):
2610   """Bring up an instance's disks.
2611
2612   """
2613   _OP_REQP = ["instance_name"]
2614   REQ_BGL = False
2615
2616   def ExpandNames(self):
2617     self._ExpandAndLockInstance()
2618     self.needed_locks[locking.LEVEL_NODE] = []
2619     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2620
2621   def DeclareLocks(self, level):
2622     if level == locking.LEVEL_NODE:
2623       self._LockInstancesNodes()
2624
2625   def CheckPrereq(self):
2626     """Check prerequisites.
2627
2628     This checks that the instance is in the cluster.
2629
2630     """
2631     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2632     assert self.instance is not None, \
2633       "Cannot retrieve locked instance %s" % self.op.instance_name
2634     _CheckNodeOnline(self, self.instance.primary_node)
2635
2636   def Exec(self, feedback_fn):
2637     """Activate the disks.
2638
2639     """
2640     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2641     if not disks_ok:
2642       raise errors.OpExecError("Cannot activate block devices")
2643
2644     return disks_info
2645
2646
2647 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2648   """Prepare the block devices for an instance.
2649
2650   This sets up the block devices on all nodes.
2651
2652   @type lu: L{LogicalUnit}
2653   @param lu: the logical unit on whose behalf we execute
2654   @type instance: L{objects.Instance}
2655   @param instance: the instance for whose disks we assemble
2656   @type ignore_secondaries: boolean
2657   @param ignore_secondaries: if true, errors on secondary nodes
2658       won't result in an error return from the function
2659   @return: False if the operation failed, otherwise a list of
2660       (host, instance_visible_name, node_visible_name)
2661       with the mapping from node devices to instance devices
2662
2663   """
2664   device_info = []
2665   disks_ok = True
2666   iname = instance.name
2667   # With the two passes mechanism we try to reduce the window of
2668   # opportunity for the race condition of switching DRBD to primary
2669   # before handshaking occured, but we do not eliminate it
2670
2671   # The proper fix would be to wait (with some limits) until the
2672   # connection has been made and drbd transitions from WFConnection
2673   # into any other network-connected state (Connected, SyncTarget,
2674   # SyncSource, etc.)
2675
2676   # 1st pass, assemble on all nodes in secondary mode
2677   for inst_disk in instance.disks:
2678     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2679       lu.cfg.SetDiskID(node_disk, node)
2680       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2681       msg = result.fail_msg
2682       if msg:
2683         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2684                            " (is_primary=False, pass=1): %s",
2685                            inst_disk.iv_name, node, msg)
2686         if not ignore_secondaries:
2687           disks_ok = False
2688
2689   # FIXME: race condition on drbd migration to primary
2690
2691   # 2nd pass, do only the primary node
2692   for inst_disk in instance.disks:
2693     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2694       if node != instance.primary_node:
2695         continue
2696       lu.cfg.SetDiskID(node_disk, node)
2697       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2698       msg = result.fail_msg
2699       if msg:
2700         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2701                            " (is_primary=True, pass=2): %s",
2702                            inst_disk.iv_name, node, msg)
2703         disks_ok = False
2704     device_info.append((instance.primary_node, inst_disk.iv_name,
2705                         result.payload))
2706
2707   # leave the disks configured for the primary node
2708   # this is a workaround that would be fixed better by
2709   # improving the logical/physical id handling
2710   for disk in instance.disks:
2711     lu.cfg.SetDiskID(disk, instance.primary_node)
2712
2713   return disks_ok, device_info
2714
2715
2716 def _StartInstanceDisks(lu, instance, force):
2717   """Start the disks of an instance.
2718
2719   """
2720   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2721                                            ignore_secondaries=force)
2722   if not disks_ok:
2723     _ShutdownInstanceDisks(lu, instance)
2724     if force is not None and not force:
2725       lu.proc.LogWarning("", hint="If the message above refers to a"
2726                          " secondary node,"
2727                          " you can retry the operation using '--force'.")
2728     raise errors.OpExecError("Disk consistency error")
2729
2730
2731 class LUDeactivateInstanceDisks(NoHooksLU):
2732   """Shutdown an instance's disks.
2733
2734   """
2735   _OP_REQP = ["instance_name"]
2736   REQ_BGL = False
2737
2738   def ExpandNames(self):
2739     self._ExpandAndLockInstance()
2740     self.needed_locks[locking.LEVEL_NODE] = []
2741     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2742
2743   def DeclareLocks(self, level):
2744     if level == locking.LEVEL_NODE:
2745       self._LockInstancesNodes()
2746
2747   def CheckPrereq(self):
2748     """Check prerequisites.
2749
2750     This checks that the instance is in the cluster.
2751
2752     """
2753     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2754     assert self.instance is not None, \
2755       "Cannot retrieve locked instance %s" % self.op.instance_name
2756
2757   def Exec(self, feedback_fn):
2758     """Deactivate the disks
2759
2760     """
2761     instance = self.instance
2762     _SafeShutdownInstanceDisks(self, instance)
2763
2764
2765 def _SafeShutdownInstanceDisks(lu, instance):
2766   """Shutdown block devices of an instance.
2767
2768   This function checks if an instance is running, before calling
2769   _ShutdownInstanceDisks.
2770
2771   """
2772   pnode = instance.primary_node
2773   ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
2774   ins_l.Raise("Can't contact node %s" % pnode)
2775
2776   if instance.name in ins_l.payload:
2777     raise errors.OpExecError("Instance is running, can't shutdown"
2778                              " block devices.")
2779
2780   _ShutdownInstanceDisks(lu, instance)
2781
2782
2783 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2784   """Shutdown block devices of an instance.
2785
2786   This does the shutdown on all nodes of the instance.
2787
2788   If the ignore_primary is false, errors on the primary node are
2789   ignored.
2790
2791   """
2792   all_result = True
2793   for disk in instance.disks:
2794     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2795       lu.cfg.SetDiskID(top_disk, node)
2796       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2797       msg = result.fail_msg
2798       if msg:
2799         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2800                       disk.iv_name, node, msg)
2801         if not ignore_primary or node != instance.primary_node:
2802           all_result = False
2803   return all_result
2804
2805
2806 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2807   """Checks if a node has enough free memory.
2808
2809   This function check if a given node has the needed amount of free
2810   memory. In case the node has less memory or we cannot get the
2811   information from the node, this function raise an OpPrereqError
2812   exception.
2813
2814   @type lu: C{LogicalUnit}
2815   @param lu: a logical unit from which we get configuration data
2816   @type node: C{str}
2817   @param node: the node to check
2818   @type reason: C{str}
2819   @param reason: string to use in the error message
2820   @type requested: C{int}
2821   @param requested: the amount of memory in MiB to check for
2822   @type hypervisor_name: C{str}
2823   @param hypervisor_name: the hypervisor to ask for memory stats
2824   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2825       we cannot check the node
2826
2827   """
2828   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2829   nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
2830   free_mem = nodeinfo[node].payload.get('memory_free', None)
2831   if not isinstance(free_mem, int):
2832     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2833                                " was '%s'" % (node, free_mem))
2834   if requested > free_mem:
2835     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2836                                " needed %s MiB, available %s MiB" %
2837                                (node, reason, requested, free_mem))
2838
2839
2840 class LUStartupInstance(LogicalUnit):
2841   """Starts an instance.
2842
2843   """
2844   HPATH = "instance-start"
2845   HTYPE = constants.HTYPE_INSTANCE
2846   _OP_REQP = ["instance_name", "force"]
2847   REQ_BGL = False
2848
2849   def ExpandNames(self):
2850     self._ExpandAndLockInstance()
2851
2852   def BuildHooksEnv(self):
2853     """Build hooks env.
2854
2855     This runs on master, primary and secondary nodes of the instance.
2856
2857     """
2858     env = {
2859       "FORCE": self.op.force,
2860       }
2861     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2862     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2863     return env, nl, nl
2864
2865   def CheckPrereq(self):
2866     """Check prerequisites.
2867
2868     This checks that the instance is in the cluster.
2869
2870     """
2871     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2872     assert self.instance is not None, \
2873       "Cannot retrieve locked instance %s" % self.op.instance_name
2874
2875     # extra beparams
2876     self.beparams = getattr(self.op, "beparams", {})
2877     if self.beparams:
2878       if not isinstance(self.beparams, dict):
2879         raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2880                                    " dict" % (type(self.beparams), ))
2881       # fill the beparams dict
2882       utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2883       self.op.beparams = self.beparams
2884
2885     # extra hvparams
2886     self.hvparams = getattr(self.op, "hvparams", {})
2887     if self.hvparams:
2888       if not isinstance(self.hvparams, dict):
2889         raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2890                                    " dict" % (type(self.hvparams), ))
2891
2892       # check hypervisor parameter syntax (locally)
2893       cluster = self.cfg.GetClusterInfo()
2894       utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2895       filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
2896                                     instance.hvparams)
2897       filled_hvp.update(self.hvparams)
2898       hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2899       hv_type.CheckParameterSyntax(filled_hvp)
2900       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2901       self.op.hvparams = self.hvparams
2902
2903     _CheckNodeOnline(self, instance.primary_node)
2904
2905     bep = self.cfg.GetClusterInfo().FillBE(instance)
2906     # check bridges existance
2907     _CheckInstanceBridgesExist(self, instance)
2908
2909     remote_info = self.rpc.call_instance_info(instance.primary_node,
2910                                               instance.name,
2911                                               instance.hypervisor)
2912     remote_info.Raise("Error checking node %s" % instance.primary_node,
2913                       prereq=True)
2914     if not remote_info.payload: # not running already
2915       _CheckNodeFreeMemory(self, instance.primary_node,
2916                            "starting instance %s" % instance.name,
2917                            bep[constants.BE_MEMORY], instance.hypervisor)
2918
2919   def Exec(self, feedback_fn):
2920     """Start the instance.
2921
2922     """
2923     instance = self.instance
2924     force = self.op.force
2925
2926     self.cfg.MarkInstanceUp(instance.name)
2927
2928     node_current = instance.primary_node
2929
2930     _StartInstanceDisks(self, instance, force)
2931
2932     result = self.rpc.call_instance_start(node_current, instance,
2933                                           self.hvparams, self.beparams)
2934     msg = result.fail_msg
2935     if msg:
2936       _ShutdownInstanceDisks(self, instance)
2937       raise errors.OpExecError("Could not start instance: %s" % msg)
2938
2939
2940 class LURebootInstance(LogicalUnit):
2941   """Reboot an instance.
2942
2943   """
2944   HPATH = "instance-reboot"
2945   HTYPE = constants.HTYPE_INSTANCE
2946   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2947   REQ_BGL = False
2948
2949   def ExpandNames(self):
2950     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2951                                    constants.INSTANCE_REBOOT_HARD,
2952                                    constants.INSTANCE_REBOOT_FULL]:
2953       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2954                                   (constants.INSTANCE_REBOOT_SOFT,
2955                                    constants.INSTANCE_REBOOT_HARD,
2956                                    constants.INSTANCE_REBOOT_FULL))
2957     self._ExpandAndLockInstance()
2958
2959   def BuildHooksEnv(self):
2960     """Build hooks env.
2961
2962     This runs on master, primary and secondary nodes of the instance.
2963
2964     """
2965     env = {
2966       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2967       "REBOOT_TYPE": self.op.reboot_type,
2968       }
2969     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2970     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2971     return env, nl, nl
2972
2973   def CheckPrereq(self):
2974     """Check prerequisites.
2975
2976     This checks that the instance is in the cluster.
2977
2978     """
2979     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2980     assert self.instance is not None, \
2981       "Cannot retrieve locked instance %s" % self.op.instance_name
2982
2983     _CheckNodeOnline(self, instance.primary_node)
2984
2985     # check bridges existance
2986     _CheckInstanceBridgesExist(self, instance)
2987
2988   def Exec(self, feedback_fn):
2989     """Reboot the instance.
2990
2991     """
2992     instance = self.instance
2993     ignore_secondaries = self.op.ignore_secondaries
2994     reboot_type = self.op.reboot_type
2995
2996     node_current = instance.primary_node
2997
2998     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2999                        constants.INSTANCE_REBOOT_HARD]:
3000       for disk in instance.disks:
3001         self.cfg.SetDiskID(disk, node_current)
3002       result = self.rpc.call_instance_reboot(node_current, instance,
3003                                              reboot_type)
3004       result.Raise("Could not reboot instance")
3005     else:
3006       result = self.rpc.call_instance_shutdown(node_current, instance)
3007       result.Raise("Could not shutdown instance for full reboot")
3008       _ShutdownInstanceDisks(self, instance)
3009       _StartInstanceDisks(self, instance, ignore_secondaries)
3010       result = self.rpc.call_instance_start(node_current, instance, None, None)
3011       msg = result.fail_msg
3012       if msg:
3013         _ShutdownInstanceDisks(self, instance)
3014         raise errors.OpExecError("Could not start instance for"
3015                                  " full reboot: %s" % msg)
3016
3017     self.cfg.MarkInstanceUp(instance.name)
3018
3019
3020 class LUShutdownInstance(LogicalUnit):
3021   """Shutdown an instance.
3022
3023   """
3024   HPATH = "instance-stop"
3025   HTYPE = constants.HTYPE_INSTANCE
3026   _OP_REQP = ["instance_name"]
3027   REQ_BGL = False
3028
3029   def ExpandNames(self):
3030     self._ExpandAndLockInstance()
3031
3032   def BuildHooksEnv(self):
3033     """Build hooks env.
3034
3035     This runs on master, primary and secondary nodes of the instance.
3036
3037     """
3038     env = _BuildInstanceHookEnvByObject(self, self.instance)
3039     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3040     return env, nl, nl
3041
3042   def CheckPrereq(self):
3043     """Check prerequisites.
3044
3045     This checks that the instance is in the cluster.
3046
3047     """
3048     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3049     assert self.instance is not None, \
3050       "Cannot retrieve locked instance %s" % self.op.instance_name
3051     _CheckNodeOnline(self, self.instance.primary_node)
3052
3053   def Exec(self, feedback_fn):
3054     """Shutdown the instance.
3055
3056     """
3057     instance = self.instance
3058     node_current = instance.primary_node
3059     self.cfg.MarkInstanceDown(instance.name)
3060     result = self.rpc.call_instance_shutdown(node_current, instance)
3061     msg = result.fail_msg
3062     if msg:
3063       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3064
3065     _ShutdownInstanceDisks(self, instance)
3066
3067
3068 class LUReinstallInstance(LogicalUnit):
3069   """Reinstall an instance.
3070
3071   """
3072   HPATH = "instance-reinstall"
3073   HTYPE = constants.HTYPE_INSTANCE
3074   _OP_REQP = ["instance_name"]
3075   REQ_BGL = False
3076
3077   def ExpandNames(self):
3078     self._ExpandAndLockInstance()
3079
3080   def BuildHooksEnv(self):
3081     """Build hooks env.
3082
3083     This runs on master, primary and secondary nodes of the instance.
3084
3085     """
3086     env = _BuildInstanceHookEnvByObject(self, self.instance)
3087     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3088     return env, nl, nl
3089
3090   def CheckPrereq(self):
3091     """Check prerequisites.
3092
3093     This checks that the instance is in the cluster and is not running.
3094
3095     """
3096     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3097     assert instance is not None, \
3098       "Cannot retrieve locked instance %s" % self.op.instance_name
3099     _CheckNodeOnline(self, instance.primary_node)
3100
3101     if instance.disk_template == constants.DT_DISKLESS:
3102       raise errors.OpPrereqError("Instance '%s' has no disks" %
3103                                  self.op.instance_name)
3104     if instance.admin_up:
3105       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3106                                  self.op.instance_name)
3107     remote_info = self.rpc.call_instance_info(instance.primary_node,
3108                                               instance.name,
3109                                               instance.hypervisor)
3110     remote_info.Raise("Error checking node %s" % instance.primary_node,
3111                       prereq=True)
3112     if remote_info.payload:
3113       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3114                                  (self.op.instance_name,
3115                                   instance.primary_node))
3116
3117     self.op.os_type = getattr(self.op, "os_type", None)
3118     if self.op.os_type is not None:
3119       # OS verification
3120       pnode = self.cfg.GetNodeInfo(
3121         self.cfg.ExpandNodeName(instance.primary_node))
3122       if pnode is None:
3123         raise errors.OpPrereqError("Primary node '%s' is unknown" %
3124                                    self.op.pnode)
3125       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3126       result.Raise("OS '%s' not in supported OS list for primary node %s" %
3127                    (self.op.os_type, pnode.name), prereq=True)
3128
3129     self.instance = instance
3130
3131   def Exec(self, feedback_fn):
3132     """Reinstall the instance.
3133
3134     """
3135     inst = self.instance
3136
3137     if self.op.os_type is not None:
3138       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3139       inst.os = self.op.os_type
3140       self.cfg.Update(inst)
3141
3142     _StartInstanceDisks(self, inst, None)
3143     try:
3144       feedback_fn("Running the instance OS create scripts...")
3145       result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3146       result.Raise("Could not install OS for instance %s on node %s" %
3147                    (inst.name, inst.primary_node))
3148     finally:
3149       _ShutdownInstanceDisks(self, inst)
3150
3151
3152 class LURenameInstance(LogicalUnit):
3153   """Rename an instance.
3154
3155   """
3156   HPATH = "instance-rename"
3157   HTYPE = constants.HTYPE_INSTANCE
3158   _OP_REQP = ["instance_name", "new_name"]
3159
3160   def BuildHooksEnv(self):
3161     """Build hooks env.
3162
3163     This runs on master, primary and secondary nodes of the instance.
3164
3165     """
3166     env = _BuildInstanceHookEnvByObject(self, self.instance)
3167     env["INSTANCE_NEW_NAME"] = self.op.new_name
3168     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3169     return env, nl, nl
3170
3171   def CheckPrereq(self):
3172     """Check prerequisites.
3173
3174     This checks that the instance is in the cluster and is not running.
3175
3176     """
3177     instance = self.cfg.GetInstanceInfo(
3178       self.cfg.ExpandInstanceName(self.op.instance_name))
3179     if instance is None:
3180       raise errors.OpPrereqError("Instance '%s' not known" %
3181                                  self.op.instance_name)
3182     _CheckNodeOnline(self, instance.primary_node)
3183
3184     if instance.admin_up:
3185       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3186                                  self.op.instance_name)
3187     remote_info = self.rpc.call_instance_info(instance.primary_node,
3188                                               instance.name,
3189                                               instance.hypervisor)
3190     remote_info.Raise("Error checking node %s" % instance.primary_node,
3191                       prereq=True)
3192     if remote_info.payload:
3193       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3194                                  (self.op.instance_name,
3195                                   instance.primary_node))
3196     self.instance = instance
3197
3198     # new name verification
3199     name_info = utils.HostInfo(self.op.new_name)
3200
3201     self.op.new_name = new_name = name_info.name
3202     instance_list = self.cfg.GetInstanceList()
3203     if new_name in instance_list:
3204       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3205                                  new_name)
3206
3207     if not getattr(self.op, "ignore_ip", False):
3208       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3209         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3210                                    (name_info.ip, new_name))
3211
3212
3213   def Exec(self, feedback_fn):
3214     """Reinstall the instance.
3215
3216     """
3217     inst = self.instance
3218     old_name = inst.name
3219
3220     if inst.disk_template == constants.DT_FILE:
3221       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3222
3223     self.cfg.RenameInstance(inst.name, self.op.new_name)
3224     # Change the instance lock. This is definitely safe while we hold the BGL
3225     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3226     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3227
3228     # re-read the instance from the configuration after rename
3229     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3230
3231     if inst.disk_template == constants.DT_FILE:
3232       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3233       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3234                                                      old_file_storage_dir,
3235                                                      new_file_storage_dir)
3236       result.Raise("Could not rename on node %s directory '%s' to '%s'"
3237                    " (but the instance has been renamed in Ganeti)" %
3238                    (inst.primary_node, old_file_storage_dir,
3239                     new_file_storage_dir))
3240
3241     _StartInstanceDisks(self, inst, None)
3242     try:
3243       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3244                                                  old_name)
3245       msg = result.fail_msg
3246       if msg:
3247         msg = ("Could not run OS rename script for instance %s on node %s"
3248                " (but the instance has been renamed in Ganeti): %s" %
3249                (inst.name, inst.primary_node, msg))
3250         self.proc.LogWarning(msg)
3251     finally:
3252       _ShutdownInstanceDisks(self, inst)
3253
3254
3255 class LURemoveInstance(LogicalUnit):
3256   """Remove an instance.
3257
3258   """
3259   HPATH = "instance-remove"
3260   HTYPE = constants.HTYPE_INSTANCE
3261   _OP_REQP = ["instance_name", "ignore_failures"]
3262   REQ_BGL = False
3263
3264   def ExpandNames(self):
3265     self._ExpandAndLockInstance()
3266     self.needed_locks[locking.LEVEL_NODE] = []
3267     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3268
3269   def DeclareLocks(self, level):
3270     if level == locking.LEVEL_NODE:
3271       self._LockInstancesNodes()
3272
3273   def BuildHooksEnv(self):
3274     """Build hooks env.
3275
3276     This runs on master, primary and secondary nodes of the instance.
3277
3278     """
3279     env = _BuildInstanceHookEnvByObject(self, self.instance)
3280     nl = [self.cfg.GetMasterNode()]
3281     return env, nl, nl
3282
3283   def CheckPrereq(self):
3284     """Check prerequisites.
3285
3286     This checks that the instance is in the cluster.
3287
3288     """
3289     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3290     assert self.instance is not None, \
3291       "Cannot retrieve locked instance %s" % self.op.instance_name
3292
3293   def Exec(self, feedback_fn):
3294     """Remove the instance.
3295
3296     """
3297     instance = self.instance
3298     logging.info("Shutting down instance %s on node %s",
3299                  instance.name, instance.primary_node)
3300
3301     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3302     msg = result.fail_msg
3303     if msg:
3304       if self.op.ignore_failures:
3305         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3306       else:
3307         raise errors.OpExecError("Could not shutdown instance %s on"
3308                                  " node %s: %s" %
3309                                  (instance.name, instance.primary_node, msg))
3310
3311     logging.info("Removing block devices for instance %s", instance.name)
3312
3313     if not _RemoveDisks(self, instance):
3314       if self.op.ignore_failures:
3315         feedback_fn("Warning: can't remove instance's disks")
3316       else:
3317         raise errors.OpExecError("Can't remove instance's disks")
3318
3319     logging.info("Removing instance %s out of cluster config", instance.name)
3320
3321     self.cfg.RemoveInstance(instance.name)
3322     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3323
3324
3325 class LUQueryInstances(NoHooksLU):
3326   """Logical unit for querying instances.
3327
3328   """
3329   _OP_REQP = ["output_fields", "names", "use_locking"]
3330   REQ_BGL = False
3331   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3332                                     "admin_state",
3333                                     "disk_template", "ip", "mac", "bridge",
3334                                     "sda_size", "sdb_size", "vcpus", "tags",
3335                                     "network_port", "beparams",
3336                                     r"(disk)\.(size)/([0-9]+)",
3337                                     r"(disk)\.(sizes)", "disk_usage",
3338                                     r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3339                                     r"(nic)\.(macs|ips|bridges)",
3340                                     r"(disk|nic)\.(count)",
3341                                     "serial_no", "hypervisor", "hvparams",] +
3342                                   ["hv/%s" % name
3343                                    for name in constants.HVS_PARAMETERS] +
3344                                   ["be/%s" % name
3345                                    for name in constants.BES_PARAMETERS])
3346   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3347
3348
3349   def ExpandNames(self):
3350     _CheckOutputFields(static=self._FIELDS_STATIC,
3351                        dynamic=self._FIELDS_DYNAMIC,
3352                        selected=self.op.output_fields)
3353
3354     self.needed_locks = {}
3355     self.share_locks[locking.LEVEL_INSTANCE] = 1
3356     self.share_locks[locking.LEVEL_NODE] = 1
3357
3358     if self.op.names:
3359       self.wanted = _GetWantedInstances(self, self.op.names)
3360     else:
3361       self.wanted = locking.ALL_SET
3362
3363     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3364     self.do_locking = self.do_node_query and self.op.use_locking
3365     if self.do_locking:
3366       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3367       self.needed_locks[locking.LEVEL_NODE] = []
3368       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3369
3370   def DeclareLocks(self, level):
3371     if level == locking.LEVEL_NODE and self.do_locking:
3372       self._LockInstancesNodes()
3373
3374   def CheckPrereq(self):
3375     """Check prerequisites.
3376
3377     """
3378     pass
3379
3380   def Exec(self, feedback_fn):
3381     """Computes the list of nodes and their attributes.
3382
3383     """
3384     all_info = self.cfg.GetAllInstancesInfo()
3385     if self.wanted == locking.ALL_SET:
3386       # caller didn't specify instance names, so ordering is not important
3387       if self.do_locking:
3388         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3389       else:
3390         instance_names = all_info.keys()
3391       instance_names = utils.NiceSort(instance_names)
3392     else:
3393       # caller did specify names, so we must keep the ordering
3394       if self.do_locking:
3395         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3396       else:
3397         tgt_set = all_info.keys()
3398       missing = set(self.wanted).difference(tgt_set)
3399       if missing:
3400         raise errors.OpExecError("Some instances were removed before"
3401                                  " retrieving their data: %s" % missing)
3402       instance_names = self.wanted
3403
3404     instance_list = [all_info[iname] for iname in instance_names]
3405
3406     # begin data gathering
3407
3408     nodes = frozenset([inst.primary_node for inst in instance_list])
3409     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3410
3411     bad_nodes = []
3412     off_nodes = []
3413     if self.do_node_query:
3414       live_data = {}
3415       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3416       for name in nodes:
3417         result = node_data[name]
3418         if result.offline:
3419           # offline nodes will be in both lists
3420           off_nodes.append(name)
3421         if result.failed or result.fail_msg:
3422           bad_nodes.append(name)
3423         else:
3424           if result.payload:
3425             live_data.update(result.payload)
3426           # else no instance is alive
3427     else:
3428       live_data = dict([(name, {}) for name in instance_names])
3429
3430     # end data gathering
3431
3432     HVPREFIX = "hv/"
3433     BEPREFIX = "be/"
3434     output = []
3435     for instance in instance_list:
3436       iout = []
3437       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3438       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3439       for field in self.op.output_fields:
3440         st_match = self._FIELDS_STATIC.Matches(field)
3441         if field == "name":
3442           val = instance.name
3443         elif field == "os":
3444           val = instance.os
3445         elif field == "pnode":
3446           val = instance.primary_node
3447         elif field == "snodes":
3448           val = list(instance.secondary_nodes)
3449         elif field == "admin_state":
3450           val = instance.admin_up
3451         elif field == "oper_state":
3452           if instance.primary_node in bad_nodes:
3453             val = None
3454           else:
3455             val = bool(live_data.get(instance.name))
3456         elif field == "status":
3457           if instance.primary_node in off_nodes:
3458             val = "ERROR_nodeoffline"
3459           elif instance.primary_node in bad_nodes:
3460             val = "ERROR_nodedown"
3461           else:
3462             running = bool(live_data.get(instance.name))
3463             if running:
3464               if instance.admin_up:
3465                 val = "running"
3466               else:
3467                 val = "ERROR_up"
3468             else:
3469               if instance.admin_up:
3470                 val = "ERROR_down"
3471               else:
3472                 val = "ADMIN_down"
3473         elif field == "oper_ram":
3474           if instance.primary_node in bad_nodes:
3475             val = None
3476           elif instance.name in live_data:
3477             val = live_data[instance.name].get("memory", "?")
3478           else:
3479             val = "-"
3480         elif field == "disk_template":
3481           val = instance.disk_template
3482         elif field == "ip":
3483           val = instance.nics[0].ip
3484         elif field == "bridge":
3485           val = instance.nics[0].bridge
3486         elif field == "mac":
3487           val = instance.nics[0].mac
3488         elif field == "sda_size" or field == "sdb_size":
3489           idx = ord(field[2]) - ord('a')
3490           try:
3491             val = instance.FindDisk(idx).size
3492           except errors.OpPrereqError:
3493             val = None
3494         elif field == "disk_usage": # total disk usage per node
3495           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3496           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3497         elif field == "tags":
3498           val = list(instance.GetTags())
3499         elif field == "serial_no":
3500           val = instance.serial_no
3501         elif field == "network_port":
3502           val = instance.network_port
3503         elif field == "hypervisor":
3504           val = instance.hypervisor
3505         elif field == "hvparams":
3506           val = i_hv
3507         elif (field.startswith(HVPREFIX) and
3508               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3509           val = i_hv.get(field[len(HVPREFIX):], None)
3510         elif field == "beparams":
3511           val = i_be
3512         elif (field.startswith(BEPREFIX) and
3513               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3514           val = i_be.get(field[len(BEPREFIX):], None)
3515         elif st_match and st_match.groups():
3516           # matches a variable list
3517           st_groups = st_match.groups()
3518           if st_groups and st_groups[0] == "disk":
3519             if st_groups[1] == "count":
3520               val = len(instance.disks)
3521             elif st_groups[1] == "sizes":
3522               val = [disk.size for disk in instance.disks]
3523             elif st_groups[1] == "size":
3524               try:
3525                 val = instance.FindDisk(st_groups[2]).size
3526               except errors.OpPrereqError:
3527                 val = None
3528             else:
3529               assert False, "Unhandled disk parameter"
3530           elif st_groups[0] == "nic":
3531             if st_groups[1] == "count":
3532               val = len(instance.nics)
3533             elif st_groups[1] == "macs":
3534               val = [nic.mac for nic in instance.nics]
3535             elif st_groups[1] == "ips":
3536               val = [nic.ip for nic in instance.nics]
3537             elif st_groups[1] == "bridges":
3538               val = [nic.bridge for nic in instance.nics]
3539             else:
3540               # index-based item
3541               nic_idx = int(st_groups[2])
3542               if nic_idx >= len(instance.nics):
3543                 val = None
3544               else:
3545                 if st_groups[1] == "mac":
3546                   val = instance.nics[nic_idx].mac
3547                 elif st_groups[1] == "ip":
3548                   val = instance.nics[nic_idx].ip
3549                 elif st_groups[1] == "bridge":
3550                   val = instance.nics[nic_idx].bridge
3551                 else:
3552                   assert False, "Unhandled NIC parameter"
3553           else:
3554             assert False, "Unhandled variable parameter"
3555         else:
3556           raise errors.ParameterError(field)
3557         iout.append(val)
3558       output.append(iout)
3559
3560     return output
3561
3562
3563 class LUFailoverInstance(LogicalUnit):
3564   """Failover an instance.
3565
3566   """
3567   HPATH = "instance-failover"
3568   HTYPE = constants.HTYPE_INSTANCE
3569   _OP_REQP = ["instance_name", "ignore_consistency"]
3570   REQ_BGL = False
3571
3572   def ExpandNames(self):
3573     self._ExpandAndLockInstance()
3574     self.needed_locks[locking.LEVEL_NODE] = []
3575     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3576
3577   def DeclareLocks(self, level):
3578     if level == locking.LEVEL_NODE:
3579       self._LockInstancesNodes()
3580
3581   def BuildHooksEnv(self):
3582     """Build hooks env.
3583
3584     This runs on master, primary and secondary nodes of the instance.
3585
3586     """
3587     env = {
3588       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3589       }
3590     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3591     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3592     return env, nl, nl
3593
3594   def CheckPrereq(self):
3595     """Check prerequisites.
3596
3597     This checks that the instance is in the cluster.
3598
3599     """
3600     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3601     assert self.instance is not None, \
3602       "Cannot retrieve locked instance %s" % self.op.instance_name
3603
3604     bep = self.cfg.GetClusterInfo().FillBE(instance)
3605     if instance.disk_template not in constants.DTS_NET_MIRROR:
3606       raise errors.OpPrereqError("Instance's disk layout is not"
3607                                  " network mirrored, cannot failover.")
3608
3609     secondary_nodes = instance.secondary_nodes
3610     if not secondary_nodes:
3611       raise errors.ProgrammerError("no secondary node but using "
3612                                    "a mirrored disk template")
3613
3614     target_node = secondary_nodes[0]
3615     _CheckNodeOnline(self, target_node)
3616     _CheckNodeNotDrained(self, target_node)
3617     # check memory requirements on the secondary node
3618     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3619                          instance.name, bep[constants.BE_MEMORY],
3620                          instance.hypervisor)
3621     # check bridge existance
3622     _CheckInstanceBridgesExist(self, instance, node=target_node)
3623
3624   def Exec(self, feedback_fn):
3625     """Failover an instance.
3626
3627     The failover is done by shutting it down on its present node and
3628     starting it on the secondary.
3629
3630     """
3631     instance = self.instance
3632
3633     source_node = instance.primary_node
3634     target_node = instance.secondary_nodes[0]
3635
3636     feedback_fn("* checking disk consistency between source and target")
3637     for dev in instance.disks:
3638       # for drbd, these are drbd over lvm
3639       if not _CheckDiskConsistency(self, dev, target_node, False):
3640         if instance.admin_up and not self.op.ignore_consistency:
3641           raise errors.OpExecError("Disk %s is degraded on target node,"
3642                                    " aborting failover." % dev.iv_name)
3643
3644     feedback_fn("* shutting down instance on source node")
3645     logging.info("Shutting down instance %s on node %s",
3646                  instance.name, source_node)
3647
3648     result = self.rpc.call_instance_shutdown(source_node, instance)
3649     msg = result.fail_msg
3650     if msg:
3651       if self.op.ignore_consistency:
3652         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3653                              " Proceeding anyway. Please make sure node"
3654                              " %s is down. Error details: %s",
3655                              instance.name, source_node, source_node, msg)
3656       else:
3657         raise errors.OpExecError("Could not shutdown instance %s on"
3658                                  " node %s: %s" %
3659                                  (instance.name, source_node, msg))
3660
3661     feedback_fn("* deactivating the instance's disks on source node")
3662     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3663       raise errors.OpExecError("Can't shut down the instance's disks.")
3664
3665     instance.primary_node = target_node
3666     # distribute new instance config to the other nodes
3667     self.cfg.Update(instance)
3668
3669     # Only start the instance if it's marked as up
3670     if instance.admin_up:
3671       feedback_fn("* activating the instance's disks on target node")
3672       logging.info("Starting instance %s on node %s",
3673                    instance.name, target_node)
3674
3675       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3676                                                ignore_secondaries=True)
3677       if not disks_ok:
3678         _ShutdownInstanceDisks(self, instance)
3679         raise errors.OpExecError("Can't activate the instance's disks")
3680
3681       feedback_fn("* starting the instance on the target node")
3682       result = self.rpc.call_instance_start(target_node, instance, None, None)
3683       msg = result.fail_msg
3684       if msg:
3685         _ShutdownInstanceDisks(self, instance)
3686         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3687                                  (instance.name, target_node, msg))
3688
3689
3690 class LUMigrateInstance(LogicalUnit):
3691   """Migrate an instance.
3692
3693   This is migration without shutting down, compared to the failover,
3694   which is done with shutdown.
3695
3696   """
3697   HPATH = "instance-migrate"
3698   HTYPE = constants.HTYPE_INSTANCE
3699   _OP_REQP = ["instance_name", "live", "cleanup"]
3700
3701   REQ_BGL = False
3702
3703   def ExpandNames(self):
3704     self._ExpandAndLockInstance()
3705     self.needed_locks[locking.LEVEL_NODE] = []
3706     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3707
3708   def DeclareLocks(self, level):
3709     if level == locking.LEVEL_NODE:
3710       self._LockInstancesNodes()
3711
3712   def BuildHooksEnv(self):
3713     """Build hooks env.
3714
3715     This runs on master, primary and secondary nodes of the instance.
3716
3717     """
3718     env = _BuildInstanceHookEnvByObject(self, self.instance)
3719     env["MIGRATE_LIVE"] = self.op.live
3720     env["MIGRATE_CLEANUP"] = self.op.cleanup
3721     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3722     return env, nl, nl
3723
3724   def CheckPrereq(self):
3725     """Check prerequisites.
3726
3727     This checks that the instance is in the cluster.
3728
3729     """
3730     instance = self.cfg.GetInstanceInfo(
3731       self.cfg.ExpandInstanceName(self.op.instance_name))
3732     if instance is None:
3733       raise errors.OpPrereqError("Instance '%s' not known" %
3734                                  self.op.instance_name)
3735
3736     if instance.disk_template != constants.DT_DRBD8:
3737       raise errors.OpPrereqError("Instance's disk layout is not"
3738                                  " drbd8, cannot migrate.")
3739
3740     secondary_nodes = instance.secondary_nodes
3741     if not secondary_nodes:
3742       raise errors.ConfigurationError("No secondary node but using"
3743                                       " drbd8 disk template")
3744
3745     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3746
3747     target_node = secondary_nodes[0]
3748     # check memory requirements on the secondary node
3749     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3750                          instance.name, i_be[constants.BE_MEMORY],
3751                          instance.hypervisor)
3752
3753     # check bridge existance
3754     _CheckInstanceBridgesExist(self, instance, node=target_node)
3755
3756     if not self.op.cleanup:
3757       _CheckNodeNotDrained(self, target_node)
3758       result = self.rpc.call_instance_migratable(instance.primary_node,
3759                                                  instance)
3760       result.Raise("Can't migrate, please use failover", prereq=True)
3761
3762     self.instance = instance
3763
3764   def _WaitUntilSync(self):
3765     """Poll with custom rpc for disk sync.
3766
3767     This uses our own step-based rpc call.
3768
3769     """
3770     self.feedback_fn("* wait until resync is done")
3771     all_done = False
3772     while not all_done:
3773       all_done = True
3774       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3775                                             self.nodes_ip,
3776                                             self.instance.disks)
3777       min_percent = 100
3778       for node, nres in result.items():
3779         nres.Raise("Cannot resync disks on node %s" % node)
3780         node_done, node_percent = nres.payload
3781         all_done = all_done and node_done
3782         if node_percent is not None:
3783           min_percent = min(min_percent, node_percent)
3784       if not all_done:
3785         if min_percent < 100:
3786           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3787         time.sleep(2)
3788
3789   def _EnsureSecondary(self, node):
3790     """Demote a node to secondary.
3791
3792     """
3793     self.feedback_fn("* switching node %s to secondary mode" % node)
3794
3795     for dev in self.instance.disks:
3796       self.cfg.SetDiskID(dev, node)
3797
3798     result = self.rpc.call_blockdev_close(node, self.instance.name,
3799                                           self.instance.disks)
3800     result.Raise("Cannot change disk to secondary on node %s" % node)
3801
3802   def _GoStandalone(self):
3803     """Disconnect from the network.
3804
3805     """
3806     self.feedback_fn("* changing into standalone mode")
3807     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3808                                                self.instance.disks)
3809     for node, nres in result.items():
3810       nres.Raise("Cannot disconnect disks node %s" % node)
3811
3812   def _GoReconnect(self, multimaster):
3813     """Reconnect to the network.
3814
3815     """
3816     if multimaster:
3817       msg = "dual-master"
3818     else:
3819       msg = "single-master"
3820     self.feedback_fn("* changing disks into %s mode" % msg)
3821     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3822                                            self.instance.disks,
3823                                            self.instance.name, multimaster)
3824     for node, nres in result.items():
3825       nres.Raise("Cannot change disks config on node %s" % node)
3826
3827   def _ExecCleanup(self):
3828     """Try to cleanup after a failed migration.
3829
3830     The cleanup is done by:
3831       - check that the instance is running only on one node
3832         (and update the config if needed)
3833       - change disks on its secondary node to secondary
3834       - wait until disks are fully synchronized
3835       - disconnect from the network
3836       - change disks into single-master mode
3837       - wait again until disks are fully synchronized
3838
3839     """
3840     instance = self.instance
3841     target_node = self.target_node
3842     source_node = self.source_node
3843
3844     # check running on only one node
3845     self.feedback_fn("* checking where the instance actually runs"
3846                      " (if this hangs, the hypervisor might be in"
3847                      " a bad state)")
3848     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3849     for node, result in ins_l.items():
3850       result.Raise("Can't contact node %s" % node)
3851
3852     runningon_source = instance.name in ins_l[source_node].payload
3853     runningon_target = instance.name in ins_l[target_node].payload
3854
3855     if runningon_source and runningon_target:
3856       raise errors.OpExecError("Instance seems to be running on two nodes,"
3857                                " or the hypervisor is confused. You will have"
3858                                " to ensure manually that it runs only on one"
3859                                " and restart this operation.")
3860
3861     if not (runningon_source or runningon_target):
3862       raise errors.OpExecError("Instance does not seem to be running at all."
3863                                " In this case, it's safer to repair by"
3864                                " running 'gnt-instance stop' to ensure disk"
3865                                " shutdown, and then restarting it.")
3866
3867     if runningon_target:
3868       # the migration has actually succeeded, we need to update the config
3869       self.feedback_fn("* instance running on secondary node (%s),"
3870                        " updating config" % target_node)
3871       instance.primary_node = target_node
3872       self.cfg.Update(instance)
3873       demoted_node = source_node
3874     else:
3875       self.feedback_fn("* instance confirmed to be running on its"
3876                        " primary node (%s)" % source_node)
3877       demoted_node = target_node
3878
3879     self._EnsureSecondary(demoted_node)
3880     try:
3881       self._WaitUntilSync()
3882     except errors.OpExecError:
3883       # we ignore here errors, since if the device is standalone, it
3884       # won't be able to sync
3885       pass
3886     self._GoStandalone()
3887     self._GoReconnect(False)
3888     self._WaitUntilSync()
3889
3890     self.feedback_fn("* done")
3891
3892   def _RevertDiskStatus(self):
3893     """Try to revert the disk status after a failed migration.
3894
3895     """
3896     target_node = self.target_node
3897     try:
3898       self._EnsureSecondary(target_node)
3899       self._GoStandalone()
3900       self._GoReconnect(False)
3901       self._WaitUntilSync()
3902     except errors.OpExecError, err:
3903       self.LogWarning("Migration failed and I can't reconnect the"
3904                       " drives: error '%s'\n"
3905                       "Please look and recover the instance status" %
3906                       str(err))
3907
3908   def _AbortMigration(self):
3909     """Call the hypervisor code to abort a started migration.
3910
3911     """
3912     instance = self.instance
3913     target_node = self.target_node
3914     migration_info = self.migration_info
3915
3916     abort_result = self.rpc.call_finalize_migration(target_node,
3917                                                     instance,
3918                                                     migration_info,
3919                                                     False)
3920     abort_msg = abort_result.fail_msg
3921     if abort_msg:
3922       logging.error("Aborting migration failed on target node %s: %s" %
3923                     (target_node, abort_msg))
3924       # Don't raise an exception here, as we stil have to try to revert the
3925       # disk status, even if this step failed.
3926
3927   def _ExecMigration(self):
3928     """Migrate an instance.
3929
3930     The migrate is done by:
3931       - change the disks into dual-master mode
3932       - wait until disks are fully synchronized again
3933       - migrate the instance
3934       - change disks on the new secondary node (the old primary) to secondary
3935       - wait until disks are fully synchronized
3936       - change disks into single-master mode
3937
3938     """
3939     instance = self.instance
3940     target_node = self.target_node
3941     source_node = self.source_node
3942
3943     self.feedback_fn("* checking disk consistency between source and target")
3944     for dev in instance.disks:
3945       if not _CheckDiskConsistency(self, dev, target_node, False):
3946         raise errors.OpExecError("Disk %s is degraded or not fully"
3947                                  " synchronized on target node,"
3948                                  " aborting migrate." % dev.iv_name)
3949
3950     # First get the migration information from the remote node
3951     result = self.rpc.call_migration_info(source_node, instance)
3952     msg = result.fail_msg
3953     if msg:
3954       log_err = ("Failed fetching source migration information from %s: %s" %
3955                  (source_node, msg))
3956       logging.error(log_err)
3957       raise errors.OpExecError(log_err)
3958
3959     self.migration_info = migration_info = result.payload
3960
3961     # Then switch the disks to master/master mode
3962     self._EnsureSecondary(target_node)
3963     self._GoStandalone()
3964     self._GoReconnect(True)
3965     self._WaitUntilSync()
3966
3967     self.feedback_fn("* preparing %s to accept the instance" % target_node)
3968     result = self.rpc.call_accept_instance(target_node,
3969                                            instance,
3970                                            migration_info,
3971                                            self.nodes_ip[target_node])
3972
3973     msg = result.fail_msg
3974     if msg:
3975       logging.error("Instance pre-migration failed, trying to revert"
3976                     " disk status: %s", msg)
3977       self._AbortMigration()
3978       self._RevertDiskStatus()
3979       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3980                                (instance.name, msg))
3981
3982     self.feedback_fn("* migrating instance to %s" % target_node)
3983     time.sleep(10)
3984     result = self.rpc.call_instance_migrate(source_node, instance,
3985                                             self.nodes_ip[target_node],
3986                                             self.op.live)
3987     msg = result.fail_msg
3988     if msg:
3989       logging.error("Instance migration failed, trying to revert"
3990                     " disk status: %s", msg)
3991       self._AbortMigration()
3992       self._RevertDiskStatus()
3993       raise errors.OpExecError("Could not migrate instance %s: %s" %
3994                                (instance.name, msg))
3995     time.sleep(10)
3996
3997     instance.primary_node = target_node
3998     # distribute new instance config to the other nodes
3999     self.cfg.Update(instance)
4000
4001     result = self.rpc.call_finalize_migration(target_node,
4002                                               instance,
4003                                               migration_info,
4004                                               True)
4005     msg = result.fail_msg
4006     if msg:
4007       logging.error("Instance migration succeeded, but finalization failed:"
4008                     " %s" % msg)
4009       raise errors.OpExecError("Could not finalize instance migration: %s" %
4010                                msg)
4011
4012     self._EnsureSecondary(source_node)
4013     self._WaitUntilSync()
4014     self._GoStandalone()
4015     self._GoReconnect(False)
4016     self._WaitUntilSync()
4017
4018     self.feedback_fn("* done")
4019
4020   def Exec(self, feedback_fn):
4021     """Perform the migration.
4022
4023     """
4024     self.feedback_fn = feedback_fn
4025
4026     self.source_node = self.instance.primary_node
4027     self.target_node = self.instance.secondary_nodes[0]
4028     self.all_nodes = [self.source_node, self.target_node]
4029     self.nodes_ip = {
4030       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4031       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4032       }
4033     if self.op.cleanup:
4034       return self._ExecCleanup()
4035     else:
4036       return self._ExecMigration()
4037
4038
4039 def _CreateBlockDev(lu, node, instance, device, force_create,
4040                     info, force_open):
4041   """Create a tree of block devices on a given node.
4042
4043   If this device type has to be created on secondaries, create it and
4044   all its children.
4045
4046   If not, just recurse to children keeping the same 'force' value.
4047
4048   @param lu: the lu on whose behalf we execute
4049   @param node: the node on which to create the device
4050   @type instance: L{objects.Instance}
4051   @param instance: the instance which owns the device
4052   @type device: L{objects.Disk}
4053   @param device: the device to create
4054   @type force_create: boolean
4055   @param force_create: whether to force creation of this device; this
4056       will be change to True whenever we find a device which has
4057       CreateOnSecondary() attribute
4058   @param info: the extra 'metadata' we should attach to the device
4059       (this will be represented as a LVM tag)
4060   @type force_open: boolean
4061   @param force_open: this parameter will be passes to the
4062       L{backend.BlockdevCreate} function where it specifies
4063       whether we run on primary or not, and it affects both
4064       the child assembly and the device own Open() execution
4065
4066   """
4067   if device.CreateOnSecondary():
4068     force_create = True
4069
4070   if device.children:
4071     for child in device.children:
4072       _CreateBlockDev(lu, node, instance, child, force_create,
4073                       info, force_open)
4074
4075   if not force_create:
4076     return
4077
4078   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4079
4080
4081 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4082   """Create a single block device on a given node.
4083
4084   This will not recurse over children of the device, so they must be
4085   created in advance.
4086
4087   @param lu: the lu on whose behalf we execute
4088   @param node: the node on which to create the device
4089   @type instance: L{objects.Instance}
4090   @param instance: the instance which owns the device
4091   @type device: L{objects.Disk}
4092   @param device: the device to create
4093   @param info: the extra 'metadata' we should attach to the device
4094       (this will be represented as a LVM tag)
4095   @type force_open: boolean
4096   @param force_open: this parameter will be passes to the
4097       L{backend.BlockdevCreate} function where it specifies
4098       whether we run on primary or not, and it affects both
4099       the child assembly and the device own Open() execution
4100
4101   """
4102   lu.cfg.SetDiskID(device, node)
4103   result = lu.rpc.call_blockdev_create(node, device, device.size,
4104                                        instance.name, force_open, info)
4105   result.Raise("Can't create block device %s on"
4106                " node %s for instance %s" % (device, node, instance.name))
4107   if device.physical_id is None:
4108     device.physical_id = result.payload
4109
4110
4111 def _GenerateUniqueNames(lu, exts):
4112   """Generate a suitable LV name.
4113
4114   This will generate a logical volume name for the given instance.
4115
4116   """
4117   results = []
4118   for val in exts:
4119     new_id = lu.cfg.GenerateUniqueID()
4120     results.append("%s%s" % (new_id, val))
4121   return results
4122
4123
4124 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4125                          p_minor, s_minor):
4126   """Generate a drbd8 device complete with its children.
4127
4128   """
4129   port = lu.cfg.AllocatePort()
4130   vgname = lu.cfg.GetVGName()
4131   shared_secret = lu.cfg.GenerateDRBDSecret()
4132   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4133                           logical_id=(vgname, names[0]))
4134   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4135                           logical_id=(vgname, names[1]))
4136   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4137                           logical_id=(primary, secondary, port,
4138                                       p_minor, s_minor,
4139                                       shared_secret),
4140                           children=[dev_data, dev_meta],
4141                           iv_name=iv_name)
4142   return drbd_dev
4143
4144
4145 def _GenerateDiskTemplate(lu, template_name,
4146                           instance_name, primary_node,
4147                           secondary_nodes, disk_info,
4148                           file_storage_dir, file_driver,
4149                           base_index):
4150   """Generate the entire disk layout for a given template type.
4151
4152   """
4153   #TODO: compute space requirements
4154
4155   vgname = lu.cfg.GetVGName()
4156   disk_count = len(disk_info)
4157   disks = []
4158   if template_name == constants.DT_DISKLESS:
4159     pass
4160   elif template_name == constants.DT_PLAIN:
4161     if len(secondary_nodes) != 0:
4162       raise errors.ProgrammerError("Wrong template configuration")
4163
4164     names = _GenerateUniqueNames(lu, [".disk%d" % i
4165                                       for i in range(disk_count)])
4166     for idx, disk in enumerate(disk_info):
4167       disk_index = idx + base_index
4168       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4169                               logical_id=(vgname, names[idx]),
4170                               iv_name="disk/%d" % disk_index,
4171                               mode=disk["mode"])
4172       disks.append(disk_dev)
4173   elif template_name == constants.DT_DRBD8:
4174     if len(secondary_nodes) != 1:
4175       raise errors.ProgrammerError("Wrong template configuration")
4176     remote_node = secondary_nodes[0]
4177     minors = lu.cfg.AllocateDRBDMinor(
4178       [primary_node, remote_node] * len(disk_info), instance_name)
4179
4180     names = []
4181     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4182                                                for i in range(disk_count)]):
4183       names.append(lv_prefix + "_data")
4184       names.append(lv_prefix + "_meta")
4185     for idx, disk in enumerate(disk_info):
4186       disk_index = idx + base_index
4187       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4188                                       disk["size"], names[idx*2:idx*2+2],
4189                                       "disk/%d" % disk_index,
4190                                       minors[idx*2], minors[idx*2+1])
4191       disk_dev.mode = disk["mode"]
4192       disks.append(disk_dev)
4193   elif template_name == constants.DT_FILE:
4194     if len(secondary_nodes) != 0:
4195       raise errors.ProgrammerError("Wrong template configuration")
4196
4197     for idx, disk in enumerate(disk_info):
4198       disk_index = idx + base_index
4199       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4200                               iv_name="disk/%d" % disk_index,
4201                               logical_id=(file_driver,
4202                                           "%s/disk%d" % (file_storage_dir,
4203                                                          disk_index)),
4204                               mode=disk["mode"])
4205       disks.append(disk_dev)
4206   else:
4207     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4208   return disks
4209
4210
4211 def _GetInstanceInfoText(instance):
4212   """Compute that text that should be added to the disk's metadata.
4213
4214   """
4215   return "originstname+%s" % instance.name
4216
4217
4218 def _CreateDisks(lu, instance):
4219   """Create all disks for an instance.
4220
4221   This abstracts away some work from AddInstance.
4222
4223   @type lu: L{LogicalUnit}
4224   @param lu: the logical unit on whose behalf we execute
4225   @type instance: L{objects.Instance}
4226   @param instance: the instance whose disks we should create
4227   @rtype: boolean
4228   @return: the success of the creation
4229
4230   """
4231   info = _GetInstanceInfoText(instance)
4232   pnode = instance.primary_node
4233
4234   if instance.disk_template == constants.DT_FILE:
4235     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4236     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4237
4238     result.Raise("Failed to create directory '%s' on"
4239                  " node %s: %s" % (file_storage_dir, pnode))
4240
4241   # Note: this needs to be kept in sync with adding of disks in
4242   # LUSetInstanceParams
4243   for device in instance.disks:
4244     logging.info("Creating volume %s for instance %s",
4245                  device.iv_name, instance.name)
4246     #HARDCODE
4247     for node in instance.all_nodes:
4248       f_create = node == pnode
4249       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4250
4251
4252 def _RemoveDisks(lu, instance):
4253   """Remove all disks for an instance.
4254
4255   This abstracts away some work from `AddInstance()` and
4256   `RemoveInstance()`. Note that in case some of the devices couldn't
4257   be removed, the removal will continue with the other ones (compare
4258   with `_CreateDisks()`).
4259
4260   @type lu: L{LogicalUnit}
4261   @param lu: the logical unit on whose behalf we execute
4262   @type instance: L{objects.Instance}
4263   @param instance: the instance whose disks we should remove
4264   @rtype: boolean
4265   @return: the success of the removal
4266
4267   """
4268   logging.info("Removing block devices for instance %s", instance.name)
4269
4270   all_result = True
4271   for device in instance.disks:
4272     for node, disk in device.ComputeNodeTree(instance.primary_node):
4273       lu.cfg.SetDiskID(disk, node)
4274       msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
4275       if msg:
4276         lu.LogWarning("Could not remove block device %s on node %s,"
4277                       " continuing anyway: %s", device.iv_name, node, msg)
4278         all_result = False
4279
4280   if instance.disk_template == constants.DT_FILE:
4281     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4282     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4283                                                  file_storage_dir)
4284     msg = result.fail_msg
4285     if msg:
4286       lu.LogWarning("Could not remove directory '%s' on node %s: %s",
4287                     file_storage_dir, instance.primary_node, msg)
4288       all_result = False
4289
4290   return all_result
4291
4292
4293 def _ComputeDiskSize(disk_template, disks):
4294   """Compute disk size requirements in the volume group
4295
4296   """
4297   # Required free disk space as a function of disk and swap space
4298   req_size_dict = {
4299     constants.DT_DISKLESS: None,
4300     constants.DT_PLAIN: sum(d["size"] for d in disks),
4301     # 128 MB are added for drbd metadata for each disk
4302     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4303     constants.DT_FILE: None,
4304   }
4305
4306   if disk_template not in req_size_dict:
4307     raise errors.ProgrammerError("Disk template '%s' size requirement"
4308                                  " is unknown" %  disk_template)
4309
4310   return req_size_dict[disk_template]
4311
4312
4313 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4314   """Hypervisor parameter validation.
4315
4316   This function abstract the hypervisor parameter validation to be
4317   used in both instance create and instance modify.
4318
4319   @type lu: L{LogicalUnit}
4320   @param lu: the logical unit for which we check
4321   @type nodenames: list
4322   @param nodenames: the list of nodes on which we should check
4323   @type hvname: string
4324   @param hvname: the name of the hypervisor we should use
4325   @type hvparams: dict
4326   @param hvparams: the parameters which we need to check
4327   @raise errors.OpPrereqError: if the parameters are not valid
4328
4329   """
4330   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4331                                                   hvname,
4332                                                   hvparams)
4333   for node in nodenames:
4334     info = hvinfo[node]
4335     if info.offline:
4336       continue
4337     info.Raise("Hypervisor parameter validation failed on node %s" % node)
4338
4339
4340 class LUCreateInstance(LogicalUnit):
4341   """Create an instance.
4342
4343   """
4344   HPATH = "instance-add"
4345   HTYPE = constants.HTYPE_INSTANCE
4346   _OP_REQP = ["instance_name", "disks", "disk_template",
4347               "mode", "start",
4348               "wait_for_sync", "ip_check", "nics",
4349               "hvparams", "beparams"]
4350   REQ_BGL = False
4351
4352   def _ExpandNode(self, node):
4353     """Expands and checks one node name.
4354
4355     """
4356     node_full = self.cfg.ExpandNodeName(node)
4357     if node_full is None:
4358       raise errors.OpPrereqError("Unknown node %s" % node)
4359     return node_full
4360
4361   def ExpandNames(self):
4362     """ExpandNames for CreateInstance.
4363
4364     Figure out the right locks for instance creation.
4365
4366     """
4367     self.needed_locks = {}
4368
4369     # set optional parameters to none if they don't exist
4370     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4371       if not hasattr(self.op, attr):
4372         setattr(self.op, attr, None)
4373
4374     # cheap checks, mostly valid constants given
4375
4376     # verify creation mode
4377     if self.op.mode not in (constants.INSTANCE_CREATE,
4378                             constants.INSTANCE_IMPORT):
4379       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4380                                  self.op.mode)
4381
4382     # disk template and mirror node verification
4383     if self.op.disk_template not in constants.DISK_TEMPLATES:
4384       raise errors.OpPrereqError("Invalid disk template name")
4385
4386     if self.op.hypervisor is None:
4387       self.op.hypervisor = self.cfg.GetHypervisorType()
4388
4389     cluster = self.cfg.GetClusterInfo()
4390     enabled_hvs = cluster.enabled_hypervisors
4391     if self.op.hypervisor not in enabled_hvs:
4392       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4393                                  " cluster (%s)" % (self.op.hypervisor,
4394                                   ",".join(enabled_hvs)))
4395
4396     # check hypervisor parameter syntax (locally)
4397     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4398     filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
4399                                   self.op.hvparams)
4400     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4401     hv_type.CheckParameterSyntax(filled_hvp)
4402
4403     # fill and remember the beparams dict
4404     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4405     self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
4406                                     self.op.beparams)
4407
4408     #### instance parameters check
4409
4410     # instance name verification
4411     hostname1 = utils.HostInfo(self.op.instance_name)
4412     self.op.instance_name = instance_name = hostname1.name
4413
4414     # this is just a preventive check, but someone might still add this
4415     # instance in the meantime, and creation will fail at lock-add time
4416     if instance_name in self.cfg.GetInstanceList():
4417       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4418                                  instance_name)
4419
4420     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4421
4422     # NIC buildup
4423     self.nics = []
4424     for idx, nic in enumerate(self.op.nics):
4425       nic_mode_req = nic.get("mode", None)
4426       nic_mode = nic_mode_req
4427       if nic_mode is None:
4428         nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
4429
4430       # in routed mode, for the first nic, the default ip is 'auto'
4431       if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
4432         default_ip_mode = constants.VALUE_AUTO
4433       else:
4434         default_ip_mode = constants.VALUE_NONE
4435
4436       # ip validity checks
4437       ip = nic.get("ip", default_ip_mode)
4438       if ip is None or ip.lower() == constants.VALUE_NONE:
4439         nic_ip = None
4440       elif ip.lower() == constants.VALUE_AUTO:
4441         nic_ip = hostname1.ip
4442       else:
4443         if not utils.IsValidIP(ip):
4444           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4445                                      " like a valid IP" % ip)
4446         nic_ip = ip
4447
4448       # TODO: check the ip for uniqueness !!
4449       if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
4450         raise errors.OpPrereqError("Routed nic mode requires an ip address")
4451
4452       # MAC address verification
4453       mac = nic.get("mac", constants.VALUE_AUTO)
4454       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4455         if not utils.IsValidMac(mac.lower()):
4456           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4457                                      mac)
4458       # bridge verification
4459       bridge = nic.get("bridge", None)
4460       link = nic.get("link", None)
4461       if bridge and link:
4462         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
4463       elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
4464         raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
4465       elif bridge:
4466         link = bridge
4467
4468       nicparams = {}
4469       if nic_mode_req:
4470         nicparams[constants.NIC_MODE] = nic_mode_req
4471       if link:
4472         nicparams[constants.NIC_LINK] = link
4473
4474       check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4475                                       nicparams)
4476       objects.NIC.CheckParameterSyntax(check_params)
4477       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
4478
4479     # disk checks/pre-build
4480     self.disks = []
4481     for disk in self.op.disks:
4482       mode = disk.get("mode", constants.DISK_RDWR)
4483       if mode not in constants.DISK_ACCESS_SET:
4484         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4485                                    mode)
4486       size = disk.get("size", None)
4487       if size is None:
4488         raise errors.OpPrereqError("Missing disk size")
4489       try:
4490         size = int(size)
4491       except ValueError:
4492         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4493       self.disks.append({"size": size, "mode": mode})
4494
4495     # used in CheckPrereq for ip ping check
4496     self.check_ip = hostname1.ip
4497
4498     # file storage checks
4499     if (self.op.file_driver and
4500         not self.op.file_driver in constants.FILE_DRIVER):
4501       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4502                                  self.op.file_driver)
4503
4504     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4505       raise errors.OpPrereqError("File storage directory path not absolute")
4506
4507     ### Node/iallocator related checks
4508     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4509       raise errors.OpPrereqError("One and only one of iallocator and primary"
4510                                  " node must be given")
4511
4512     if self.op.iallocator:
4513       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4514     else:
4515       self.op.pnode = self._ExpandNode(self.op.pnode)
4516       nodelist = [self.op.pnode]
4517       if self.op.snode is not None:
4518         self.op.snode = self._ExpandNode(self.op.snode)
4519         nodelist.append(self.op.snode)
4520       self.needed_locks[locking.LEVEL_NODE] = nodelist
4521
4522     # in case of import lock the source node too
4523     if self.op.mode == constants.INSTANCE_IMPORT:
4524       src_node = getattr(self.op, "src_node", None)
4525       src_path = getattr(self.op, "src_path", None)
4526
4527       if src_path is None:
4528         self.op.src_path = src_path = self.op.instance_name
4529
4530       if src_node is None:
4531         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4532         self.op.src_node = None
4533         if os.path.isabs(src_path):
4534           raise errors.OpPrereqError("Importing an instance from an absolute"
4535                                      " path requires a source node option.")
4536       else:
4537         self.op.src_node = src_node = self._ExpandNode(src_node)
4538         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4539           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4540         if not os.path.isabs(src_path):
4541           self.op.src_path = src_path = \
4542             os.path.join(constants.EXPORT_DIR, src_path)
4543
4544     else: # INSTANCE_CREATE
4545       if getattr(self.op, "os_type", None) is None:
4546         raise errors.OpPrereqError("No guest OS specified")
4547
4548   def _RunAllocator(self):
4549     """Run the allocator based on input opcode.
4550
4551     """
4552     nics = [n.ToDict() for n in self.nics]
4553     ial = IAllocator(self,
4554                      mode=constants.IALLOCATOR_MODE_ALLOC,
4555                      name=self.op.instance_name,
4556                      disk_template=self.op.disk_template,
4557                      tags=[],
4558                      os=self.op.os_type,
4559                      vcpus=self.be_full[constants.BE_VCPUS],
4560                      mem_size=self.be_full[constants.BE_MEMORY],
4561                      disks=self.disks,
4562                      nics=nics,
4563                      hypervisor=self.op.hypervisor,
4564                      )
4565
4566     ial.Run(self.op.iallocator)
4567
4568     if not ial.success:
4569       raise errors.OpPrereqError("Can't compute nodes using"
4570                                  " iallocator '%s': %s" % (self.op.iallocator,
4571                                                            ial.info))
4572     if len(ial.nodes) != ial.required_nodes:
4573       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4574                                  " of nodes (%s), required %s" %
4575                                  (self.op.iallocator, len(ial.nodes),
4576                                   ial.required_nodes))
4577     self.op.pnode = ial.nodes[0]
4578     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4579                  self.op.instance_name, self.op.iallocator,
4580                  ", ".join(ial.nodes))
4581     if ial.required_nodes == 2:
4582       self.op.snode = ial.nodes[1]
4583
4584   def BuildHooksEnv(self):
4585     """Build hooks env.
4586
4587     This runs on master, primary and secondary nodes of the instance.
4588
4589     """
4590     env = {
4591       "ADD_MODE": self.op.mode,
4592       }
4593     if self.op.mode == constants.INSTANCE_IMPORT:
4594       env["SRC_NODE"] = self.op.src_node
4595       env["SRC_PATH"] = self.op.src_path
4596       env["SRC_IMAGES"] = self.src_images
4597
4598     env.update(_BuildInstanceHookEnv(
4599       name=self.op.instance_name,
4600       primary_node=self.op.pnode,
4601       secondary_nodes=self.secondaries,
4602       status=self.op.start,
4603       os_type=self.op.os_type,
4604       memory=self.be_full[constants.BE_MEMORY],
4605       vcpus=self.be_full[constants.BE_VCPUS],
4606       nics=_PreBuildNICHooksList(self, self.nics),
4607       disk_template=self.op.disk_template,
4608       disks=[(d["size"], d["mode"]) for d in self.disks],
4609     ))
4610
4611     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4612           self.secondaries)
4613     return env, nl, nl
4614
4615
4616   def CheckPrereq(self):
4617     """Check prerequisites.
4618
4619     """
4620     if (not self.cfg.GetVGName() and
4621         self.op.disk_template not in constants.DTS_NOT_LVM):
4622       raise errors.OpPrereqError("Cluster does not support lvm-based"
4623                                  " instances")
4624
4625     if self.op.mode == constants.INSTANCE_IMPORT:
4626       src_node = self.op.src_node
4627       src_path = self.op.src_path
4628
4629       if src_node is None:
4630         locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
4631         exp_list = self.rpc.call_export_list(locked_nodes)
4632         found = False
4633         for node in exp_list:
4634           if exp_list[node].fail_msg:
4635             continue
4636           if src_path in exp_list[node].payload:
4637             found = True
4638             self.op.src_node = src_node = node
4639             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4640                                                        src_path)
4641             break
4642         if not found:
4643           raise errors.OpPrereqError("No export found for relative path %s" %
4644                                       src_path)
4645
4646       _CheckNodeOnline(self, src_node)
4647       result = self.rpc.call_export_info(src_node, src_path)
4648       result.Raise("No export or invalid export found in dir %s" % src_path)
4649
4650       export_info = objects.SerializableConfigParser.Loads(str(result.payload))
4651       if not export_info.has_section(constants.INISECT_EXP):
4652         raise errors.ProgrammerError("Corrupted export config")
4653
4654       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4655       if (int(ei_version) != constants.EXPORT_VERSION):
4656         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4657                                    (ei_version, constants.EXPORT_VERSION))
4658
4659       # Check that the new instance doesn't have less disks than the export
4660       instance_disks = len(self.disks)
4661       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4662       if instance_disks < export_disks:
4663         raise errors.OpPrereqError("Not enough disks to import."
4664                                    " (instance: %d, export: %d)" %
4665                                    (instance_disks, export_disks))
4666
4667       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4668       disk_images = []
4669       for idx in range(export_disks):
4670         option = 'disk%d_dump' % idx
4671         if export_info.has_option(constants.INISECT_INS, option):
4672           # FIXME: are the old os-es, disk sizes, etc. useful?
4673           export_name = export_info.get(constants.INISECT_INS, option)
4674           image = os.path.join(src_path, export_name)
4675           disk_images.append(image)
4676         else:
4677           disk_images.append(False)
4678
4679       self.src_images = disk_images
4680
4681       old_name = export_info.get(constants.INISECT_INS, 'name')
4682       # FIXME: int() here could throw a ValueError on broken exports
4683       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4684       if self.op.instance_name == old_name:
4685         for idx, nic in enumerate(self.nics):
4686           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4687             nic_mac_ini = 'nic%d_mac' % idx
4688             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4689
4690     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4691     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4692     if self.op.start and not self.op.ip_check:
4693       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4694                                  " adding an instance in start mode")
4695
4696     if self.op.ip_check:
4697       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4698         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4699                                    (self.check_ip, self.op.instance_name))
4700
4701     #### mac address generation
4702     # By generating here the mac address both the allocator and the hooks get
4703     # the real final mac address rather than the 'auto' or 'generate' value.
4704     # There is a race condition between the generation and the instance object
4705     # creation, which means that we know the mac is valid now, but we're not
4706     # sure it will be when we actually add the instance. If things go bad
4707     # adding the instance will abort because of a duplicate mac, and the
4708     # creation job will fail.
4709     for nic in self.nics:
4710       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4711         nic.mac = self.cfg.GenerateMAC()
4712
4713     #### allocator run
4714
4715     if self.op.iallocator is not None:
4716       self._RunAllocator()
4717
4718     #### node related checks
4719
4720     # check primary node
4721     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4722     assert self.pnode is not None, \
4723       "Cannot retrieve locked node %s" % self.op.pnode
4724     if pnode.offline:
4725       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4726                                  pnode.name)
4727     if pnode.drained:
4728       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4729                                  pnode.name)
4730
4731     self.secondaries = []
4732
4733     # mirror node verification
4734     if self.op.disk_template in constants.DTS_NET_MIRROR:
4735       if self.op.snode is None:
4736         raise errors.OpPrereqError("The networked disk templates need"
4737                                    " a mirror node")
4738       if self.op.snode == pnode.name:
4739         raise errors.OpPrereqError("The secondary node cannot be"
4740                                    " the primary node.")
4741       _CheckNodeOnline(self, self.op.snode)
4742       _CheckNodeNotDrained(self, self.op.snode)
4743       self.secondaries.append(self.op.snode)
4744
4745     nodenames = [pnode.name] + self.secondaries
4746
4747     req_size = _ComputeDiskSize(self.op.disk_template,
4748                                 self.disks)
4749
4750     # Check lv size requirements
4751     if req_size is not None:
4752       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4753                                          self.op.hypervisor)
4754       for node in nodenames:
4755         info = nodeinfo[node]
4756         info.Raise("Cannot get current information from node %s" % node)
4757         info = info.payload
4758         vg_free = info.get('vg_free', None)
4759         if not isinstance(vg_free, int):
4760           raise errors.OpPrereqError("Can't compute free disk space on"
4761                                      " node %s" % node)
4762         if req_size > vg_free:
4763           raise errors.OpPrereqError("Not enough disk space on target node %s."
4764                                      " %d MB available, %d MB required" %
4765                                      (node, vg_free, req_size))
4766
4767     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4768
4769     # os verification
4770     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4771     result.Raise("OS '%s' not in supported os list for primary node %s" %
4772                  (self.op.os_type, pnode.name), prereq=True)
4773
4774     _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
4775
4776     # memory check on primary node
4777     if self.op.start:
4778       _CheckNodeFreeMemory(self, self.pnode.name,
4779                            "creating instance %s" % self.op.instance_name,
4780                            self.be_full[constants.BE_MEMORY],
4781                            self.op.hypervisor)
4782
4783   def Exec(self, feedback_fn):
4784     """Create and add the instance to the cluster.
4785
4786     """
4787     instance = self.op.instance_name
4788     pnode_name = self.pnode.name
4789
4790     ht_kind = self.op.hypervisor
4791     if ht_kind in constants.HTS_REQ_PORT:
4792       network_port = self.cfg.AllocatePort()
4793     else:
4794       network_port = None
4795
4796     ##if self.op.vnc_bind_address is None:
4797     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4798
4799     # this is needed because os.path.join does not accept None arguments
4800     if self.op.file_storage_dir is None:
4801       string_file_storage_dir = ""
4802     else:
4803       string_file_storage_dir = self.op.file_storage_dir
4804
4805     # build the full file storage dir path
4806     file_storage_dir = os.path.normpath(os.path.join(
4807                                         self.cfg.GetFileStorageDir(),
4808                                         string_file_storage_dir, instance))
4809
4810
4811     disks = _GenerateDiskTemplate(self,
4812                                   self.op.disk_template,
4813                                   instance, pnode_name,
4814                                   self.secondaries,
4815                                   self.disks,
4816                                   file_storage_dir,
4817                                   self.op.file_driver,
4818                                   0)
4819
4820     iobj = objects.Instance(name=instance, os=self.op.os_type,
4821                             primary_node=pnode_name,
4822                             nics=self.nics, disks=disks,
4823                             disk_template=self.op.disk_template,
4824                             admin_up=False,
4825                             network_port=network_port,
4826                             beparams=self.op.beparams,
4827                             hvparams=self.op.hvparams,
4828                             hypervisor=self.op.hypervisor,
4829                             )
4830
4831     feedback_fn("* creating instance disks...")
4832     try:
4833       _CreateDisks(self, iobj)
4834     except errors.OpExecError:
4835       self.LogWarning("Device creation failed, reverting...")
4836       try:
4837         _RemoveDisks(self, iobj)
4838       finally:
4839         self.cfg.ReleaseDRBDMinors(instance)
4840         raise
4841
4842     feedback_fn("adding instance %s to cluster config" % instance)
4843
4844     self.cfg.AddInstance(iobj)
4845     # Declare that we don't want to remove the instance lock anymore, as we've
4846     # added the instance to the config
4847     del self.remove_locks[locking.LEVEL_INSTANCE]
4848     # Unlock all the nodes
4849     if self.op.mode == constants.INSTANCE_IMPORT:
4850       nodes_keep = [self.op.src_node]
4851       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4852                        if node != self.op.src_node]
4853       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4854       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4855     else:
4856       self.context.glm.release(locking.LEVEL_NODE)
4857       del self.acquired_locks[locking.LEVEL_NODE]
4858
4859     if self.op.wait_for_sync:
4860       disk_abort = not _WaitForSync(self, iobj)
4861     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4862       # make sure the disks are not degraded (still sync-ing is ok)
4863       time.sleep(15)
4864       feedback_fn("* checking mirrors status")
4865       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4866     else:
4867       disk_abort = False
4868
4869     if disk_abort:
4870       _RemoveDisks(self, iobj)
4871       self.cfg.RemoveInstance(iobj.name)
4872       # Make sure the instance lock gets removed
4873       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4874       raise errors.OpExecError("There are some degraded disks for"
4875                                " this instance")
4876
4877     feedback_fn("creating os for instance %s on node %s" %
4878                 (instance, pnode_name))
4879
4880     if iobj.disk_template != constants.DT_DISKLESS:
4881       if self.op.mode == constants.INSTANCE_CREATE:
4882         feedback_fn("* running the instance OS create scripts...")
4883         result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
4884         result.Raise("Could not add os for instance %s"
4885                      " on node %s" % (instance, pnode_name))
4886
4887       elif self.op.mode == constants.INSTANCE_IMPORT:
4888         feedback_fn("* running the instance OS import scripts...")
4889         src_node = self.op.src_node
4890         src_images = self.src_images
4891         cluster_name = self.cfg.GetClusterName()
4892         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4893                                                          src_node, src_images,
4894                                                          cluster_name)
4895         msg = import_result.fail_msg
4896         if msg:
4897           self.LogWarning("Error while importing the disk images for instance"
4898                           " %s on node %s: %s" % (instance, pnode_name, msg))
4899       else:
4900         # also checked in the prereq part
4901         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4902                                      % self.op.mode)
4903
4904     if self.op.start:
4905       iobj.admin_up = True
4906       self.cfg.Update(iobj)
4907       logging.info("Starting instance %s on node %s", instance, pnode_name)
4908       feedback_fn("* starting instance...")
4909       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4910       result.Raise("Could not start instance")
4911
4912
4913 class LUConnectConsole(NoHooksLU):
4914   """Connect to an instance's console.
4915
4916   This is somewhat special in that it returns the command line that
4917   you need to run on the master node in order to connect to the
4918   console.
4919
4920   """
4921   _OP_REQP = ["instance_name"]
4922   REQ_BGL = False
4923
4924   def ExpandNames(self):
4925     self._ExpandAndLockInstance()
4926
4927   def CheckPrereq(self):
4928     """Check prerequisites.
4929
4930     This checks that the instance is in the cluster.
4931
4932     """
4933     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4934     assert self.instance is not None, \
4935       "Cannot retrieve locked instance %s" % self.op.instance_name
4936     _CheckNodeOnline(self, self.instance.primary_node)
4937
4938   def Exec(self, feedback_fn):
4939     """Connect to the console of an instance
4940
4941     """
4942     instance = self.instance
4943     node = instance.primary_node
4944
4945     node_insts = self.rpc.call_instance_list([node],
4946                                              [instance.hypervisor])[node]
4947     node_insts.Raise("Can't get node information from %s" % node)
4948
4949     if instance.name not in node_insts.payload:
4950       raise errors.OpExecError("Instance %s is not running." % instance.name)
4951
4952     logging.debug("Connecting to console of %s on %s", instance.name, node)
4953
4954     hyper = hypervisor.GetHypervisor(instance.hypervisor)
4955     cluster = self.cfg.GetClusterInfo()
4956     # beparams and hvparams are passed separately, to avoid editing the
4957     # instance and then saving the defaults in the instance itself.
4958     hvparams = cluster.FillHV(instance)
4959     beparams = cluster.FillBE(instance)
4960     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
4961
4962     # build ssh cmdline
4963     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4964
4965
4966 class LUReplaceDisks(LogicalUnit):
4967   """Replace the disks of an instance.
4968
4969   """
4970   HPATH = "mirrors-replace"
4971   HTYPE = constants.HTYPE_INSTANCE
4972   _OP_REQP = ["instance_name", "mode", "disks"]
4973   REQ_BGL = False
4974
4975   def CheckArguments(self):
4976     if not hasattr(self.op, "remote_node"):
4977       self.op.remote_node = None
4978     if not hasattr(self.op, "iallocator"):
4979       self.op.iallocator = None
4980
4981     # check for valid parameter combination
4982     cnt = [self.op.remote_node, self.op.iallocator].count(None)
4983     if self.op.mode == constants.REPLACE_DISK_CHG:
4984       if cnt == 2:
4985         raise errors.OpPrereqError("When changing the secondary either an"
4986                                    " iallocator script must be used or the"
4987                                    " new node given")
4988       elif cnt == 0:
4989         raise errors.OpPrereqError("Give either the iallocator or the new"
4990                                    " secondary, not both")
4991     else: # not replacing the secondary
4992       if cnt != 2:
4993         raise errors.OpPrereqError("The iallocator and new node options can"
4994                                    " be used only when changing the"
4995                                    " secondary node")
4996
4997   def ExpandNames(self):
4998     self._ExpandAndLockInstance()
4999
5000     if self.op.iallocator is not None:
5001       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5002     elif self.op.remote_node is not None:
5003       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5004       if remote_node is None:
5005         raise errors.OpPrereqError("Node '%s' not known" %
5006                                    self.op.remote_node)
5007       self.op.remote_node = remote_node
5008       # Warning: do not remove the locking of the new secondary here
5009       # unless DRBD8.AddChildren is changed to work in parallel;
5010       # currently it doesn't since parallel invocations of
5011       # FindUnusedMinor will conflict
5012       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5013       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5014     else:
5015       self.needed_locks[locking.LEVEL_NODE] = []
5016       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5017
5018   def DeclareLocks(self, level):
5019     # If we're not already locking all nodes in the set we have to declare the
5020     # instance's primary/secondary nodes.
5021     if (level == locking.LEVEL_NODE and
5022         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5023       self._LockInstancesNodes()
5024
5025   def _RunAllocator(self):
5026     """Compute a new secondary node using an IAllocator.
5027
5028     """
5029     ial = IAllocator(self,
5030                      mode=constants.IALLOCATOR_MODE_RELOC,
5031                      name=self.op.instance_name,
5032                      relocate_from=[self.sec_node])
5033
5034     ial.Run(self.op.iallocator)
5035
5036     if not ial.success:
5037       raise errors.OpPrereqError("Can't compute nodes using"
5038                                  " iallocator '%s': %s" % (self.op.iallocator,
5039                                                            ial.info))
5040     if len(ial.nodes) != ial.required_nodes:
5041       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5042                                  " of nodes (%s), required %s" %
5043                                  (len(ial.nodes), ial.required_nodes))
5044     self.op.remote_node = ial.nodes[0]
5045     self.LogInfo("Selected new secondary for the instance: %s",
5046                  self.op.remote_node)
5047
5048   def BuildHooksEnv(self):
5049     """Build hooks env.
5050
5051     This runs on the master, the primary and all the secondaries.
5052
5053     """
5054     env = {
5055       "MODE": self.op.mode,
5056       "NEW_SECONDARY": self.op.remote_node,
5057       "OLD_SECONDARY": self.instance.secondary_nodes[0],
5058       }
5059     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5060     nl = [
5061       self.cfg.GetMasterNode(),
5062       self.instance.primary_node,
5063       ]
5064     if self.op.remote_node is not None:
5065       nl.append(self.op.remote_node)
5066     return env, nl, nl
5067
5068   def CheckPrereq(self):
5069     """Check prerequisites.
5070
5071     This checks that the instance is in the cluster.
5072
5073     """
5074     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5075     assert instance is not None, \
5076       "Cannot retrieve locked instance %s" % self.op.instance_name
5077     self.instance = instance
5078
5079     if instance.disk_template != constants.DT_DRBD8:
5080       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5081                                  " instances")
5082
5083     if len(instance.secondary_nodes) != 1:
5084       raise errors.OpPrereqError("The instance has a strange layout,"
5085                                  " expected one secondary but found %d" %
5086                                  len(instance.secondary_nodes))
5087
5088     self.sec_node = instance.secondary_nodes[0]
5089
5090     if self.op.iallocator is not None:
5091       self._RunAllocator()
5092
5093     remote_node = self.op.remote_node
5094     if remote_node is not None:
5095       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5096       assert self.remote_node_info is not None, \
5097         "Cannot retrieve locked node %s" % remote_node
5098     else:
5099       self.remote_node_info = None
5100     if remote_node == instance.primary_node:
5101       raise errors.OpPrereqError("The specified node is the primary node of"
5102                                  " the instance.")
5103     elif remote_node == self.sec_node:
5104       raise errors.OpPrereqError("The specified node is already the"
5105                                  " secondary node of the instance.")
5106
5107     if self.op.mode == constants.REPLACE_DISK_PRI:
5108       n1 = self.tgt_node = instance.primary_node
5109       n2 = self.oth_node = self.sec_node
5110     elif self.op.mode == constants.REPLACE_DISK_SEC:
5111       n1 = self.tgt_node = self.sec_node
5112       n2 = self.oth_node = instance.primary_node
5113     elif self.op.mode == constants.REPLACE_DISK_CHG:
5114       n1 = self.new_node = remote_node
5115       n2 = self.oth_node = instance.primary_node
5116       self.tgt_node = self.sec_node
5117       _CheckNodeNotDrained(self, remote_node)
5118     else:
5119       raise errors.ProgrammerError("Unhandled disk replace mode")
5120
5121     _CheckNodeOnline(self, n1)
5122     _CheckNodeOnline(self, n2)
5123
5124     if not self.op.disks:
5125       self.op.disks = range(len(instance.disks))
5126
5127     for disk_idx in self.op.disks:
5128       instance.FindDisk(disk_idx)
5129
5130   def _ExecD8DiskOnly(self, feedback_fn):
5131     """Replace a disk on the primary or secondary for dbrd8.
5132
5133     The algorithm for replace is quite complicated:
5134
5135       1. for each disk to be replaced:
5136
5137         1. create new LVs on the target node with unique names
5138         1. detach old LVs from the drbd device
5139         1. rename old LVs to name_replaced.<time_t>
5140         1. rename new LVs to old LVs
5141         1. attach the new LVs (with the old names now) to the drbd device
5142
5143       1. wait for sync across all devices
5144
5145       1. for each modified disk:
5146
5147         1. remove old LVs (which have the name name_replaces.<time_t>)
5148
5149     Failures are not very well handled.
5150
5151     """
5152     steps_total = 6
5153     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5154     instance = self.instance
5155     iv_names = {}
5156     vgname = self.cfg.GetVGName()
5157     # start of work
5158     cfg = self.cfg
5159     tgt_node = self.tgt_node
5160     oth_node = self.oth_node
5161
5162     # Step: check device activation
5163     self.proc.LogStep(1, steps_total, "check device existence")
5164     info("checking volume groups")
5165     my_vg = cfg.GetVGName()
5166     results = self.rpc.call_vg_list([oth_node, tgt_node])
5167     if not results:
5168       raise errors.OpExecError("Can't list volume groups on the nodes")
5169     for node in oth_node, tgt_node:
5170       res = results[node]
5171       res.Raise("Error checking node %s" % node)
5172       if my_vg not in res.payload:
5173         raise errors.OpExecError("Volume group '%s' not found on %s" %
5174                                  (my_vg, node))
5175     for idx, dev in enumerate(instance.disks):
5176       if idx not in self.op.disks:
5177         continue
5178       for node in tgt_node, oth_node:
5179         info("checking disk/%d on %s" % (idx, node))
5180         cfg.SetDiskID(dev, node)
5181         result = self.rpc.call_blockdev_find(node, dev)
5182         msg = result.fail_msg
5183         if not msg and not result.payload:
5184           msg = "disk not found"
5185         if msg:
5186           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5187                                    (idx, node, msg))
5188
5189     # Step: check other node consistency
5190     self.proc.LogStep(2, steps_total, "check peer consistency")
5191     for idx, dev in enumerate(instance.disks):
5192       if idx not in self.op.disks:
5193         continue
5194       info("checking disk/%d consistency on %s" % (idx, oth_node))
5195       if not _CheckDiskConsistency(self, dev, oth_node,
5196                                    oth_node==instance.primary_node):
5197         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5198                                  " to replace disks on this node (%s)" %
5199                                  (oth_node, tgt_node))
5200
5201     # Step: create new storage
5202     self.proc.LogStep(3, steps_total, "allocate new storage")
5203     for idx, dev in enumerate(instance.disks):
5204       if idx not in self.op.disks:
5205         continue
5206       size = dev.size
5207       cfg.SetDiskID(dev, tgt_node)
5208       lv_names = [".disk%d_%s" % (idx, suf)
5209                   for suf in ["data", "meta"]]
5210       names = _GenerateUniqueNames(self, lv_names)
5211       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5212                              logical_id=(vgname, names[0]))
5213       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5214                              logical_id=(vgname, names[1]))
5215       new_lvs = [lv_data, lv_meta]
5216       old_lvs = dev.children
5217       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5218       info("creating new local storage on %s for %s" %
5219            (tgt_node, dev.iv_name))
5220       # we pass force_create=True to force the LVM creation
5221       for new_lv in new_lvs:
5222         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5223                         _GetInstanceInfoText(instance), False)
5224
5225     # Step: for each lv, detach+rename*2+attach
5226     self.proc.LogStep(4, steps_total, "change drbd configuration")
5227     for dev, old_lvs, new_lvs in iv_names.itervalues():
5228       info("detaching %s drbd from local storage" % dev.iv_name)
5229       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5230       result.Raise("Can't detach drbd from local storage on node"
5231                    " %s for device %s" % (tgt_node, dev.iv_name))
5232       #dev.children = []
5233       #cfg.Update(instance)
5234
5235       # ok, we created the new LVs, so now we know we have the needed
5236       # storage; as such, we proceed on the target node to rename
5237       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5238       # using the assumption that logical_id == physical_id (which in
5239       # turn is the unique_id on that node)
5240
5241       # FIXME(iustin): use a better name for the replaced LVs
5242       temp_suffix = int(time.time())
5243       ren_fn = lambda d, suff: (d.physical_id[0],
5244                                 d.physical_id[1] + "_replaced-%s" % suff)
5245       # build the rename list based on what LVs exist on the node
5246       rlist = []
5247       for to_ren in old_lvs:
5248         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5249         if not result.fail_msg and result.payload:
5250           # device exists
5251           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5252
5253       info("renaming the old LVs on the target node")
5254       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5255       result.Raise("Can't rename old LVs on node %s" % tgt_node)
5256       # now we rename the new LVs to the old LVs
5257       info("renaming the new LVs on the target node")
5258       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5259       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5260       result.Raise("Can't rename new LVs on node %s" % tgt_node)
5261
5262       for old, new in zip(old_lvs, new_lvs):
5263         new.logical_id = old.logical_id
5264         cfg.SetDiskID(new, tgt_node)
5265
5266       for disk in old_lvs:
5267         disk.logical_id = ren_fn(disk, temp_suffix)
5268         cfg.SetDiskID(disk, tgt_node)
5269
5270       # now that the new lvs have the old name, we can add them to the device
5271       info("adding new mirror component on %s" % tgt_node)
5272       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5273       msg = result.fail_msg
5274       if msg:
5275         for new_lv in new_lvs:
5276           msg2 = self.rpc.call_blockdev_remove(tgt_node, new_lv).fail_msg
5277           if msg2:
5278             warning("Can't rollback device %s: %s", dev, msg2,
5279                     hint="cleanup manually the unused logical volumes")
5280         raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
5281
5282       dev.children = new_lvs
5283       cfg.Update(instance)
5284
5285     # Step: wait for sync
5286
5287     # this can fail as the old devices are degraded and _WaitForSync
5288     # does a combined result over all disks, so we don't check its
5289     # return value
5290     self.proc.LogStep(5, steps_total, "sync devices")
5291     _WaitForSync(self, instance, unlock=True)
5292
5293     # so check manually all the devices
5294     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5295       cfg.SetDiskID(dev, instance.primary_node)
5296       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5297       msg = result.fail_msg
5298       if not msg and not result.payload:
5299         msg = "disk not found"
5300       if msg:
5301         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5302                                  (name, msg))
5303       if result.payload[5]:
5304         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5305
5306     # Step: remove old storage
5307     self.proc.LogStep(6, steps_total, "removing old storage")
5308     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5309       info("remove logical volumes for %s" % name)
5310       for lv in old_lvs:
5311         cfg.SetDiskID(lv, tgt_node)
5312         msg = self.rpc.call_blockdev_remove(tgt_node, lv).fail_msg
5313         if msg:
5314           warning("Can't remove old LV: %s" % msg,
5315                   hint="manually remove unused LVs")
5316           continue
5317
5318   def _ExecD8Secondary(self, feedback_fn):
5319     """Replace the secondary node for drbd8.
5320
5321     The algorithm for replace is quite complicated:
5322       - for all disks of the instance:
5323         - create new LVs on the new node with same names
5324         - shutdown the drbd device on the old secondary
5325         - disconnect the drbd network on the primary
5326         - create the drbd device on the new secondary
5327         - network attach the drbd on the primary, using an artifice:
5328           the drbd code for Attach() will connect to the network if it
5329           finds a device which is connected to the good local disks but
5330           not network enabled
5331       - wait for sync across all devices
5332       - remove all disks from the old secondary
5333
5334     Failures are not very well handled.
5335
5336     """
5337     steps_total = 6
5338     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5339     instance = self.instance
5340     iv_names = {}
5341     # start of work
5342     cfg = self.cfg
5343     old_node = self.tgt_node
5344     new_node = self.new_node
5345     pri_node = instance.primary_node
5346     nodes_ip = {
5347       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5348       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5349       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5350       }
5351
5352     # Step: check device activation
5353     self.proc.LogStep(1, steps_total, "check device existence")
5354     info("checking volume groups")
5355     my_vg = cfg.GetVGName()
5356     results = self.rpc.call_vg_list([pri_node, new_node])
5357     for node in pri_node, new_node:
5358       res = results[node]
5359       res.Raise("Error checking node %s" % node)
5360       if my_vg not in res.payload:
5361         raise errors.OpExecError("Volume group '%s' not found on %s" %
5362                                  (my_vg, node))
5363     for idx, dev in enumerate(instance.disks):
5364       if idx not in self.op.disks:
5365         continue
5366       info("checking disk/%d on %s" % (idx, pri_node))
5367       cfg.SetDiskID(dev, pri_node)
5368       result = self.rpc.call_blockdev_find(pri_node, dev)
5369       msg = result.fail_msg
5370       if not msg and not result.payload:
5371         msg = "disk not found"
5372       if msg:
5373         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5374                                  (idx, pri_node, msg))
5375
5376     # Step: check other node consistency
5377     self.proc.LogStep(2, steps_total, "check peer consistency")
5378     for idx, dev in enumerate(instance.disks):
5379       if idx not in self.op.disks:
5380         continue
5381       info("checking disk/%d consistency on %s" % (idx, pri_node))
5382       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5383         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5384                                  " unsafe to replace the secondary" %
5385                                  pri_node)
5386
5387     # Step: create new storage
5388     self.proc.LogStep(3, steps_total, "allocate new storage")
5389     for idx, dev in enumerate(instance.disks):
5390       info("adding new local storage on %s for disk/%d" %
5391            (new_node, idx))
5392       # we pass force_create=True to force LVM creation
5393       for new_lv in dev.children:
5394         _CreateBlockDev(self, new_node, instance, new_lv, True,
5395                         _GetInstanceInfoText(instance), False)
5396
5397     # Step 4: dbrd minors and drbd setups changes
5398     # after this, we must manually remove the drbd minors on both the
5399     # error and the success paths
5400     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5401                                    instance.name)
5402     logging.debug("Allocated minors %s" % (minors,))
5403     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5404     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5405       size = dev.size
5406       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5407       # create new devices on new_node; note that we create two IDs:
5408       # one without port, so the drbd will be activated without
5409       # networking information on the new node at this stage, and one
5410       # with network, for the latter activation in step 4
5411       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5412       if pri_node == o_node1:
5413         p_minor = o_minor1
5414       else:
5415         p_minor = o_minor2
5416
5417       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5418       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5419
5420       iv_names[idx] = (dev, dev.children, new_net_id)
5421       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5422                     new_net_id)
5423       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5424                               logical_id=new_alone_id,
5425                               children=dev.children)
5426       try:
5427         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5428                               _GetInstanceInfoText(instance), False)
5429       except errors.GenericError:
5430         self.cfg.ReleaseDRBDMinors(instance.name)
5431         raise
5432
5433     for idx, dev in enumerate(instance.disks):
5434       # we have new devices, shutdown the drbd on the old secondary
5435       info("shutting down drbd for disk/%d on old node" % idx)
5436       cfg.SetDiskID(dev, old_node)
5437       msg = self.rpc.call_blockdev_shutdown(old_node, dev).fail_msg
5438       if msg:
5439         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5440                 (idx, msg),
5441                 hint="Please cleanup this device manually as soon as possible")
5442
5443     info("detaching primary drbds from the network (=> standalone)")
5444     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5445                                                instance.disks)[pri_node]
5446
5447     msg = result.fail_msg
5448     if msg:
5449       # detaches didn't succeed (unlikely)
5450       self.cfg.ReleaseDRBDMinors(instance.name)
5451       raise errors.OpExecError("Can't detach the disks from the network on"
5452                                " old node: %s" % (msg,))
5453
5454     # if we managed to detach at least one, we update all the disks of
5455     # the instance to point to the new secondary
5456     info("updating instance configuration")
5457     for dev, _, new_logical_id in iv_names.itervalues():
5458       dev.logical_id = new_logical_id
5459       cfg.SetDiskID(dev, pri_node)
5460     cfg.Update(instance)
5461
5462     # and now perform the drbd attach
5463     info("attaching primary drbds to new secondary (standalone => connected)")
5464     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5465                                            instance.disks, instance.name,
5466                                            False)
5467     for to_node, to_result in result.items():
5468       msg = to_result.fail_msg
5469       if msg:
5470         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5471                 hint="please do a gnt-instance info to see the"
5472                 " status of disks")
5473
5474     # this can fail as the old devices are degraded and _WaitForSync
5475     # does a combined result over all disks, so we don't check its
5476     # return value
5477     self.proc.LogStep(5, steps_total, "sync devices")
5478     _WaitForSync(self, instance, unlock=True)
5479
5480     # so check manually all the devices
5481     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5482       cfg.SetDiskID(dev, pri_node)
5483       result = self.rpc.call_blockdev_find(pri_node, dev)
5484       msg = result.fail_msg
5485       if not msg and not result.payload:
5486         msg = "disk not found"
5487       if msg:
5488         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5489                                  (idx, msg))
5490       if result.payload[5]:
5491         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5492
5493     self.proc.LogStep(6, steps_total, "removing old storage")
5494     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5495       info("remove logical volumes for disk/%d" % idx)
5496       for lv in old_lvs:
5497         cfg.SetDiskID(lv, old_node)
5498         msg = self.rpc.call_blockdev_remove(old_node, lv).fail_msg
5499         if msg:
5500           warning("Can't remove LV on old secondary: %s", msg,
5501                   hint="Cleanup stale volumes by hand")
5502
5503   def Exec(self, feedback_fn):
5504     """Execute disk replacement.
5505
5506     This dispatches the disk replacement to the appropriate handler.
5507
5508     """
5509     instance = self.instance
5510
5511     # Activate the instance disks if we're replacing them on a down instance
5512     if not instance.admin_up:
5513       _StartInstanceDisks(self, instance, True)
5514
5515     if self.op.mode == constants.REPLACE_DISK_CHG:
5516       fn = self._ExecD8Secondary
5517     else:
5518       fn = self._ExecD8DiskOnly
5519
5520     ret = fn(feedback_fn)
5521
5522     # Deactivate the instance disks if we're replacing them on a down instance
5523     if not instance.admin_up:
5524       _SafeShutdownInstanceDisks(self, instance)
5525
5526     return ret
5527
5528
5529 class LUGrowDisk(LogicalUnit):
5530   """Grow a disk of an instance.
5531
5532   """
5533   HPATH = "disk-grow"
5534   HTYPE = constants.HTYPE_INSTANCE
5535   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5536   REQ_BGL = False
5537
5538   def ExpandNames(self):
5539     self._ExpandAndLockInstance()
5540     self.needed_locks[locking.LEVEL_NODE] = []
5541     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5542
5543   def DeclareLocks(self, level):
5544     if level == locking.LEVEL_NODE:
5545       self._LockInstancesNodes()
5546
5547   def BuildHooksEnv(self):
5548     """Build hooks env.
5549
5550     This runs on the master, the primary and all the secondaries.
5551
5552     """
5553     env = {
5554       "DISK": self.op.disk,
5555       "AMOUNT": self.op.amount,
5556       }
5557     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5558     nl = [
5559       self.cfg.GetMasterNode(),
5560       self.instance.primary_node,
5561       ]
5562     return env, nl, nl
5563
5564   def CheckPrereq(self):
5565     """Check prerequisites.
5566
5567     This checks that the instance is in the cluster.
5568
5569     """
5570     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5571     assert instance is not None, \
5572       "Cannot retrieve locked instance %s" % self.op.instance_name
5573     nodenames = list(instance.all_nodes)
5574     for node in nodenames:
5575       _CheckNodeOnline(self, node)
5576
5577
5578     self.instance = instance
5579
5580     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5581       raise errors.OpPrereqError("Instance's disk layout does not support"
5582                                  " growing.")
5583
5584     self.disk = instance.FindDisk(self.op.disk)
5585
5586     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5587                                        instance.hypervisor)
5588     for node in nodenames:
5589       info = nodeinfo[node]
5590       info.Raise("Cannot get current information from node %s" % node)
5591       vg_free = info.payload.get('vg_free', None)
5592       if not isinstance(vg_free, int):
5593         raise errors.OpPrereqError("Can't compute free disk space on"
5594                                    " node %s" % node)
5595       if self.op.amount > vg_free:
5596         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5597                                    " %d MiB available, %d MiB required" %
5598                                    (node, vg_free, self.op.amount))
5599
5600   def Exec(self, feedback_fn):
5601     """Execute disk grow.
5602
5603     """
5604     instance = self.instance
5605     disk = self.disk
5606     for node in instance.all_nodes:
5607       self.cfg.SetDiskID(disk, node)
5608       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5609       result.Raise("Grow request failed to node %s" % node)
5610     disk.RecordGrow(self.op.amount)
5611     self.cfg.Update(instance)
5612     if self.op.wait_for_sync:
5613       disk_abort = not _WaitForSync(self, instance)
5614       if disk_abort:
5615         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5616                              " status.\nPlease check the instance.")
5617
5618
5619 class LUQueryInstanceData(NoHooksLU):
5620   """Query runtime instance data.
5621
5622   """
5623   _OP_REQP = ["instances", "static"]
5624   REQ_BGL = False
5625
5626   def ExpandNames(self):
5627     self.needed_locks = {}
5628     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5629
5630     if not isinstance(self.op.instances, list):
5631       raise errors.OpPrereqError("Invalid argument type 'instances'")
5632
5633     if self.op.instances:
5634       self.wanted_names = []
5635       for name in self.op.instances:
5636         full_name = self.cfg.ExpandInstanceName(name)
5637         if full_name is None:
5638           raise errors.OpPrereqError("Instance '%s' not known" % name)
5639         self.wanted_names.append(full_name)
5640       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5641     else:
5642       self.wanted_names = None
5643       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5644
5645     self.needed_locks[locking.LEVEL_NODE] = []
5646     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5647
5648   def DeclareLocks(self, level):
5649     if level == locking.LEVEL_NODE:
5650       self._LockInstancesNodes()
5651
5652   def CheckPrereq(self):
5653     """Check prerequisites.
5654
5655     This only checks the optional instance list against the existing names.
5656
5657     """
5658     if self.wanted_names is None:
5659       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5660
5661     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5662                              in self.wanted_names]
5663     return
5664
5665   def _ComputeDiskStatus(self, instance, snode, dev):
5666     """Compute block device status.
5667
5668     """
5669     static = self.op.static
5670     if not static:
5671       self.cfg.SetDiskID(dev, instance.primary_node)
5672       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5673       if dev_pstatus.offline:
5674         dev_pstatus = None
5675       else:
5676         dev_pstatus.Raise("Can't compute disk status for %s" % instance.name)
5677         dev_pstatus = dev_pstatus.payload
5678     else:
5679       dev_pstatus = None
5680
5681     if dev.dev_type in constants.LDS_DRBD:
5682       # we change the snode then (otherwise we use the one passed in)
5683       if dev.logical_id[0] == instance.primary_node:
5684         snode = dev.logical_id[1]
5685       else:
5686         snode = dev.logical_id[0]
5687
5688     if snode and not static:
5689       self.cfg.SetDiskID(dev, snode)
5690       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5691       if dev_sstatus.offline:
5692         dev_sstatus = None
5693       else:
5694         dev_sstatus.Raise("Can't compute disk status for %s" % instance.name)
5695         dev_sstatus = dev_sstatus.payload
5696     else:
5697       dev_sstatus = None
5698
5699     if dev.children:
5700       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5701                       for child in dev.children]
5702     else:
5703       dev_children = []
5704
5705     data = {
5706       "iv_name": dev.iv_name,
5707       "dev_type": dev.dev_type,
5708       "logical_id": dev.logical_id,
5709       "physical_id": dev.physical_id,
5710       "pstatus": dev_pstatus,
5711       "sstatus": dev_sstatus,
5712       "children": dev_children,
5713       "mode": dev.mode,
5714       }
5715
5716     return data
5717
5718   def Exec(self, feedback_fn):
5719     """Gather and return data"""
5720     result = {}
5721
5722     cluster = self.cfg.GetClusterInfo()
5723
5724     for instance in self.wanted_instances:
5725       if not self.op.static:
5726         remote_info = self.rpc.call_instance_info(instance.primary_node,
5727                                                   instance.name,
5728                                                   instance.hypervisor)
5729         remote_info.Raise("Error checking node %s" % instance.primary_node)
5730         remote_info = remote_info.payload
5731         if remote_info and "state" in remote_info:
5732           remote_state = "up"
5733         else:
5734           remote_state = "down"
5735       else:
5736         remote_state = None
5737       if instance.admin_up:
5738         config_state = "up"
5739       else:
5740         config_state = "down"
5741
5742       disks = [self._ComputeDiskStatus(instance, None, device)
5743                for device in instance.disks]
5744
5745       idict = {
5746         "name": instance.name,
5747         "config_state": config_state,
5748         "run_state": remote_state,
5749         "pnode": instance.primary_node,
5750         "snodes": instance.secondary_nodes,
5751         "os": instance.os,
5752         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5753         "disks": disks,
5754         "hypervisor": instance.hypervisor,
5755         "network_port": instance.network_port,
5756         "hv_instance": instance.hvparams,
5757         "hv_actual": cluster.FillHV(instance),
5758         "be_instance": instance.beparams,
5759         "be_actual": cluster.FillBE(instance),
5760         }
5761
5762       result[instance.name] = idict
5763
5764     return result
5765
5766
5767 class LUSetInstanceParams(LogicalUnit):
5768   """Modifies an instances's parameters.
5769
5770   """
5771   HPATH = "instance-modify"
5772   HTYPE = constants.HTYPE_INSTANCE
5773   _OP_REQP = ["instance_name"]
5774   REQ_BGL = False
5775
5776   def CheckArguments(self):
5777     if not hasattr(self.op, 'nics'):
5778       self.op.nics = []
5779     if not hasattr(self.op, 'disks'):
5780       self.op.disks = []
5781     if not hasattr(self.op, 'beparams'):
5782       self.op.beparams = {}
5783     if not hasattr(self.op, 'hvparams'):
5784       self.op.hvparams = {}
5785     self.op.force = getattr(self.op, "force", False)
5786     if not (self.op.nics or self.op.disks or
5787             self.op.hvparams or self.op.beparams):
5788       raise errors.OpPrereqError("No changes submitted")
5789
5790     # Disk validation
5791     disk_addremove = 0
5792     for disk_op, disk_dict in self.op.disks:
5793       if disk_op == constants.DDM_REMOVE:
5794         disk_addremove += 1
5795         continue
5796       elif disk_op == constants.DDM_ADD:
5797         disk_addremove += 1
5798       else:
5799         if not isinstance(disk_op, int):
5800           raise errors.OpPrereqError("Invalid disk index")
5801       if disk_op == constants.DDM_ADD:
5802         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5803         if mode not in constants.DISK_ACCESS_SET:
5804           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5805         size = disk_dict.get('size', None)
5806         if size is None:
5807           raise errors.OpPrereqError("Required disk parameter size missing")
5808         try:
5809           size = int(size)
5810         except ValueError, err:
5811           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5812                                      str(err))
5813         disk_dict['size'] = size
5814       else:
5815         # modification of disk
5816         if 'size' in disk_dict:
5817           raise errors.OpPrereqError("Disk size change not possible, use"
5818                                      " grow-disk")
5819
5820     if disk_addremove > 1:
5821       raise errors.OpPrereqError("Only one disk add or remove operation"
5822                                  " supported at a time")
5823
5824     # NIC validation
5825     nic_addremove = 0
5826     for nic_op, nic_dict in self.op.nics:
5827       if nic_op == constants.DDM_REMOVE:
5828         nic_addremove += 1
5829         continue
5830       elif nic_op == constants.DDM_ADD:
5831         nic_addremove += 1
5832       else:
5833         if not isinstance(nic_op, int):
5834           raise errors.OpPrereqError("Invalid nic index")
5835
5836       # nic_dict should be a dict
5837       nic_ip = nic_dict.get('ip', None)
5838       if nic_ip is not None:
5839         if nic_ip.lower() == constants.VALUE_NONE:
5840           nic_dict['ip'] = None
5841         else:
5842           if not utils.IsValidIP(nic_ip):
5843             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5844
5845       nic_bridge = nic_dict.get('bridge', None)
5846       nic_link = nic_dict.get('link', None)
5847       if nic_bridge and nic_link:
5848         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
5849       elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
5850         nic_dict['bridge'] = None
5851       elif nic_link and nic_link.lower() == constants.VALUE_NONE:
5852         nic_dict['link'] = None
5853
5854       if nic_op == constants.DDM_ADD:
5855         nic_mac = nic_dict.get('mac', None)
5856         if nic_mac is None:
5857           nic_dict['mac'] = constants.VALUE_AUTO
5858
5859       if 'mac' in nic_dict:
5860         nic_mac = nic_dict['mac']
5861         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5862           if not utils.IsValidMac(nic_mac):
5863             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5864         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5865           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5866                                      " modifying an existing nic")
5867
5868     if nic_addremove > 1:
5869       raise errors.OpPrereqError("Only one NIC add or remove operation"
5870                                  " supported at a time")
5871
5872   def ExpandNames(self):
5873     self._ExpandAndLockInstance()
5874     self.needed_locks[locking.LEVEL_NODE] = []
5875     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5876
5877   def DeclareLocks(self, level):
5878     if level == locking.LEVEL_NODE:
5879       self._LockInstancesNodes()
5880
5881   def BuildHooksEnv(self):
5882     """Build hooks env.
5883
5884     This runs on the master, primary and secondaries.
5885
5886     """
5887     args = dict()
5888     if constants.BE_MEMORY in self.be_new:
5889       args['memory'] = self.be_new[constants.BE_MEMORY]
5890     if constants.BE_VCPUS in self.be_new:
5891       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5892     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5893     # information at all.
5894     if self.op.nics:
5895       args['nics'] = []
5896       nic_override = dict(self.op.nics)
5897       c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
5898       for idx, nic in enumerate(self.instance.nics):
5899         if idx in nic_override:
5900           this_nic_override = nic_override[idx]
5901         else:
5902           this_nic_override = {}
5903         if 'ip' in this_nic_override:
5904           ip = this_nic_override['ip']
5905         else:
5906           ip = nic.ip
5907         if 'mac' in this_nic_override:
5908           mac = this_nic_override['mac']
5909         else:
5910           mac = nic.mac
5911         if idx in self.nic_pnew:
5912           nicparams = self.nic_pnew[idx]
5913         else:
5914           nicparams = objects.FillDict(c_nicparams, nic.nicparams)
5915         mode = nicparams[constants.NIC_MODE]
5916         link = nicparams[constants.NIC_LINK]
5917         args['nics'].append((ip, mac, mode, link))
5918       if constants.DDM_ADD in nic_override:
5919         ip = nic_override[constants.DDM_ADD].get('ip', None)
5920         mac = nic_override[constants.DDM_ADD]['mac']
5921         nicparams = self.nic_pnew[constants.DDM_ADD]
5922         mode = nicparams[constants.NIC_MODE]
5923         link = nicparams[constants.NIC_LINK]
5924         args['nics'].append((ip, mac, mode, link))
5925       elif constants.DDM_REMOVE in nic_override:
5926         del args['nics'][-1]
5927
5928     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5929     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5930     return env, nl, nl
5931
5932   def _GetUpdatedParams(self, old_params, update_dict,
5933                         default_values, parameter_types):
5934     """Return the new params dict for the given params.
5935
5936     @type old_params: dict
5937     @type old_params: old parameters
5938     @type update_dict: dict
5939     @type update_dict: dict containing new parameter values,
5940                        or constants.VALUE_DEFAULT to reset the
5941                        parameter to its default value
5942     @type default_values: dict
5943     @param default_values: default values for the filled parameters
5944     @type parameter_types: dict
5945     @param parameter_types: dict mapping target dict keys to types
5946                             in constants.ENFORCEABLE_TYPES
5947     @rtype: (dict, dict)
5948     @return: (new_parameters, filled_parameters)
5949
5950     """
5951     params_copy = copy.deepcopy(old_params)
5952     for key, val in update_dict.iteritems():
5953       if val == constants.VALUE_DEFAULT:
5954         try:
5955           del params_copy[key]
5956         except KeyError:
5957           pass
5958       else:
5959         params_copy[key] = val
5960     utils.ForceDictType(params_copy, parameter_types)
5961     params_filled = objects.FillDict(default_values, params_copy)
5962     return (params_copy, params_filled)
5963
5964   def CheckPrereq(self):
5965     """Check prerequisites.
5966
5967     This only checks the instance list against the existing names.
5968
5969     """
5970     force = self.force = self.op.force
5971
5972     # checking the new params on the primary/secondary nodes
5973
5974     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5975     cluster = self.cluster = self.cfg.GetClusterInfo()
5976     assert self.instance is not None, \
5977       "Cannot retrieve locked instance %s" % self.op.instance_name
5978     pnode = instance.primary_node
5979     nodelist = list(instance.all_nodes)
5980
5981     # hvparams processing
5982     if self.op.hvparams:
5983       i_hvdict, hv_new = self._GetUpdatedParams(
5984                              instance.hvparams, self.op.hvparams,
5985                              cluster.hvparams[instance.hypervisor],
5986                              constants.HVS_PARAMETER_TYPES)
5987       # local check
5988       hypervisor.GetHypervisor(
5989         instance.hypervisor).CheckParameterSyntax(hv_new)
5990       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5991       self.hv_new = hv_new # the new actual values
5992       self.hv_inst = i_hvdict # the new dict (without defaults)
5993     else:
5994       self.hv_new = self.hv_inst = {}
5995
5996     # beparams processing
5997     if self.op.beparams:
5998       i_bedict, be_new = self._GetUpdatedParams(
5999                              instance.beparams, self.op.beparams,
6000                              cluster.beparams[constants.PP_DEFAULT],
6001                              constants.BES_PARAMETER_TYPES)
6002       self.be_new = be_new # the new actual values
6003       self.be_inst = i_bedict # the new dict (without defaults)
6004     else:
6005       self.be_new = self.be_inst = {}
6006
6007     self.warn = []
6008
6009     if constants.BE_MEMORY in self.op.beparams and not self.force:
6010       mem_check_list = [pnode]
6011       if be_new[constants.BE_AUTO_BALANCE]:
6012         # either we changed auto_balance to yes or it was from before
6013         mem_check_list.extend(instance.secondary_nodes)
6014       instance_info = self.rpc.call_instance_info(pnode, instance.name,
6015                                                   instance.hypervisor)
6016       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6017                                          instance.hypervisor)
6018       pninfo = nodeinfo[pnode]
6019       msg = pninfo.fail_msg
6020       if msg:
6021         # Assume the primary node is unreachable and go ahead
6022         self.warn.append("Can't get info from primary node %s: %s" %
6023                          (pnode,  msg))
6024       elif not isinstance(pninfo.payload.get('memory_free', None), int):
6025         self.warn.append("Node data from primary node %s doesn't contain"
6026                          " free memory information" % pnode)
6027       elif instance_info.fail_msg:
6028         self.warn.append("Can't get instance runtime information: %s" %
6029                         instance_info.fail_msg)
6030       else:
6031         if instance_info.payload:
6032           current_mem = int(instance_info.payload['memory'])
6033         else:
6034           # Assume instance not running
6035           # (there is a slight race condition here, but it's not very probable,
6036           # and we have no other way to check)
6037           current_mem = 0
6038         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6039                     pninfo.payload['memory_free'])
6040         if miss_mem > 0:
6041           raise errors.OpPrereqError("This change will prevent the instance"
6042                                      " from starting, due to %d MB of memory"
6043                                      " missing on its primary node" % miss_mem)
6044
6045       if be_new[constants.BE_AUTO_BALANCE]:
6046         for node, nres in nodeinfo.items():
6047           if node not in instance.secondary_nodes:
6048             continue
6049           msg = nres.fail_msg
6050           if msg:
6051             self.warn.append("Can't get info from secondary node %s: %s" %
6052                              (node, msg))
6053           elif not isinstance(nres.payload.get('memory_free', None), int):
6054             self.warn.append("Secondary node %s didn't return free"
6055                              " memory information" % node)
6056           elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
6057             self.warn.append("Not enough memory to failover instance to"
6058                              " secondary node %s" % node)
6059
6060     # NIC processing
6061     self.nic_pnew = {}
6062     self.nic_pinst = {}
6063     for nic_op, nic_dict in self.op.nics:
6064       if nic_op == constants.DDM_REMOVE:
6065         if not instance.nics:
6066           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6067         continue
6068       if nic_op != constants.DDM_ADD:
6069         # an existing nic
6070         if nic_op < 0 or nic_op >= len(instance.nics):
6071           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6072                                      " are 0 to %d" %
6073                                      (nic_op, len(instance.nics)))
6074         old_nic_params = instance.nics[nic_op].nicparams
6075         old_nic_ip = instance.nics[nic_op].ip
6076       else:
6077         old_nic_params = {}
6078         old_nic_ip = None
6079
6080       update_params_dict = dict([(key, nic_dict[key])
6081                                  for key in constants.NICS_PARAMETERS
6082                                  if key in nic_dict])
6083
6084       if 'bridge' in nic_dict:
6085         update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
6086
6087       new_nic_params, new_filled_nic_params = \
6088           self._GetUpdatedParams(old_nic_params, update_params_dict,
6089                                  cluster.nicparams[constants.PP_DEFAULT],
6090                                  constants.NICS_PARAMETER_TYPES)
6091       objects.NIC.CheckParameterSyntax(new_filled_nic_params)
6092       self.nic_pinst[nic_op] = new_nic_params
6093       self.nic_pnew[nic_op] = new_filled_nic_params
6094       new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
6095
6096       if new_nic_mode == constants.NIC_MODE_BRIDGED:
6097         nic_bridge = new_filled_nic_params[constants.NIC_LINK]
6098         msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
6099         if msg:
6100           msg = "Error checking bridges on node %s: %s" % (pnode, msg)
6101           if self.force:
6102             self.warn.append(msg)
6103           else:
6104             raise errors.OpPrereqError(msg)
6105       if new_nic_mode == constants.NIC_MODE_ROUTED:
6106         if 'ip' in nic_dict:
6107           nic_ip = nic_dict['ip']
6108         else:
6109           nic_ip = old_nic_ip
6110         if nic_ip is None:
6111           raise errors.OpPrereqError('Cannot set the nic ip to None'
6112                                      ' on a routed nic')
6113       if 'mac' in nic_dict:
6114         nic_mac = nic_dict['mac']
6115         if nic_mac is None:
6116           raise errors.OpPrereqError('Cannot set the nic mac to None')
6117         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6118           # otherwise generate the mac
6119           nic_dict['mac'] = self.cfg.GenerateMAC()
6120         else:
6121           # or validate/reserve the current one
6122           if self.cfg.IsMacInUse(nic_mac):
6123             raise errors.OpPrereqError("MAC address %s already in use"
6124                                        " in cluster" % nic_mac)
6125
6126     # DISK processing
6127     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6128       raise errors.OpPrereqError("Disk operations not supported for"
6129                                  " diskless instances")
6130     for disk_op, disk_dict in self.op.disks:
6131       if disk_op == constants.DDM_REMOVE:
6132         if len(instance.disks) == 1:
6133           raise errors.OpPrereqError("Cannot remove the last disk of"
6134                                      " an instance")
6135         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6136         ins_l = ins_l[pnode]
6137         msg = ins_l.fail_msg
6138         if msg:
6139           raise errors.OpPrereqError("Can't contact node %s: %s" %
6140                                      (pnode, msg))
6141         if instance.name in ins_l.payload:
6142           raise errors.OpPrereqError("Instance is running, can't remove"
6143                                      " disks.")
6144
6145       if (disk_op == constants.DDM_ADD and
6146           len(instance.nics) >= constants.MAX_DISKS):
6147         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6148                                    " add more" % constants.MAX_DISKS)
6149       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6150         # an existing disk
6151         if disk_op < 0 or disk_op >= len(instance.disks):
6152           raise errors.OpPrereqError("Invalid disk index %s, valid values"
6153                                      " are 0 to %d" %
6154                                      (disk_op, len(instance.disks)))
6155
6156     return
6157
6158   def Exec(self, feedback_fn):
6159     """Modifies an instance.
6160
6161     All parameters take effect only at the next restart of the instance.
6162
6163     """
6164     # Process here the warnings from CheckPrereq, as we don't have a
6165     # feedback_fn there.
6166     for warn in self.warn:
6167       feedback_fn("WARNING: %s" % warn)
6168
6169     result = []
6170     instance = self.instance
6171     cluster = self.cluster
6172     # disk changes
6173     for disk_op, disk_dict in self.op.disks:
6174       if disk_op == constants.DDM_REMOVE:
6175         # remove the last disk
6176         device = instance.disks.pop()
6177         device_idx = len(instance.disks)
6178         for node, disk in device.ComputeNodeTree(instance.primary_node):
6179           self.cfg.SetDiskID(disk, node)
6180           msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
6181           if msg:
6182             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6183                             " continuing anyway", device_idx, node, msg)
6184         result.append(("disk/%d" % device_idx, "remove"))
6185       elif disk_op == constants.DDM_ADD:
6186         # add a new disk
6187         if instance.disk_template == constants.DT_FILE:
6188           file_driver, file_path = instance.disks[0].logical_id
6189           file_path = os.path.dirname(file_path)
6190         else:
6191           file_driver = file_path = None
6192         disk_idx_base = len(instance.disks)
6193         new_disk = _GenerateDiskTemplate(self,
6194                                          instance.disk_template,
6195                                          instance.name, instance.primary_node,
6196                                          instance.secondary_nodes,
6197                                          [disk_dict],
6198                                          file_path,
6199                                          file_driver,
6200                                          disk_idx_base)[0]
6201         instance.disks.append(new_disk)
6202         info = _GetInstanceInfoText(instance)
6203
6204         logging.info("Creating volume %s for instance %s",
6205                      new_disk.iv_name, instance.name)
6206         # Note: this needs to be kept in sync with _CreateDisks
6207         #HARDCODE
6208         for node in instance.all_nodes:
6209           f_create = node == instance.primary_node
6210           try:
6211             _CreateBlockDev(self, node, instance, new_disk,
6212                             f_create, info, f_create)
6213           except errors.OpExecError, err:
6214             self.LogWarning("Failed to create volume %s (%s) on"
6215                             " node %s: %s",
6216                             new_disk.iv_name, new_disk, node, err)
6217         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6218                        (new_disk.size, new_disk.mode)))
6219       else:
6220         # change a given disk
6221         instance.disks[disk_op].mode = disk_dict['mode']
6222         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6223     # NIC changes
6224     for nic_op, nic_dict in self.op.nics:
6225       if nic_op == constants.DDM_REMOVE:
6226         # remove the last nic
6227         del instance.nics[-1]
6228         result.append(("nic.%d" % len(instance.nics), "remove"))
6229       elif nic_op == constants.DDM_ADD:
6230         # mac and bridge should be set, by now
6231         mac = nic_dict['mac']
6232         ip = nic_dict.get('ip', None)
6233         nicparams = self.nic_pinst[constants.DDM_ADD]
6234         new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
6235         instance.nics.append(new_nic)
6236         result.append(("nic.%d" % (len(instance.nics) - 1),
6237                        "add:mac=%s,ip=%s,mode=%s,link=%s" %
6238                        (new_nic.mac, new_nic.ip,
6239                         self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
6240                         self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
6241                        )))
6242       else:
6243         for key in 'mac', 'ip':
6244           if key in nic_dict:
6245             setattr(instance.nics[nic_op], key, nic_dict[key])
6246         if nic_op in self.nic_pnew:
6247           instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
6248         for key, val in nic_dict.iteritems():
6249           result.append(("nic.%s/%d" % (key, nic_op), val))
6250
6251     # hvparams changes
6252     if self.op.hvparams:
6253       instance.hvparams = self.hv_inst
6254       for key, val in self.op.hvparams.iteritems():
6255         result.append(("hv/%s" % key, val))
6256
6257     # beparams changes
6258     if self.op.beparams:
6259       instance.beparams = self.be_inst
6260       for key, val in self.op.beparams.iteritems():
6261         result.append(("be/%s" % key, val))
6262
6263     self.cfg.Update(instance)
6264
6265     return result
6266
6267
6268 class LUQueryExports(NoHooksLU):
6269   """Query the exports list
6270
6271   """
6272   _OP_REQP = ['nodes']
6273   REQ_BGL = False
6274
6275   def ExpandNames(self):
6276     self.needed_locks = {}
6277     self.share_locks[locking.LEVEL_NODE] = 1
6278     if not self.op.nodes:
6279       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6280     else:
6281       self.needed_locks[locking.LEVEL_NODE] = \
6282         _GetWantedNodes(self, self.op.nodes)
6283
6284   def CheckPrereq(self):
6285     """Check prerequisites.
6286
6287     """
6288     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6289
6290   def Exec(self, feedback_fn):
6291     """Compute the list of all the exported system images.
6292
6293     @rtype: dict
6294     @return: a dictionary with the structure node->(export-list)
6295         where export-list is a list of the instances exported on
6296         that node.
6297
6298     """
6299     rpcresult = self.rpc.call_export_list(self.nodes)
6300     result = {}
6301     for node in rpcresult:
6302       if rpcresult[node].fail_msg:
6303         result[node] = False
6304       else:
6305         result[node] = rpcresult[node].payload
6306
6307     return result
6308
6309
6310 class LUExportInstance(LogicalUnit):
6311   """Export an instance to an image in the cluster.
6312
6313   """
6314   HPATH = "instance-export"
6315   HTYPE = constants.HTYPE_INSTANCE
6316   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6317   REQ_BGL = False
6318
6319   def ExpandNames(self):
6320     self._ExpandAndLockInstance()
6321     # FIXME: lock only instance primary and destination node
6322     #
6323     # Sad but true, for now we have do lock all nodes, as we don't know where
6324     # the previous export might be, and and in this LU we search for it and
6325     # remove it from its current node. In the future we could fix this by:
6326     #  - making a tasklet to search (share-lock all), then create the new one,
6327     #    then one to remove, after
6328     #  - removing the removal operation altoghether
6329     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6330
6331   def DeclareLocks(self, level):
6332     """Last minute lock declaration."""
6333     # All nodes are locked anyway, so nothing to do here.
6334
6335   def BuildHooksEnv(self):
6336     """Build hooks env.
6337
6338     This will run on the master, primary node and target node.
6339
6340     """
6341     env = {
6342       "EXPORT_NODE": self.op.target_node,
6343       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6344       }
6345     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6346     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6347           self.op.target_node]
6348     return env, nl, nl
6349
6350   def CheckPrereq(self):
6351     """Check prerequisites.
6352
6353     This checks that the instance and node names are valid.
6354
6355     """
6356     instance_name = self.op.instance_name
6357     self.instance = self.cfg.GetInstanceInfo(instance_name)
6358     assert self.instance is not None, \
6359           "Cannot retrieve locked instance %s" % self.op.instance_name
6360     _CheckNodeOnline(self, self.instance.primary_node)
6361
6362     self.dst_node = self.cfg.GetNodeInfo(
6363       self.cfg.ExpandNodeName(self.op.target_node))
6364
6365     if self.dst_node is None:
6366       # This is wrong node name, not a non-locked node
6367       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6368     _CheckNodeOnline(self, self.dst_node.name)
6369     _CheckNodeNotDrained(self, self.dst_node.name)
6370
6371     # instance disk type verification
6372     for disk in self.instance.disks:
6373       if disk.dev_type == constants.LD_FILE:
6374         raise errors.OpPrereqError("Export not supported for instances with"
6375                                    " file-based disks")
6376
6377   def Exec(self, feedback_fn):
6378     """Export an instance to an image in the cluster.
6379
6380     """
6381     instance = self.instance
6382     dst_node = self.dst_node
6383     src_node = instance.primary_node
6384     if self.op.shutdown:
6385       # shutdown the instance, but not the disks
6386       result = self.rpc.call_instance_shutdown(src_node, instance)
6387       result.Raise("Could not shutdown instance %s on"
6388                    " node %s" % (instance.name, src_node))
6389
6390     vgname = self.cfg.GetVGName()
6391
6392     snap_disks = []
6393
6394     # set the disks ID correctly since call_instance_start needs the
6395     # correct drbd minor to create the symlinks
6396     for disk in instance.disks:
6397       self.cfg.SetDiskID(disk, src_node)
6398
6399     try:
6400       for disk in instance.disks:
6401         # result.payload will be a snapshot of an lvm leaf of the one we passed
6402         result = self.rpc.call_blockdev_snapshot(src_node, disk)
6403         msg = result.fail_msg
6404         if msg:
6405           self.LogWarning("Could not snapshot block device %s on node %s: %s",
6406                           disk.logical_id[1], src_node, msg)
6407           snap_disks.append(False)
6408         else:
6409           disk_id = (vgname, result.payload)
6410           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6411                                  logical_id=disk_id, physical_id=disk_id,
6412                                  iv_name=disk.iv_name)
6413           snap_disks.append(new_dev)
6414
6415     finally:
6416       if self.op.shutdown and instance.admin_up:
6417         result = self.rpc.call_instance_start(src_node, instance, None, None)
6418         msg = result.fail_msg
6419         if msg:
6420           _ShutdownInstanceDisks(self, instance)
6421           raise errors.OpExecError("Could not start instance: %s" % msg)
6422
6423     # TODO: check for size
6424
6425     cluster_name = self.cfg.GetClusterName()
6426     for idx, dev in enumerate(snap_disks):
6427       if dev:
6428         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6429                                                instance, cluster_name, idx)
6430         msg = result.fail_msg
6431         if msg:
6432           self.LogWarning("Could not export block device %s from node %s to"
6433                           " node %s: %s", dev.logical_id[1], src_node,
6434                           dst_node.name, msg)
6435         msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
6436         if msg:
6437           self.LogWarning("Could not remove snapshot block device %s from node"
6438                           " %s: %s", dev.logical_id[1], src_node, msg)
6439
6440     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6441     msg = result.fail_msg
6442     if msg:
6443       self.LogWarning("Could not finalize export for instance %s"
6444                       " on node %s: %s", instance.name, dst_node.name, msg)
6445
6446     nodelist = self.cfg.GetNodeList()
6447     nodelist.remove(dst_node.name)
6448
6449     # on one-node clusters nodelist will be empty after the removal
6450     # if we proceed the backup would be removed because OpQueryExports
6451     # substitutes an empty list with the full cluster node list.
6452     iname = instance.name
6453     if nodelist:
6454       exportlist = self.rpc.call_export_list(nodelist)
6455       for node in exportlist:
6456         if exportlist[node].fail_msg:
6457           continue
6458         if iname in exportlist[node].payload:
6459           msg = self.rpc.call_export_remove(node, iname).fail_msg
6460           if msg:
6461             self.LogWarning("Could not remove older export for instance %s"
6462                             " on node %s: %s", iname, node, msg)
6463
6464
6465 class LURemoveExport(NoHooksLU):
6466   """Remove exports related to the named instance.
6467
6468   """
6469   _OP_REQP = ["instance_name"]
6470   REQ_BGL = False
6471
6472   def ExpandNames(self):
6473     self.needed_locks = {}
6474     # We need all nodes to be locked in order for RemoveExport to work, but we
6475     # don't need to lock the instance itself, as nothing will happen to it (and
6476     # we can remove exports also for a removed instance)
6477     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6478
6479   def CheckPrereq(self):
6480     """Check prerequisites.
6481     """
6482     pass
6483
6484   def Exec(self, feedback_fn):
6485     """Remove any export.
6486
6487     """
6488     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6489     # If the instance was not found we'll try with the name that was passed in.
6490     # This will only work if it was an FQDN, though.
6491     fqdn_warn = False
6492     if not instance_name:
6493       fqdn_warn = True
6494       instance_name = self.op.instance_name
6495
6496     locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
6497     exportlist = self.rpc.call_export_list(locked_nodes)
6498     found = False
6499     for node in exportlist:
6500       msg = exportlist[node].fail_msg
6501       if msg:
6502         self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
6503         continue
6504       if instance_name in exportlist[node].payload:
6505         found = True
6506         result = self.rpc.call_export_remove(node, instance_name)
6507         msg = result.fail_msg
6508         if msg:
6509           logging.error("Could not remove export for instance %s"
6510                         " on node %s: %s", instance_name, node, msg)
6511
6512     if fqdn_warn and not found:
6513       feedback_fn("Export not found. If trying to remove an export belonging"
6514                   " to a deleted instance please use its Fully Qualified"
6515                   " Domain Name.")
6516
6517
6518 class TagsLU(NoHooksLU):
6519   """Generic tags LU.
6520
6521   This is an abstract class which is the parent of all the other tags LUs.
6522
6523   """
6524
6525   def ExpandNames(self):
6526     self.needed_locks = {}
6527     if self.op.kind == constants.TAG_NODE:
6528       name = self.cfg.ExpandNodeName(self.op.name)
6529       if name is None:
6530         raise errors.OpPrereqError("Invalid node name (%s)" %
6531                                    (self.op.name,))
6532       self.op.name = name
6533       self.needed_locks[locking.LEVEL_NODE] = name
6534     elif self.op.kind == constants.TAG_INSTANCE:
6535       name = self.cfg.ExpandInstanceName(self.op.name)
6536       if name is None:
6537         raise errors.OpPrereqError("Invalid instance name (%s)" %
6538                                    (self.op.name,))
6539       self.op.name = name
6540       self.needed_locks[locking.LEVEL_INSTANCE] = name
6541
6542   def CheckPrereq(self):
6543     """Check prerequisites.
6544
6545     """
6546     if self.op.kind == constants.TAG_CLUSTER:
6547       self.target = self.cfg.GetClusterInfo()
6548     elif self.op.kind == constants.TAG_NODE:
6549       self.target = self.cfg.GetNodeInfo(self.op.name)
6550     elif self.op.kind == constants.TAG_INSTANCE:
6551       self.target = self.cfg.GetInstanceInfo(self.op.name)
6552     else:
6553       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6554                                  str(self.op.kind))
6555
6556
6557 class LUGetTags(TagsLU):
6558   """Returns the tags of a given object.
6559
6560   """
6561   _OP_REQP = ["kind", "name"]
6562   REQ_BGL = False
6563
6564   def Exec(self, feedback_fn):
6565     """Returns the tag list.
6566
6567     """
6568     return list(self.target.GetTags())
6569
6570
6571 class LUSearchTags(NoHooksLU):
6572   """Searches the tags for a given pattern.
6573
6574   """
6575   _OP_REQP = ["pattern"]
6576   REQ_BGL = False
6577
6578   def ExpandNames(self):
6579     self.needed_locks = {}
6580
6581   def CheckPrereq(self):
6582     """Check prerequisites.
6583
6584     This checks the pattern passed for validity by compiling it.
6585
6586     """
6587     try:
6588       self.re = re.compile(self.op.pattern)
6589     except re.error, err:
6590       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6591                                  (self.op.pattern, err))
6592
6593   def Exec(self, feedback_fn):
6594     """Returns the tag list.
6595
6596     """
6597     cfg = self.cfg
6598     tgts = [("/cluster", cfg.GetClusterInfo())]
6599     ilist = cfg.GetAllInstancesInfo().values()
6600     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6601     nlist = cfg.GetAllNodesInfo().values()
6602     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6603     results = []
6604     for path, target in tgts:
6605       for tag in target.GetTags():
6606         if self.re.search(tag):
6607           results.append((path, tag))
6608     return results
6609
6610
6611 class LUAddTags(TagsLU):
6612   """Sets a tag on a given object.
6613
6614   """
6615   _OP_REQP = ["kind", "name", "tags"]
6616   REQ_BGL = False
6617
6618   def CheckPrereq(self):
6619     """Check prerequisites.
6620
6621     This checks the type and length of the tag name and value.
6622
6623     """
6624     TagsLU.CheckPrereq(self)
6625     for tag in self.op.tags:
6626       objects.TaggableObject.ValidateTag(tag)
6627
6628   def Exec(self, feedback_fn):
6629     """Sets the tag.
6630
6631     """
6632     try:
6633       for tag in self.op.tags:
6634         self.target.AddTag(tag)
6635     except errors.TagError, err:
6636       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6637     try:
6638       self.cfg.Update(self.target)
6639     except errors.ConfigurationError:
6640       raise errors.OpRetryError("There has been a modification to the"
6641                                 " config file and the operation has been"
6642                                 " aborted. Please retry.")
6643
6644
6645 class LUDelTags(TagsLU):
6646   """Delete a list of tags from a given object.
6647
6648   """
6649   _OP_REQP = ["kind", "name", "tags"]
6650   REQ_BGL = False
6651
6652   def CheckPrereq(self):
6653     """Check prerequisites.
6654
6655     This checks that we have the given tag.
6656
6657     """
6658     TagsLU.CheckPrereq(self)
6659     for tag in self.op.tags:
6660       objects.TaggableObject.ValidateTag(tag)
6661     del_tags = frozenset(self.op.tags)
6662     cur_tags = self.target.GetTags()
6663     if not del_tags <= cur_tags:
6664       diff_tags = del_tags - cur_tags
6665       diff_names = ["'%s'" % tag for tag in diff_tags]
6666       diff_names.sort()
6667       raise errors.OpPrereqError("Tag(s) %s not found" %
6668                                  (",".join(diff_names)))
6669
6670   def Exec(self, feedback_fn):
6671     """Remove the tag from the object.
6672
6673     """
6674     for tag in self.op.tags:
6675       self.target.RemoveTag(tag)
6676     try:
6677       self.cfg.Update(self.target)
6678     except errors.ConfigurationError:
6679       raise errors.OpRetryError("There has been a modification to the"
6680                                 " config file and the operation has been"
6681                                 " aborted. Please retry.")
6682
6683
6684 class LUTestDelay(NoHooksLU):
6685   """Sleep for a specified amount of time.
6686
6687   This LU sleeps on the master and/or nodes for a specified amount of
6688   time.
6689
6690   """
6691   _OP_REQP = ["duration", "on_master", "on_nodes"]
6692   REQ_BGL = False
6693
6694   def ExpandNames(self):
6695     """Expand names and set required locks.
6696
6697     This expands the node list, if any.
6698
6699     """
6700     self.needed_locks = {}
6701     if self.op.on_nodes:
6702       # _GetWantedNodes can be used here, but is not always appropriate to use
6703       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6704       # more information.
6705       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6706       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6707
6708   def CheckPrereq(self):
6709     """Check prerequisites.
6710
6711     """
6712
6713   def Exec(self, feedback_fn):
6714     """Do the actual sleep.
6715
6716     """
6717     if self.op.on_master:
6718       if not utils.TestDelay(self.op.duration):
6719         raise errors.OpExecError("Error during master delay test")
6720     if self.op.on_nodes:
6721       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6722       for node, node_result in result.items():
6723         node_result.Raise("Failure during rpc call to node %s" % node)
6724
6725
6726 class IAllocator(object):
6727   """IAllocator framework.
6728
6729   An IAllocator instance has three sets of attributes:
6730     - cfg that is needed to query the cluster
6731     - input data (all members of the _KEYS class attribute are required)
6732     - four buffer attributes (in|out_data|text), that represent the
6733       input (to the external script) in text and data structure format,
6734       and the output from it, again in two formats
6735     - the result variables from the script (success, info, nodes) for
6736       easy usage
6737
6738   """
6739   _ALLO_KEYS = [
6740     "mem_size", "disks", "disk_template",
6741     "os", "tags", "nics", "vcpus", "hypervisor",
6742     ]
6743   _RELO_KEYS = [
6744     "relocate_from",
6745     ]
6746
6747   def __init__(self, lu, mode, name, **kwargs):
6748     self.lu = lu
6749     # init buffer variables
6750     self.in_text = self.out_text = self.in_data = self.out_data = None
6751     # init all input fields so that pylint is happy
6752     self.mode = mode
6753     self.name = name
6754     self.mem_size = self.disks = self.disk_template = None
6755     self.os = self.tags = self.nics = self.vcpus = None
6756     self.hypervisor = None
6757     self.relocate_from = None
6758     # computed fields
6759     self.required_nodes = None
6760     # init result fields
6761     self.success = self.info = self.nodes = None
6762     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6763       keyset = self._ALLO_KEYS
6764     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6765       keyset = self._RELO_KEYS
6766     else:
6767       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6768                                    " IAllocator" % self.mode)
6769     for key in kwargs:
6770       if key not in keyset:
6771         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6772                                      " IAllocator" % key)
6773       setattr(self, key, kwargs[key])
6774     for key in keyset:
6775       if key not in kwargs:
6776         raise errors.ProgrammerError("Missing input parameter '%s' to"
6777                                      " IAllocator" % key)
6778     self._BuildInputData()
6779
6780   def _ComputeClusterData(self):
6781     """Compute the generic allocator input data.
6782
6783     This is the data that is independent of the actual operation.
6784
6785     """
6786     cfg = self.lu.cfg
6787     cluster_info = cfg.GetClusterInfo()
6788     # cluster data
6789     data = {
6790       "version": constants.IALLOCATOR_VERSION,
6791       "cluster_name": cfg.GetClusterName(),
6792       "cluster_tags": list(cluster_info.GetTags()),
6793       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6794       # we don't have job IDs
6795       }
6796     iinfo = cfg.GetAllInstancesInfo().values()
6797     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6798
6799     # node data
6800     node_results = {}
6801     node_list = cfg.GetNodeList()
6802
6803     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6804       hypervisor_name = self.hypervisor
6805     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6806       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6807
6808     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6809                                            hypervisor_name)
6810     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6811                        cluster_info.enabled_hypervisors)
6812     for nname, nresult in node_data.items():
6813       # first fill in static (config-based) values
6814       ninfo = cfg.GetNodeInfo(nname)
6815       pnr = {
6816         "tags": list(ninfo.GetTags()),
6817         "primary_ip": ninfo.primary_ip,
6818         "secondary_ip": ninfo.secondary_ip,
6819         "offline": ninfo.offline,
6820         "drained": ninfo.drained,
6821         "master_candidate": ninfo.master_candidate,
6822         }
6823
6824       if not ninfo.offline:
6825         nresult.Raise("Can't get data for node %s" % nname)
6826         node_iinfo[nname].Raise("Can't get node instance info from node %s" %
6827                                 nname)
6828         remote_info = nresult.payload
6829         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6830                      'vg_size', 'vg_free', 'cpu_total']:
6831           if attr not in remote_info:
6832             raise errors.OpExecError("Node '%s' didn't return attribute"
6833                                      " '%s'" % (nname, attr))
6834           if not isinstance(remote_info[attr], int):
6835             raise errors.OpExecError("Node '%s' returned invalid value"
6836                                      " for '%s': %s" %
6837                                      (nname, attr, remote_info[attr]))
6838         # compute memory used by primary instances
6839         i_p_mem = i_p_up_mem = 0
6840         for iinfo, beinfo in i_list:
6841           if iinfo.primary_node == nname:
6842             i_p_mem += beinfo[constants.BE_MEMORY]
6843             if iinfo.name not in node_iinfo[nname].payload:
6844               i_used_mem = 0
6845             else:
6846               i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
6847             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6848             remote_info['memory_free'] -= max(0, i_mem_diff)
6849
6850             if iinfo.admin_up:
6851               i_p_up_mem += beinfo[constants.BE_MEMORY]
6852
6853         # compute memory used by instances
6854         pnr_dyn = {
6855           "total_memory": remote_info['memory_total'],
6856           "reserved_memory": remote_info['memory_dom0'],
6857           "free_memory": remote_info['memory_free'],
6858           "total_disk": remote_info['vg_size'],
6859           "free_disk": remote_info['vg_free'],
6860           "total_cpus": remote_info['cpu_total'],
6861           "i_pri_memory": i_p_mem,
6862           "i_pri_up_memory": i_p_up_mem,
6863           }
6864         pnr.update(pnr_dyn)
6865
6866       node_results[nname] = pnr
6867     data["nodes"] = node_results
6868
6869     # instance data
6870     instance_data = {}
6871     for iinfo, beinfo in i_list:
6872       nic_data = []
6873       for nic in iinfo.nics:
6874         filled_params = objects.FillDict(
6875             cluster_info.nicparams[constants.PP_DEFAULT],
6876             nic.nicparams)
6877         nic_dict = {"mac": nic.mac,
6878                     "ip": nic.ip,
6879                     "mode": filled_params[constants.NIC_MODE],
6880                     "link": filled_params[constants.NIC_LINK],
6881                    }
6882         if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
6883           nic_dict["bridge"] = filled_params[constants.NIC_LINK]
6884         nic_data.append(nic_dict)
6885       pir = {
6886         "tags": list(iinfo.GetTags()),
6887         "admin_up": iinfo.admin_up,
6888         "vcpus": beinfo[constants.BE_VCPUS],
6889         "memory": beinfo[constants.BE_MEMORY],
6890         "os": iinfo.os,
6891         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6892         "nics": nic_data,
6893         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6894         "disk_template": iinfo.disk_template,
6895         "hypervisor": iinfo.hypervisor,
6896         }
6897       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
6898                                                  pir["disks"])
6899       instance_data[iinfo.name] = pir
6900
6901     data["instances"] = instance_data
6902
6903     self.in_data = data
6904
6905   def _AddNewInstance(self):
6906     """Add new instance data to allocator structure.
6907
6908     This in combination with _AllocatorGetClusterData will create the
6909     correct structure needed as input for the allocator.
6910
6911     The checks for the completeness of the opcode must have already been
6912     done.
6913
6914     """
6915     data = self.in_data
6916
6917     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6918
6919     if self.disk_template in constants.DTS_NET_MIRROR:
6920       self.required_nodes = 2
6921     else:
6922       self.required_nodes = 1
6923     request = {
6924       "type": "allocate",
6925       "name": self.name,
6926       "disk_template": self.disk_template,
6927       "tags": self.tags,
6928       "os": self.os,
6929       "vcpus": self.vcpus,
6930       "memory": self.mem_size,
6931       "disks": self.disks,
6932       "disk_space_total": disk_space,
6933       "nics": self.nics,
6934       "required_nodes": self.required_nodes,
6935       }
6936     data["request"] = request
6937
6938   def _AddRelocateInstance(self):
6939     """Add relocate instance data to allocator structure.
6940
6941     This in combination with _IAllocatorGetClusterData will create the
6942     correct structure needed as input for the allocator.
6943
6944     The checks for the completeness of the opcode must have already been
6945     done.
6946
6947     """
6948     instance = self.lu.cfg.GetInstanceInfo(self.name)
6949     if instance is None:
6950       raise errors.ProgrammerError("Unknown instance '%s' passed to"
6951                                    " IAllocator" % self.name)
6952
6953     if instance.disk_template not in constants.DTS_NET_MIRROR:
6954       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6955
6956     if len(instance.secondary_nodes) != 1:
6957       raise errors.OpPrereqError("Instance has not exactly one secondary node")
6958
6959     self.required_nodes = 1
6960     disk_sizes = [{'size': disk.size} for disk in instance.disks]
6961     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6962
6963     request = {
6964       "type": "relocate",
6965       "name": self.name,
6966       "disk_space_total": disk_space,
6967       "required_nodes": self.required_nodes,
6968       "relocate_from": self.relocate_from,
6969       }
6970     self.in_data["request"] = request
6971
6972   def _BuildInputData(self):
6973     """Build input data structures.
6974
6975     """
6976     self._ComputeClusterData()
6977
6978     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6979       self._AddNewInstance()
6980     else:
6981       self._AddRelocateInstance()
6982
6983     self.in_text = serializer.Dump(self.in_data)
6984
6985   def Run(self, name, validate=True, call_fn=None):
6986     """Run an instance allocator and return the results.
6987
6988     """
6989     if call_fn is None:
6990       call_fn = self.lu.rpc.call_iallocator_runner
6991     data = self.in_text
6992
6993     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6994     result.Raise("Failure while running the iallocator script")
6995
6996     self.out_text = result.payload
6997     if validate:
6998       self._ValidateResult()
6999
7000   def _ValidateResult(self):
7001     """Process the allocator results.
7002
7003     This will process and if successful save the result in
7004     self.out_data and the other parameters.
7005
7006     """
7007     try:
7008       rdict = serializer.Load(self.out_text)
7009     except Exception, err:
7010       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
7011
7012     if not isinstance(rdict, dict):
7013       raise errors.OpExecError("Can't parse iallocator results: not a dict")
7014
7015     for key in "success", "info", "nodes":
7016       if key not in rdict:
7017         raise errors.OpExecError("Can't parse iallocator results:"
7018                                  " missing key '%s'" % key)
7019       setattr(self, key, rdict[key])
7020
7021     if not isinstance(rdict["nodes"], list):
7022       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
7023                                " is not a list")
7024     self.out_data = rdict
7025
7026
7027 class LUTestAllocator(NoHooksLU):
7028   """Run allocator tests.
7029
7030   This LU runs the allocator tests
7031
7032   """
7033   _OP_REQP = ["direction", "mode", "name"]
7034
7035   def CheckPrereq(self):
7036     """Check prerequisites.
7037
7038     This checks the opcode parameters depending on the director and mode test.
7039
7040     """
7041     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7042       for attr in ["name", "mem_size", "disks", "disk_template",
7043                    "os", "tags", "nics", "vcpus"]:
7044         if not hasattr(self.op, attr):
7045           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7046                                      attr)
7047       iname = self.cfg.ExpandInstanceName(self.op.name)
7048       if iname is not None:
7049         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7050                                    iname)
7051       if not isinstance(self.op.nics, list):
7052         raise errors.OpPrereqError("Invalid parameter 'nics'")
7053       for row in self.op.nics:
7054         if (not isinstance(row, dict) or
7055             "mac" not in row or
7056             "ip" not in row or
7057             "bridge" not in row):
7058           raise errors.OpPrereqError("Invalid contents of the"
7059                                      " 'nics' parameter")
7060       if not isinstance(self.op.disks, list):
7061         raise errors.OpPrereqError("Invalid parameter 'disks'")
7062       for row in self.op.disks:
7063         if (not isinstance(row, dict) or
7064             "size" not in row or
7065             not isinstance(row["size"], int) or
7066             "mode" not in row or
7067             row["mode"] not in ['r', 'w']):
7068           raise errors.OpPrereqError("Invalid contents of the"
7069                                      " 'disks' parameter")
7070       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7071         self.op.hypervisor = self.cfg.GetHypervisorType()
7072     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7073       if not hasattr(self.op, "name"):
7074         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7075       fname = self.cfg.ExpandInstanceName(self.op.name)
7076       if fname is None:
7077         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7078                                    self.op.name)
7079       self.op.name = fname
7080       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7081     else:
7082       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7083                                  self.op.mode)
7084
7085     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7086       if not hasattr(self.op, "allocator") or self.op.allocator is None:
7087         raise errors.OpPrereqError("Missing allocator name")
7088     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7089       raise errors.OpPrereqError("Wrong allocator test '%s'" %
7090                                  self.op.direction)
7091
7092   def Exec(self, feedback_fn):
7093     """Run the allocator test.
7094
7095     """
7096     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7097       ial = IAllocator(self,
7098                        mode=self.op.mode,
7099                        name=self.op.name,
7100                        mem_size=self.op.mem_size,
7101                        disks=self.op.disks,
7102                        disk_template=self.op.disk_template,
7103                        os=self.op.os,
7104                        tags=self.op.tags,
7105                        nics=self.op.nics,
7106                        vcpus=self.op.vcpus,
7107                        hypervisor=self.op.hypervisor,
7108                        )
7109     else:
7110       ial = IAllocator(self,
7111                        mode=self.op.mode,
7112                        name=self.op.name,
7113                        relocate_from=list(self.relocate_from),
7114                        )
7115
7116     if self.op.direction == constants.IALLOCATOR_DIR_IN:
7117       result = ial.in_text
7118     else:
7119       ial.Run(self.op.allocator, validate=False)
7120       result = ial.out_text
7121     return result