LUSetInstanceParams: abstract _GetUpdatedParams
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import tempfile
30 import re
31 import platform
32 import logging
33 import copy
34 import random
35
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45 from ganeti import ssconf
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59
60   Note that all commands require root permissions.
61
62   """
63   HPATH = None
64   HTYPE = None
65   _OP_REQP = []
66   REQ_BGL = True
67
68   def __init__(self, processor, op, context, rpc):
69     """Constructor for LogicalUnit.
70
71     This needs to be overriden in derived classes in order to check op
72     validity.
73
74     """
75     self.proc = processor
76     self.op = op
77     self.cfg = context.cfg
78     self.context = context
79     self.rpc = rpc
80     # Dicts used to declare locking needs to mcpu
81     self.needed_locks = None
82     self.acquired_locks = {}
83     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
84     self.add_locks = {}
85     self.remove_locks = {}
86     # Used to force good behavior when calling helper functions
87     self.recalculate_locks = {}
88     self.__ssh = None
89     # logging
90     self.LogWarning = processor.LogWarning
91     self.LogInfo = processor.LogInfo
92
93     for attr_name in self._OP_REQP:
94       attr_val = getattr(op, attr_name, None)
95       if attr_val is None:
96         raise errors.OpPrereqError("Required parameter '%s' missing" %
97                                    attr_name)
98     self.CheckArguments()
99
100   def __GetSSH(self):
101     """Returns the SshRunner object
102
103     """
104     if not self.__ssh:
105       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
106     return self.__ssh
107
108   ssh = property(fget=__GetSSH)
109
110   def CheckArguments(self):
111     """Check syntactic validity for the opcode arguments.
112
113     This method is for doing a simple syntactic check and ensure
114     validity of opcode parameters, without any cluster-related
115     checks. While the same can be accomplished in ExpandNames and/or
116     CheckPrereq, doing these separate is better because:
117
118       - ExpandNames is left as as purely a lock-related function
119       - CheckPrereq is run after we have aquired locks (and possible
120         waited for them)
121
122     The function is allowed to change the self.op attribute so that
123     later methods can no longer worry about missing parameters.
124
125     """
126     pass
127
128   def ExpandNames(self):
129     """Expand names for this LU.
130
131     This method is called before starting to execute the opcode, and it should
132     update all the parameters of the opcode to their canonical form (e.g. a
133     short node name must be fully expanded after this method has successfully
134     completed). This way locking, hooks, logging, ecc. can work correctly.
135
136     LUs which implement this method must also populate the self.needed_locks
137     member, as a dict with lock levels as keys, and a list of needed lock names
138     as values. Rules:
139
140       - use an empty dict if you don't need any lock
141       - if you don't need any lock at a particular level omit that level
142       - don't put anything for the BGL level
143       - if you want all locks at a level use locking.ALL_SET as a value
144
145     If you need to share locks (rather than acquire them exclusively) at one
146     level you can modify self.share_locks, setting a true value (usually 1) for
147     that level. By default locks are not shared.
148
149     Examples::
150
151       # Acquire all nodes and one instance
152       self.needed_locks = {
153         locking.LEVEL_NODE: locking.ALL_SET,
154         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
155       }
156       # Acquire just two nodes
157       self.needed_locks = {
158         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
159       }
160       # Acquire no locks
161       self.needed_locks = {} # No, you can't leave it to the default value None
162
163     """
164     # The implementation of this method is mandatory only if the new LU is
165     # concurrent, so that old LUs don't need to be changed all at the same
166     # time.
167     if self.REQ_BGL:
168       self.needed_locks = {} # Exclusive LUs don't need locks.
169     else:
170       raise NotImplementedError
171
172   def DeclareLocks(self, level):
173     """Declare LU locking needs for a level
174
175     While most LUs can just declare their locking needs at ExpandNames time,
176     sometimes there's the need to calculate some locks after having acquired
177     the ones before. This function is called just before acquiring locks at a
178     particular level, but after acquiring the ones at lower levels, and permits
179     such calculations. It can be used to modify self.needed_locks, and by
180     default it does nothing.
181
182     This function is only called if you have something already set in
183     self.needed_locks for the level.
184
185     @param level: Locking level which is going to be locked
186     @type level: member of ganeti.locking.LEVELS
187
188     """
189
190   def CheckPrereq(self):
191     """Check prerequisites for this LU.
192
193     This method should check that the prerequisites for the execution
194     of this LU are fulfilled. It can do internode communication, but
195     it should be idempotent - no cluster or system changes are
196     allowed.
197
198     The method should raise errors.OpPrereqError in case something is
199     not fulfilled. Its return value is ignored.
200
201     This method should also update all the parameters of the opcode to
202     their canonical form if it hasn't been done by ExpandNames before.
203
204     """
205     raise NotImplementedError
206
207   def Exec(self, feedback_fn):
208     """Execute the LU.
209
210     This method should implement the actual work. It should raise
211     errors.OpExecError for failures that are somewhat dealt with in
212     code, or expected.
213
214     """
215     raise NotImplementedError
216
217   def BuildHooksEnv(self):
218     """Build hooks environment for this LU.
219
220     This method should return a three-node tuple consisting of: a dict
221     containing the environment that will be used for running the
222     specific hook for this LU, a list of node names on which the hook
223     should run before the execution, and a list of node names on which
224     the hook should run after the execution.
225
226     The keys of the dict must not have 'GANETI_' prefixed as this will
227     be handled in the hooks runner. Also note additional keys will be
228     added by the hooks runner. If the LU doesn't define any
229     environment, an empty dict (and not None) should be returned.
230
231     No nodes should be returned as an empty list (and not None).
232
233     Note that if the HPATH for a LU class is None, this function will
234     not be called.
235
236     """
237     raise NotImplementedError
238
239   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240     """Notify the LU about the results of its hooks.
241
242     This method is called every time a hooks phase is executed, and notifies
243     the Logical Unit about the hooks' result. The LU can then use it to alter
244     its result based on the hooks.  By default the method does nothing and the
245     previous result is passed back unchanged but any LU can define it if it
246     wants to use the local cluster hook-scripts somehow.
247
248     @param phase: one of L{constants.HOOKS_PHASE_POST} or
249         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250     @param hook_results: the results of the multi-node hooks rpc call
251     @param feedback_fn: function used send feedback back to the caller
252     @param lu_result: the previous Exec result this LU had, or None
253         in the PRE phase
254     @return: the new Exec result, based on the previous result
255         and hook results
256
257     """
258     return lu_result
259
260   def _ExpandAndLockInstance(self):
261     """Helper function to expand and lock an instance.
262
263     Many LUs that work on an instance take its name in self.op.instance_name
264     and need to expand it and then declare the expanded name for locking. This
265     function does it, and then updates self.op.instance_name to the expanded
266     name. It also initializes needed_locks as a dict, if this hasn't been done
267     before.
268
269     """
270     if self.needed_locks is None:
271       self.needed_locks = {}
272     else:
273       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274         "_ExpandAndLockInstance called with instance-level locks set"
275     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276     if expanded_name is None:
277       raise errors.OpPrereqError("Instance '%s' not known" %
278                                   self.op.instance_name)
279     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280     self.op.instance_name = expanded_name
281
282   def _LockInstancesNodes(self, primary_only=False):
283     """Helper function to declare instances' nodes for locking.
284
285     This function should be called after locking one or more instances to lock
286     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287     with all primary or secondary nodes for instances already locked and
288     present in self.needed_locks[locking.LEVEL_INSTANCE].
289
290     It should be called from DeclareLocks, and for safety only works if
291     self.recalculate_locks[locking.LEVEL_NODE] is set.
292
293     In the future it may grow parameters to just lock some instance's nodes, or
294     to just lock primaries or secondary nodes, if needed.
295
296     If should be called in DeclareLocks in a way similar to::
297
298       if level == locking.LEVEL_NODE:
299         self._LockInstancesNodes()
300
301     @type primary_only: boolean
302     @param primary_only: only lock primary nodes of locked instances
303
304     """
305     assert locking.LEVEL_NODE in self.recalculate_locks, \
306       "_LockInstancesNodes helper function called with no nodes to recalculate"
307
308     # TODO: check if we're really been called with the instance locks held
309
310     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311     # future we might want to have different behaviors depending on the value
312     # of self.recalculate_locks[locking.LEVEL_NODE]
313     wanted_nodes = []
314     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315       instance = self.context.cfg.GetInstanceInfo(instance_name)
316       wanted_nodes.append(instance.primary_node)
317       if not primary_only:
318         wanted_nodes.extend(instance.secondary_nodes)
319
320     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
324
325     del self.recalculate_locks[locking.LEVEL_NODE]
326
327
328 class NoHooksLU(LogicalUnit):
329   """Simple LU which runs no hooks.
330
331   This LU is intended as a parent for other LogicalUnits which will
332   run no hooks, in order to reduce duplicate code.
333
334   """
335   HPATH = None
336   HTYPE = None
337
338
339 def _GetWantedNodes(lu, nodes):
340   """Returns list of checked and expanded node names.
341
342   @type lu: L{LogicalUnit}
343   @param lu: the logical unit on whose behalf we execute
344   @type nodes: list
345   @param nodes: list of node names or None for all nodes
346   @rtype: list
347   @return: the list of nodes, sorted
348   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
349
350   """
351   if not isinstance(nodes, list):
352     raise errors.OpPrereqError("Invalid argument type 'nodes'")
353
354   if not nodes:
355     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356       " non-empty list of nodes whose name is to be expanded.")
357
358   wanted = []
359   for name in nodes:
360     node = lu.cfg.ExpandNodeName(name)
361     if node is None:
362       raise errors.OpPrereqError("No such node name '%s'" % name)
363     wanted.append(node)
364
365   return utils.NiceSort(wanted)
366
367
368 def _GetWantedInstances(lu, instances):
369   """Returns list of checked and expanded instance names.
370
371   @type lu: L{LogicalUnit}
372   @param lu: the logical unit on whose behalf we execute
373   @type instances: list
374   @param instances: list of instance names or None for all instances
375   @rtype: list
376   @return: the list of instances, sorted
377   @raise errors.OpPrereqError: if the instances parameter is wrong type
378   @raise errors.OpPrereqError: if any of the passed instances is not found
379
380   """
381   if not isinstance(instances, list):
382     raise errors.OpPrereqError("Invalid argument type 'instances'")
383
384   if instances:
385     wanted = []
386
387     for name in instances:
388       instance = lu.cfg.ExpandInstanceName(name)
389       if instance is None:
390         raise errors.OpPrereqError("No such instance name '%s'" % name)
391       wanted.append(instance)
392
393   else:
394     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
395   return wanted
396
397
398 def _CheckOutputFields(static, dynamic, selected):
399   """Checks whether all selected fields are valid.
400
401   @type static: L{utils.FieldSet}
402   @param static: static fields set
403   @type dynamic: L{utils.FieldSet}
404   @param dynamic: dynamic fields set
405
406   """
407   f = utils.FieldSet()
408   f.Extend(static)
409   f.Extend(dynamic)
410
411   delta = f.NonMatching(selected)
412   if delta:
413     raise errors.OpPrereqError("Unknown output fields selected: %s"
414                                % ",".join(delta))
415
416
417 def _CheckBooleanOpField(op, name):
418   """Validates boolean opcode parameters.
419
420   This will ensure that an opcode parameter is either a boolean value,
421   or None (but that it always exists).
422
423   """
424   val = getattr(op, name, None)
425   if not (val is None or isinstance(val, bool)):
426     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
427                                (name, str(val)))
428   setattr(op, name, val)
429
430
431 def _CheckNodeOnline(lu, node):
432   """Ensure that a given node is online.
433
434   @param lu: the LU on behalf of which we make the check
435   @param node: the node to check
436   @raise errors.OpPrereqError: if the node is offline
437
438   """
439   if lu.cfg.GetNodeInfo(node).offline:
440     raise errors.OpPrereqError("Can't use offline node %s" % node)
441
442
443 def _CheckNodeNotDrained(lu, node):
444   """Ensure that a given node is not drained.
445
446   @param lu: the LU on behalf of which we make the check
447   @param node: the node to check
448   @raise errors.OpPrereqError: if the node is drained
449
450   """
451   if lu.cfg.GetNodeInfo(node).drained:
452     raise errors.OpPrereqError("Can't use drained node %s" % node)
453
454
455 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456                           memory, vcpus, nics, disk_template, disks):
457   """Builds instance related env variables for hooks
458
459   This builds the hook environment from individual variables.
460
461   @type name: string
462   @param name: the name of the instance
463   @type primary_node: string
464   @param primary_node: the name of the instance's primary node
465   @type secondary_nodes: list
466   @param secondary_nodes: list of secondary nodes as strings
467   @type os_type: string
468   @param os_type: the name of the instance's OS
469   @type status: boolean
470   @param status: the should_run status of the instance
471   @type memory: string
472   @param memory: the memory size of the instance
473   @type vcpus: string
474   @param vcpus: the count of VCPUs the instance has
475   @type nics: list
476   @param nics: list of tuples (ip, bridge, mac) representing
477       the NICs the instance  has
478   @type disk_template: string
479   @param disk_template: the distk template of the instance
480   @type disks: list
481   @param disks: the list of (size, mode) pairs
482   @rtype: dict
483   @return: the hook environment for this instance
484
485   """
486   if status:
487     str_status = "up"
488   else:
489     str_status = "down"
490   env = {
491     "OP_TARGET": name,
492     "INSTANCE_NAME": name,
493     "INSTANCE_PRIMARY": primary_node,
494     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
495     "INSTANCE_OS_TYPE": os_type,
496     "INSTANCE_STATUS": str_status,
497     "INSTANCE_MEMORY": memory,
498     "INSTANCE_VCPUS": vcpus,
499     "INSTANCE_DISK_TEMPLATE": disk_template,
500   }
501
502   if nics:
503     nic_count = len(nics)
504     for idx, (ip, bridge, mac) in enumerate(nics):
505       if ip is None:
506         ip = ""
507       env["INSTANCE_NIC%d_IP" % idx] = ip
508       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
509       env["INSTANCE_NIC%d_MAC" % idx] = mac
510   else:
511     nic_count = 0
512
513   env["INSTANCE_NIC_COUNT"] = nic_count
514
515   if disks:
516     disk_count = len(disks)
517     for idx, (size, mode) in enumerate(disks):
518       env["INSTANCE_DISK%d_SIZE" % idx] = size
519       env["INSTANCE_DISK%d_MODE" % idx] = mode
520   else:
521     disk_count = 0
522
523   env["INSTANCE_DISK_COUNT"] = disk_count
524
525   return env
526
527
528 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
529   """Builds instance related env variables for hooks from an object.
530
531   @type lu: L{LogicalUnit}
532   @param lu: the logical unit on whose behalf we execute
533   @type instance: L{objects.Instance}
534   @param instance: the instance for which we should build the
535       environment
536   @type override: dict
537   @param override: dictionary with key/values that will override
538       our values
539   @rtype: dict
540   @return: the hook environment dictionary
541
542   """
543   bep = lu.cfg.GetClusterInfo().FillBE(instance)
544   args = {
545     'name': instance.name,
546     'primary_node': instance.primary_node,
547     'secondary_nodes': instance.secondary_nodes,
548     'os_type': instance.os,
549     'status': instance.admin_up,
550     'memory': bep[constants.BE_MEMORY],
551     'vcpus': bep[constants.BE_VCPUS],
552     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
553     'disk_template': instance.disk_template,
554     'disks': [(disk.size, disk.mode) for disk in instance.disks],
555   }
556   if override:
557     args.update(override)
558   return _BuildInstanceHookEnv(**args)
559
560
561 def _AdjustCandidatePool(lu):
562   """Adjust the candidate pool after node operations.
563
564   """
565   mod_list = lu.cfg.MaintainCandidatePool()
566   if mod_list:
567     lu.LogInfo("Promoted nodes to master candidate role: %s",
568                ", ".join(node.name for node in mod_list))
569     for name in mod_list:
570       lu.context.ReaddNode(name)
571   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
572   if mc_now > mc_max:
573     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
574                (mc_now, mc_max))
575
576
577 def _CheckInstanceBridgesExist(lu, instance):
578   """Check that the brigdes needed by an instance exist.
579
580   """
581   # check bridges existance
582   brlist = [nic.bridge for nic in instance.nics]
583   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
584   result.Raise()
585   if not result.data:
586     raise errors.OpPrereqError("One or more target bridges %s does not"
587                                " exist on destination node '%s'" %
588                                (brlist, instance.primary_node))
589
590
591 class LUDestroyCluster(NoHooksLU):
592   """Logical unit for destroying the cluster.
593
594   """
595   _OP_REQP = []
596
597   def CheckPrereq(self):
598     """Check prerequisites.
599
600     This checks whether the cluster is empty.
601
602     Any errors are signalled by raising errors.OpPrereqError.
603
604     """
605     master = self.cfg.GetMasterNode()
606
607     nodelist = self.cfg.GetNodeList()
608     if len(nodelist) != 1 or nodelist[0] != master:
609       raise errors.OpPrereqError("There are still %d node(s) in"
610                                  " this cluster." % (len(nodelist) - 1))
611     instancelist = self.cfg.GetInstanceList()
612     if instancelist:
613       raise errors.OpPrereqError("There are still %d instance(s) in"
614                                  " this cluster." % len(instancelist))
615
616   def Exec(self, feedback_fn):
617     """Destroys the cluster.
618
619     """
620     master = self.cfg.GetMasterNode()
621     result = self.rpc.call_node_stop_master(master, False)
622     result.Raise()
623     if not result.data:
624       raise errors.OpExecError("Could not disable the master role")
625     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
626     utils.CreateBackup(priv_key)
627     utils.CreateBackup(pub_key)
628     return master
629
630
631 class LUVerifyCluster(LogicalUnit):
632   """Verifies the cluster status.
633
634   """
635   HPATH = "cluster-verify"
636   HTYPE = constants.HTYPE_CLUSTER
637   _OP_REQP = ["skip_checks"]
638   REQ_BGL = False
639
640   def ExpandNames(self):
641     self.needed_locks = {
642       locking.LEVEL_NODE: locking.ALL_SET,
643       locking.LEVEL_INSTANCE: locking.ALL_SET,
644     }
645     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
646
647   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
648                   node_result, feedback_fn, master_files,
649                   drbd_map, vg_name):
650     """Run multiple tests against a node.
651
652     Test list:
653
654       - compares ganeti version
655       - checks vg existance and size > 20G
656       - checks config file checksum
657       - checks ssh to other nodes
658
659     @type nodeinfo: L{objects.Node}
660     @param nodeinfo: the node to check
661     @param file_list: required list of files
662     @param local_cksum: dictionary of local files and their checksums
663     @param node_result: the results from the node
664     @param feedback_fn: function used to accumulate results
665     @param master_files: list of files that only masters should have
666     @param drbd_map: the useddrbd minors for this node, in
667         form of minor: (instance, must_exist) which correspond to instances
668         and their running status
669     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
670
671     """
672     node = nodeinfo.name
673
674     # main result, node_result should be a non-empty dict
675     if not node_result or not isinstance(node_result, dict):
676       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
677       return True
678
679     # compares ganeti version
680     local_version = constants.PROTOCOL_VERSION
681     remote_version = node_result.get('version', None)
682     if not (remote_version and isinstance(remote_version, (list, tuple)) and
683             len(remote_version) == 2):
684       feedback_fn("  - ERROR: connection to %s failed" % (node))
685       return True
686
687     if local_version != remote_version[0]:
688       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
689                   " node %s %s" % (local_version, node, remote_version[0]))
690       return True
691
692     # node seems compatible, we can actually try to look into its results
693
694     bad = False
695
696     # full package version
697     if constants.RELEASE_VERSION != remote_version[1]:
698       feedback_fn("  - WARNING: software version mismatch: master %s,"
699                   " node %s %s" %
700                   (constants.RELEASE_VERSION, node, remote_version[1]))
701
702     # checks vg existence and size > 20G
703     if vg_name is not None:
704       vglist = node_result.get(constants.NV_VGLIST, None)
705       if not vglist:
706         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
707                         (node,))
708         bad = True
709       else:
710         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
711                                               constants.MIN_VG_SIZE)
712         if vgstatus:
713           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
714           bad = True
715
716     # checks config file checksum
717
718     remote_cksum = node_result.get(constants.NV_FILELIST, None)
719     if not isinstance(remote_cksum, dict):
720       bad = True
721       feedback_fn("  - ERROR: node hasn't returned file checksum data")
722     else:
723       for file_name in file_list:
724         node_is_mc = nodeinfo.master_candidate
725         must_have_file = file_name not in master_files
726         if file_name not in remote_cksum:
727           if node_is_mc or must_have_file:
728             bad = True
729             feedback_fn("  - ERROR: file '%s' missing" % file_name)
730         elif remote_cksum[file_name] != local_cksum[file_name]:
731           if node_is_mc or must_have_file:
732             bad = True
733             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
734           else:
735             # not candidate and this is not a must-have file
736             bad = True
737             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
738                         " '%s'" % file_name)
739         else:
740           # all good, except non-master/non-must have combination
741           if not node_is_mc and not must_have_file:
742             feedback_fn("  - ERROR: file '%s' should not exist on non master"
743                         " candidates" % file_name)
744
745     # checks ssh to any
746
747     if constants.NV_NODELIST not in node_result:
748       bad = True
749       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
750     else:
751       if node_result[constants.NV_NODELIST]:
752         bad = True
753         for node in node_result[constants.NV_NODELIST]:
754           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
755                           (node, node_result[constants.NV_NODELIST][node]))
756
757     if constants.NV_NODENETTEST not in node_result:
758       bad = True
759       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
760     else:
761       if node_result[constants.NV_NODENETTEST]:
762         bad = True
763         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
764         for node in nlist:
765           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
766                           (node, node_result[constants.NV_NODENETTEST][node]))
767
768     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
769     if isinstance(hyp_result, dict):
770       for hv_name, hv_result in hyp_result.iteritems():
771         if hv_result is not None:
772           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
773                       (hv_name, hv_result))
774
775     # check used drbd list
776     if vg_name is not None:
777       used_minors = node_result.get(constants.NV_DRBDLIST, [])
778       if not isinstance(used_minors, (tuple, list)):
779         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
780                     str(used_minors))
781       else:
782         for minor, (iname, must_exist) in drbd_map.items():
783           if minor not in used_minors and must_exist:
784             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
785                         " not active" % (minor, iname))
786             bad = True
787         for minor in used_minors:
788           if minor not in drbd_map:
789             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
790                         minor)
791             bad = True
792
793     return bad
794
795   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
796                       node_instance, feedback_fn, n_offline):
797     """Verify an instance.
798
799     This function checks to see if the required block devices are
800     available on the instance's node.
801
802     """
803     bad = False
804
805     node_current = instanceconfig.primary_node
806
807     node_vol_should = {}
808     instanceconfig.MapLVsByNode(node_vol_should)
809
810     for node in node_vol_should:
811       if node in n_offline:
812         # ignore missing volumes on offline nodes
813         continue
814       for volume in node_vol_should[node]:
815         if node not in node_vol_is or volume not in node_vol_is[node]:
816           feedback_fn("  - ERROR: volume %s missing on node %s" %
817                           (volume, node))
818           bad = True
819
820     if instanceconfig.admin_up:
821       if ((node_current not in node_instance or
822           not instance in node_instance[node_current]) and
823           node_current not in n_offline):
824         feedback_fn("  - ERROR: instance %s not running on node %s" %
825                         (instance, node_current))
826         bad = True
827
828     for node in node_instance:
829       if (not node == node_current):
830         if instance in node_instance[node]:
831           feedback_fn("  - ERROR: instance %s should not run on node %s" %
832                           (instance, node))
833           bad = True
834
835     return bad
836
837   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
838     """Verify if there are any unknown volumes in the cluster.
839
840     The .os, .swap and backup volumes are ignored. All other volumes are
841     reported as unknown.
842
843     """
844     bad = False
845
846     for node in node_vol_is:
847       for volume in node_vol_is[node]:
848         if node not in node_vol_should or volume not in node_vol_should[node]:
849           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
850                       (volume, node))
851           bad = True
852     return bad
853
854   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
855     """Verify the list of running instances.
856
857     This checks what instances are running but unknown to the cluster.
858
859     """
860     bad = False
861     for node in node_instance:
862       for runninginstance in node_instance[node]:
863         if runninginstance not in instancelist:
864           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
865                           (runninginstance, node))
866           bad = True
867     return bad
868
869   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
870     """Verify N+1 Memory Resilience.
871
872     Check that if one single node dies we can still start all the instances it
873     was primary for.
874
875     """
876     bad = False
877
878     for node, nodeinfo in node_info.iteritems():
879       # This code checks that every node which is now listed as secondary has
880       # enough memory to host all instances it is supposed to should a single
881       # other node in the cluster fail.
882       # FIXME: not ready for failover to an arbitrary node
883       # FIXME: does not support file-backed instances
884       # WARNING: we currently take into account down instances as well as up
885       # ones, considering that even if they're down someone might want to start
886       # them even in the event of a node failure.
887       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
888         needed_mem = 0
889         for instance in instances:
890           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
891           if bep[constants.BE_AUTO_BALANCE]:
892             needed_mem += bep[constants.BE_MEMORY]
893         if nodeinfo['mfree'] < needed_mem:
894           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
895                       " failovers should node %s fail" % (node, prinode))
896           bad = True
897     return bad
898
899   def CheckPrereq(self):
900     """Check prerequisites.
901
902     Transform the list of checks we're going to skip into a set and check that
903     all its members are valid.
904
905     """
906     self.skip_set = frozenset(self.op.skip_checks)
907     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
908       raise errors.OpPrereqError("Invalid checks to be skipped specified")
909
910   def BuildHooksEnv(self):
911     """Build hooks env.
912
913     Cluster-Verify hooks just rone in the post phase and their failure makes
914     the output be logged in the verify output and the verification to fail.
915
916     """
917     all_nodes = self.cfg.GetNodeList()
918     env = {
919       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
920       }
921     for node in self.cfg.GetAllNodesInfo().values():
922       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
923
924     return env, [], all_nodes
925
926   def Exec(self, feedback_fn):
927     """Verify integrity of cluster, performing various test on nodes.
928
929     """
930     bad = False
931     feedback_fn("* Verifying global settings")
932     for msg in self.cfg.VerifyConfig():
933       feedback_fn("  - ERROR: %s" % msg)
934
935     vg_name = self.cfg.GetVGName()
936     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
937     nodelist = utils.NiceSort(self.cfg.GetNodeList())
938     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
939     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
940     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
941                         for iname in instancelist)
942     i_non_redundant = [] # Non redundant instances
943     i_non_a_balanced = [] # Non auto-balanced instances
944     n_offline = [] # List of offline nodes
945     n_drained = [] # List of nodes being drained
946     node_volume = {}
947     node_instance = {}
948     node_info = {}
949     instance_cfg = {}
950
951     # FIXME: verify OS list
952     # do local checksums
953     master_files = [constants.CLUSTER_CONF_FILE]
954
955     file_names = ssconf.SimpleStore().GetFileList()
956     file_names.append(constants.SSL_CERT_FILE)
957     file_names.append(constants.RAPI_CERT_FILE)
958     file_names.extend(master_files)
959
960     local_checksums = utils.FingerprintFiles(file_names)
961
962     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
963     node_verify_param = {
964       constants.NV_FILELIST: file_names,
965       constants.NV_NODELIST: [node.name for node in nodeinfo
966                               if not node.offline],
967       constants.NV_HYPERVISOR: hypervisors,
968       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
969                                   node.secondary_ip) for node in nodeinfo
970                                  if not node.offline],
971       constants.NV_INSTANCELIST: hypervisors,
972       constants.NV_VERSION: None,
973       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
974       }
975     if vg_name is not None:
976       node_verify_param[constants.NV_VGLIST] = None
977       node_verify_param[constants.NV_LVLIST] = vg_name
978       node_verify_param[constants.NV_DRBDLIST] = None
979     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
980                                            self.cfg.GetClusterName())
981
982     cluster = self.cfg.GetClusterInfo()
983     master_node = self.cfg.GetMasterNode()
984     all_drbd_map = self.cfg.ComputeDRBDMap()
985
986     for node_i in nodeinfo:
987       node = node_i.name
988       nresult = all_nvinfo[node].data
989
990       if node_i.offline:
991         feedback_fn("* Skipping offline node %s" % (node,))
992         n_offline.append(node)
993         continue
994
995       if node == master_node:
996         ntype = "master"
997       elif node_i.master_candidate:
998         ntype = "master candidate"
999       elif node_i.drained:
1000         ntype = "drained"
1001         n_drained.append(node)
1002       else:
1003         ntype = "regular"
1004       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1005
1006       if all_nvinfo[node].failed or not isinstance(nresult, dict):
1007         feedback_fn("  - ERROR: connection to %s failed" % (node,))
1008         bad = True
1009         continue
1010
1011       node_drbd = {}
1012       for minor, instance in all_drbd_map[node].items():
1013         if instance not in instanceinfo:
1014           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1015                       instance)
1016           # ghost instance should not be running, but otherwise we
1017           # don't give double warnings (both ghost instance and
1018           # unallocated minor in use)
1019           node_drbd[minor] = (instance, False)
1020         else:
1021           instance = instanceinfo[instance]
1022           node_drbd[minor] = (instance.name, instance.admin_up)
1023       result = self._VerifyNode(node_i, file_names, local_checksums,
1024                                 nresult, feedback_fn, master_files,
1025                                 node_drbd, vg_name)
1026       bad = bad or result
1027
1028       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1029       if vg_name is None:
1030         node_volume[node] = {}
1031       elif isinstance(lvdata, basestring):
1032         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1033                     (node, utils.SafeEncode(lvdata)))
1034         bad = True
1035         node_volume[node] = {}
1036       elif not isinstance(lvdata, dict):
1037         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1038         bad = True
1039         continue
1040       else:
1041         node_volume[node] = lvdata
1042
1043       # node_instance
1044       idata = nresult.get(constants.NV_INSTANCELIST, None)
1045       if not isinstance(idata, list):
1046         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1047                     (node,))
1048         bad = True
1049         continue
1050
1051       node_instance[node] = idata
1052
1053       # node_info
1054       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1055       if not isinstance(nodeinfo, dict):
1056         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1057         bad = True
1058         continue
1059
1060       try:
1061         node_info[node] = {
1062           "mfree": int(nodeinfo['memory_free']),
1063           "pinst": [],
1064           "sinst": [],
1065           # dictionary holding all instances this node is secondary for,
1066           # grouped by their primary node. Each key is a cluster node, and each
1067           # value is a list of instances which have the key as primary and the
1068           # current node as secondary.  this is handy to calculate N+1 memory
1069           # availability if you can only failover from a primary to its
1070           # secondary.
1071           "sinst-by-pnode": {},
1072         }
1073         # FIXME: devise a free space model for file based instances as well
1074         if vg_name is not None:
1075           if (constants.NV_VGLIST not in nresult or
1076               vg_name not in nresult[constants.NV_VGLIST]):
1077             feedback_fn("  - ERROR: node %s didn't return data for the"
1078                         " volume group '%s' - it is either missing or broken" %
1079                         (node, vg_name))
1080             bad = True
1081             continue
1082           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1083       except (ValueError, KeyError):
1084         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1085                     " from node %s" % (node,))
1086         bad = True
1087         continue
1088
1089     node_vol_should = {}
1090
1091     for instance in instancelist:
1092       feedback_fn("* Verifying instance %s" % instance)
1093       inst_config = instanceinfo[instance]
1094       result =  self._VerifyInstance(instance, inst_config, node_volume,
1095                                      node_instance, feedback_fn, n_offline)
1096       bad = bad or result
1097       inst_nodes_offline = []
1098
1099       inst_config.MapLVsByNode(node_vol_should)
1100
1101       instance_cfg[instance] = inst_config
1102
1103       pnode = inst_config.primary_node
1104       if pnode in node_info:
1105         node_info[pnode]['pinst'].append(instance)
1106       elif pnode not in n_offline:
1107         feedback_fn("  - ERROR: instance %s, connection to primary node"
1108                     " %s failed" % (instance, pnode))
1109         bad = True
1110
1111       if pnode in n_offline:
1112         inst_nodes_offline.append(pnode)
1113
1114       # If the instance is non-redundant we cannot survive losing its primary
1115       # node, so we are not N+1 compliant. On the other hand we have no disk
1116       # templates with more than one secondary so that situation is not well
1117       # supported either.
1118       # FIXME: does not support file-backed instances
1119       if len(inst_config.secondary_nodes) == 0:
1120         i_non_redundant.append(instance)
1121       elif len(inst_config.secondary_nodes) > 1:
1122         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1123                     % instance)
1124
1125       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1126         i_non_a_balanced.append(instance)
1127
1128       for snode in inst_config.secondary_nodes:
1129         if snode in node_info:
1130           node_info[snode]['sinst'].append(instance)
1131           if pnode not in node_info[snode]['sinst-by-pnode']:
1132             node_info[snode]['sinst-by-pnode'][pnode] = []
1133           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1134         elif snode not in n_offline:
1135           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1136                       " %s failed" % (instance, snode))
1137           bad = True
1138         if snode in n_offline:
1139           inst_nodes_offline.append(snode)
1140
1141       if inst_nodes_offline:
1142         # warn that the instance lives on offline nodes, and set bad=True
1143         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1144                     ", ".join(inst_nodes_offline))
1145         bad = True
1146
1147     feedback_fn("* Verifying orphan volumes")
1148     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1149                                        feedback_fn)
1150     bad = bad or result
1151
1152     feedback_fn("* Verifying remaining instances")
1153     result = self._VerifyOrphanInstances(instancelist, node_instance,
1154                                          feedback_fn)
1155     bad = bad or result
1156
1157     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1158       feedback_fn("* Verifying N+1 Memory redundancy")
1159       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1160       bad = bad or result
1161
1162     feedback_fn("* Other Notes")
1163     if i_non_redundant:
1164       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1165                   % len(i_non_redundant))
1166
1167     if i_non_a_balanced:
1168       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1169                   % len(i_non_a_balanced))
1170
1171     if n_offline:
1172       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1173
1174     if n_drained:
1175       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1176
1177     return not bad
1178
1179   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1180     """Analize the post-hooks' result
1181
1182     This method analyses the hook result, handles it, and sends some
1183     nicely-formatted feedback back to the user.
1184
1185     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1186         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1187     @param hooks_results: the results of the multi-node hooks rpc call
1188     @param feedback_fn: function used send feedback back to the caller
1189     @param lu_result: previous Exec result
1190     @return: the new Exec result, based on the previous result
1191         and hook results
1192
1193     """
1194     # We only really run POST phase hooks, and are only interested in
1195     # their results
1196     if phase == constants.HOOKS_PHASE_POST:
1197       # Used to change hooks' output to proper indentation
1198       indent_re = re.compile('^', re.M)
1199       feedback_fn("* Hooks Results")
1200       if not hooks_results:
1201         feedback_fn("  - ERROR: general communication failure")
1202         lu_result = 1
1203       else:
1204         for node_name in hooks_results:
1205           show_node_header = True
1206           res = hooks_results[node_name]
1207           if res.failed or res.data is False or not isinstance(res.data, list):
1208             if res.offline:
1209               # no need to warn or set fail return value
1210               continue
1211             feedback_fn("    Communication failure in hooks execution")
1212             lu_result = 1
1213             continue
1214           for script, hkr, output in res.data:
1215             if hkr == constants.HKR_FAIL:
1216               # The node header is only shown once, if there are
1217               # failing hooks on that node
1218               if show_node_header:
1219                 feedback_fn("  Node %s:" % node_name)
1220                 show_node_header = False
1221               feedback_fn("    ERROR: Script %s failed, output:" % script)
1222               output = indent_re.sub('      ', output)
1223               feedback_fn("%s" % output)
1224               lu_result = 1
1225
1226       return lu_result
1227
1228
1229 class LUVerifyDisks(NoHooksLU):
1230   """Verifies the cluster disks status.
1231
1232   """
1233   _OP_REQP = []
1234   REQ_BGL = False
1235
1236   def ExpandNames(self):
1237     self.needed_locks = {
1238       locking.LEVEL_NODE: locking.ALL_SET,
1239       locking.LEVEL_INSTANCE: locking.ALL_SET,
1240     }
1241     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1242
1243   def CheckPrereq(self):
1244     """Check prerequisites.
1245
1246     This has no prerequisites.
1247
1248     """
1249     pass
1250
1251   def Exec(self, feedback_fn):
1252     """Verify integrity of cluster disks.
1253
1254     """
1255     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1256
1257     vg_name = self.cfg.GetVGName()
1258     nodes = utils.NiceSort(self.cfg.GetNodeList())
1259     instances = [self.cfg.GetInstanceInfo(name)
1260                  for name in self.cfg.GetInstanceList()]
1261
1262     nv_dict = {}
1263     for inst in instances:
1264       inst_lvs = {}
1265       if (not inst.admin_up or
1266           inst.disk_template not in constants.DTS_NET_MIRROR):
1267         continue
1268       inst.MapLVsByNode(inst_lvs)
1269       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1270       for node, vol_list in inst_lvs.iteritems():
1271         for vol in vol_list:
1272           nv_dict[(node, vol)] = inst
1273
1274     if not nv_dict:
1275       return result
1276
1277     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1278
1279     to_act = set()
1280     for node in nodes:
1281       # node_volume
1282       lvs = node_lvs[node]
1283       if lvs.failed:
1284         if not lvs.offline:
1285           self.LogWarning("Connection to node %s failed: %s" %
1286                           (node, lvs.data))
1287         continue
1288       lvs = lvs.data
1289       if isinstance(lvs, basestring):
1290         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1291         res_nlvm[node] = lvs
1292         continue
1293       elif not isinstance(lvs, dict):
1294         logging.warning("Connection to node %s failed or invalid data"
1295                         " returned", node)
1296         res_nodes.append(node)
1297         continue
1298
1299       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1300         inst = nv_dict.pop((node, lv_name), None)
1301         if (not lv_online and inst is not None
1302             and inst.name not in res_instances):
1303           res_instances.append(inst.name)
1304
1305     # any leftover items in nv_dict are missing LVs, let's arrange the
1306     # data better
1307     for key, inst in nv_dict.iteritems():
1308       if inst.name not in res_missing:
1309         res_missing[inst.name] = []
1310       res_missing[inst.name].append(key)
1311
1312     return result
1313
1314
1315 class LURenameCluster(LogicalUnit):
1316   """Rename the cluster.
1317
1318   """
1319   HPATH = "cluster-rename"
1320   HTYPE = constants.HTYPE_CLUSTER
1321   _OP_REQP = ["name"]
1322
1323   def BuildHooksEnv(self):
1324     """Build hooks env.
1325
1326     """
1327     env = {
1328       "OP_TARGET": self.cfg.GetClusterName(),
1329       "NEW_NAME": self.op.name,
1330       }
1331     mn = self.cfg.GetMasterNode()
1332     return env, [mn], [mn]
1333
1334   def CheckPrereq(self):
1335     """Verify that the passed name is a valid one.
1336
1337     """
1338     hostname = utils.HostInfo(self.op.name)
1339
1340     new_name = hostname.name
1341     self.ip = new_ip = hostname.ip
1342     old_name = self.cfg.GetClusterName()
1343     old_ip = self.cfg.GetMasterIP()
1344     if new_name == old_name and new_ip == old_ip:
1345       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1346                                  " cluster has changed")
1347     if new_ip != old_ip:
1348       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1349         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1350                                    " reachable on the network. Aborting." %
1351                                    new_ip)
1352
1353     self.op.name = new_name
1354
1355   def Exec(self, feedback_fn):
1356     """Rename the cluster.
1357
1358     """
1359     clustername = self.op.name
1360     ip = self.ip
1361
1362     # shutdown the master IP
1363     master = self.cfg.GetMasterNode()
1364     result = self.rpc.call_node_stop_master(master, False)
1365     if result.failed or not result.data:
1366       raise errors.OpExecError("Could not disable the master role")
1367
1368     try:
1369       cluster = self.cfg.GetClusterInfo()
1370       cluster.cluster_name = clustername
1371       cluster.master_ip = ip
1372       self.cfg.Update(cluster)
1373
1374       # update the known hosts file
1375       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1376       node_list = self.cfg.GetNodeList()
1377       try:
1378         node_list.remove(master)
1379       except ValueError:
1380         pass
1381       result = self.rpc.call_upload_file(node_list,
1382                                          constants.SSH_KNOWN_HOSTS_FILE)
1383       for to_node, to_result in result.iteritems():
1384          msg = to_result.RemoteFailMsg()
1385          if msg:
1386            msg = ("Copy of file %s to node %s failed: %s" %
1387                    (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1388            self.proc.LogWarning(msg)
1389
1390     finally:
1391       result = self.rpc.call_node_start_master(master, False)
1392       if result.failed or not result.data:
1393         self.LogWarning("Could not re-enable the master role on"
1394                         " the master, please restart manually.")
1395
1396
1397 def _RecursiveCheckIfLVMBased(disk):
1398   """Check if the given disk or its children are lvm-based.
1399
1400   @type disk: L{objects.Disk}
1401   @param disk: the disk to check
1402   @rtype: booleean
1403   @return: boolean indicating whether a LD_LV dev_type was found or not
1404
1405   """
1406   if disk.children:
1407     for chdisk in disk.children:
1408       if _RecursiveCheckIfLVMBased(chdisk):
1409         return True
1410   return disk.dev_type == constants.LD_LV
1411
1412
1413 class LUSetClusterParams(LogicalUnit):
1414   """Change the parameters of the cluster.
1415
1416   """
1417   HPATH = "cluster-modify"
1418   HTYPE = constants.HTYPE_CLUSTER
1419   _OP_REQP = []
1420   REQ_BGL = False
1421
1422   def CheckArguments(self):
1423     """Check parameters
1424
1425     """
1426     if not hasattr(self.op, "candidate_pool_size"):
1427       self.op.candidate_pool_size = None
1428     if self.op.candidate_pool_size is not None:
1429       try:
1430         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1431       except (ValueError, TypeError), err:
1432         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1433                                    str(err))
1434       if self.op.candidate_pool_size < 1:
1435         raise errors.OpPrereqError("At least one master candidate needed")
1436
1437   def ExpandNames(self):
1438     # FIXME: in the future maybe other cluster params won't require checking on
1439     # all nodes to be modified.
1440     self.needed_locks = {
1441       locking.LEVEL_NODE: locking.ALL_SET,
1442     }
1443     self.share_locks[locking.LEVEL_NODE] = 1
1444
1445   def BuildHooksEnv(self):
1446     """Build hooks env.
1447
1448     """
1449     env = {
1450       "OP_TARGET": self.cfg.GetClusterName(),
1451       "NEW_VG_NAME": self.op.vg_name,
1452       }
1453     mn = self.cfg.GetMasterNode()
1454     return env, [mn], [mn]
1455
1456   def CheckPrereq(self):
1457     """Check prerequisites.
1458
1459     This checks whether the given params don't conflict and
1460     if the given volume group is valid.
1461
1462     """
1463     if self.op.vg_name is not None and not self.op.vg_name:
1464       instances = self.cfg.GetAllInstancesInfo().values()
1465       for inst in instances:
1466         for disk in inst.disks:
1467           if _RecursiveCheckIfLVMBased(disk):
1468             raise errors.OpPrereqError("Cannot disable lvm storage while"
1469                                        " lvm-based instances exist")
1470
1471     node_list = self.acquired_locks[locking.LEVEL_NODE]
1472
1473     # if vg_name not None, checks given volume group on all nodes
1474     if self.op.vg_name:
1475       vglist = self.rpc.call_vg_list(node_list)
1476       for node in node_list:
1477         if vglist[node].failed:
1478           # ignoring down node
1479           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1480           continue
1481         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1482                                               self.op.vg_name,
1483                                               constants.MIN_VG_SIZE)
1484         if vgstatus:
1485           raise errors.OpPrereqError("Error on node '%s': %s" %
1486                                      (node, vgstatus))
1487
1488     self.cluster = cluster = self.cfg.GetClusterInfo()
1489     # validate params changes
1490     if self.op.beparams:
1491       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1492       self.new_beparams = objects.FillDict(
1493         cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1494
1495     if self.op.nicparams:
1496       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1497       self.new_nicparams = objects.FillDict(
1498         cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1499       objects.NIC.CheckParameterSyntax(self.new_nicparams)
1500
1501     # hypervisor list/parameters
1502     self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1503     if self.op.hvparams:
1504       if not isinstance(self.op.hvparams, dict):
1505         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1506       for hv_name, hv_dict in self.op.hvparams.items():
1507         if hv_name not in self.new_hvparams:
1508           self.new_hvparams[hv_name] = hv_dict
1509         else:
1510           self.new_hvparams[hv_name].update(hv_dict)
1511
1512     if self.op.enabled_hypervisors is not None:
1513       self.hv_list = self.op.enabled_hypervisors
1514     else:
1515       self.hv_list = cluster.enabled_hypervisors
1516
1517     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1518       # either the enabled list has changed, or the parameters have, validate
1519       for hv_name, hv_params in self.new_hvparams.items():
1520         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1521             (self.op.enabled_hypervisors and
1522              hv_name in self.op.enabled_hypervisors)):
1523           # either this is a new hypervisor, or its parameters have changed
1524           hv_class = hypervisor.GetHypervisor(hv_name)
1525           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1526           hv_class.CheckParameterSyntax(hv_params)
1527           _CheckHVParams(self, node_list, hv_name, hv_params)
1528
1529   def Exec(self, feedback_fn):
1530     """Change the parameters of the cluster.
1531
1532     """
1533     if self.op.vg_name is not None:
1534       new_volume = self.op.vg_name
1535       if not new_volume:
1536         new_volume = None
1537       if new_volume != self.cfg.GetVGName():
1538         self.cfg.SetVGName(new_volume)
1539       else:
1540         feedback_fn("Cluster LVM configuration already in desired"
1541                     " state, not changing")
1542     if self.op.hvparams:
1543       self.cluster.hvparams = self.new_hvparams
1544     if self.op.enabled_hypervisors is not None:
1545       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1546     if self.op.beparams:
1547       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1548     if self.op.nicparams:
1549       self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1550
1551     if self.op.candidate_pool_size is not None:
1552       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1553
1554     self.cfg.Update(self.cluster)
1555
1556     # we want to update nodes after the cluster so that if any errors
1557     # happen, we have recorded and saved the cluster info
1558     if self.op.candidate_pool_size is not None:
1559       _AdjustCandidatePool(self)
1560
1561
1562 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1563   """Distribute additional files which are part of the cluster configuration.
1564
1565   ConfigWriter takes care of distributing the config and ssconf files, but
1566   there are more files which should be distributed to all nodes. This function
1567   makes sure those are copied.
1568
1569   @param lu: calling logical unit
1570   @param additional_nodes: list of nodes not in the config to distribute to
1571
1572   """
1573   # 1. Gather target nodes
1574   myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1575   dist_nodes = lu.cfg.GetNodeList()
1576   if additional_nodes is not None:
1577     dist_nodes.extend(additional_nodes)
1578   if myself.name in dist_nodes:
1579     dist_nodes.remove(myself.name)
1580   # 2. Gather files to distribute
1581   dist_files = set([constants.ETC_HOSTS,
1582                     constants.SSH_KNOWN_HOSTS_FILE,
1583                     constants.RAPI_CERT_FILE,
1584                     constants.RAPI_USERS_FILE,
1585                    ])
1586
1587   enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1588   for hv_name in enabled_hypervisors:
1589     hv_class = hypervisor.GetHypervisor(hv_name)
1590     dist_files.update(hv_class.GetAncillaryFiles())
1591
1592   # 3. Perform the files upload
1593   for fname in dist_files:
1594     if os.path.exists(fname):
1595       result = lu.rpc.call_upload_file(dist_nodes, fname)
1596       for to_node, to_result in result.items():
1597          msg = to_result.RemoteFailMsg()
1598          if msg:
1599            msg = ("Copy of file %s to node %s failed: %s" %
1600                    (fname, to_node, msg))
1601            lu.proc.LogWarning(msg)
1602
1603
1604 class LURedistributeConfig(NoHooksLU):
1605   """Force the redistribution of cluster configuration.
1606
1607   This is a very simple LU.
1608
1609   """
1610   _OP_REQP = []
1611   REQ_BGL = False
1612
1613   def ExpandNames(self):
1614     self.needed_locks = {
1615       locking.LEVEL_NODE: locking.ALL_SET,
1616     }
1617     self.share_locks[locking.LEVEL_NODE] = 1
1618
1619   def CheckPrereq(self):
1620     """Check prerequisites.
1621
1622     """
1623
1624   def Exec(self, feedback_fn):
1625     """Redistribute the configuration.
1626
1627     """
1628     self.cfg.Update(self.cfg.GetClusterInfo())
1629     _RedistributeAncillaryFiles(self)
1630
1631
1632 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1633   """Sleep and poll for an instance's disk to sync.
1634
1635   """
1636   if not instance.disks:
1637     return True
1638
1639   if not oneshot:
1640     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1641
1642   node = instance.primary_node
1643
1644   for dev in instance.disks:
1645     lu.cfg.SetDiskID(dev, node)
1646
1647   retries = 0
1648   while True:
1649     max_time = 0
1650     done = True
1651     cumul_degraded = False
1652     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1653     if rstats.failed or not rstats.data:
1654       lu.LogWarning("Can't get any data from node %s", node)
1655       retries += 1
1656       if retries >= 10:
1657         raise errors.RemoteError("Can't contact node %s for mirror data,"
1658                                  " aborting." % node)
1659       time.sleep(6)
1660       continue
1661     rstats = rstats.data
1662     retries = 0
1663     for i, mstat in enumerate(rstats):
1664       if mstat is None:
1665         lu.LogWarning("Can't compute data for node %s/%s",
1666                            node, instance.disks[i].iv_name)
1667         continue
1668       # we ignore the ldisk parameter
1669       perc_done, est_time, is_degraded, _ = mstat
1670       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1671       if perc_done is not None:
1672         done = False
1673         if est_time is not None:
1674           rem_time = "%d estimated seconds remaining" % est_time
1675           max_time = est_time
1676         else:
1677           rem_time = "no time estimate"
1678         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1679                         (instance.disks[i].iv_name, perc_done, rem_time))
1680     if done or oneshot:
1681       break
1682
1683     time.sleep(min(60, max_time))
1684
1685   if done:
1686     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1687   return not cumul_degraded
1688
1689
1690 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1691   """Check that mirrors are not degraded.
1692
1693   The ldisk parameter, if True, will change the test from the
1694   is_degraded attribute (which represents overall non-ok status for
1695   the device(s)) to the ldisk (representing the local storage status).
1696
1697   """
1698   lu.cfg.SetDiskID(dev, node)
1699   if ldisk:
1700     idx = 6
1701   else:
1702     idx = 5
1703
1704   result = True
1705   if on_primary or dev.AssembleOnSecondary():
1706     rstats = lu.rpc.call_blockdev_find(node, dev)
1707     msg = rstats.RemoteFailMsg()
1708     if msg:
1709       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1710       result = False
1711     elif not rstats.payload:
1712       lu.LogWarning("Can't find disk on node %s", node)
1713       result = False
1714     else:
1715       result = result and (not rstats.payload[idx])
1716   if dev.children:
1717     for child in dev.children:
1718       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1719
1720   return result
1721
1722
1723 class LUDiagnoseOS(NoHooksLU):
1724   """Logical unit for OS diagnose/query.
1725
1726   """
1727   _OP_REQP = ["output_fields", "names"]
1728   REQ_BGL = False
1729   _FIELDS_STATIC = utils.FieldSet()
1730   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1731
1732   def ExpandNames(self):
1733     if self.op.names:
1734       raise errors.OpPrereqError("Selective OS query not supported")
1735
1736     _CheckOutputFields(static=self._FIELDS_STATIC,
1737                        dynamic=self._FIELDS_DYNAMIC,
1738                        selected=self.op.output_fields)
1739
1740     # Lock all nodes, in shared mode
1741     # Temporary removal of locks, should be reverted later
1742     # TODO: reintroduce locks when they are lighter-weight
1743     self.needed_locks = {}
1744     #self.share_locks[locking.LEVEL_NODE] = 1
1745     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1746
1747   def CheckPrereq(self):
1748     """Check prerequisites.
1749
1750     """
1751
1752   @staticmethod
1753   def _DiagnoseByOS(node_list, rlist):
1754     """Remaps a per-node return list into an a per-os per-node dictionary
1755
1756     @param node_list: a list with the names of all nodes
1757     @param rlist: a map with node names as keys and OS objects as values
1758
1759     @rtype: dict
1760     @return: a dictionary with osnames as keys and as value another map, with
1761         nodes as keys and list of OS objects as values, eg::
1762
1763           {"debian-etch": {"node1": [<object>,...],
1764                            "node2": [<object>,]}
1765           }
1766
1767     """
1768     all_os = {}
1769     # we build here the list of nodes that didn't fail the RPC (at RPC
1770     # level), so that nodes with a non-responding node daemon don't
1771     # make all OSes invalid
1772     good_nodes = [node_name for node_name in rlist
1773                   if not rlist[node_name].failed]
1774     for node_name, nr in rlist.iteritems():
1775       if nr.failed or not nr.data:
1776         continue
1777       for os_obj in nr.data:
1778         if os_obj.name not in all_os:
1779           # build a list of nodes for this os containing empty lists
1780           # for each node in node_list
1781           all_os[os_obj.name] = {}
1782           for nname in good_nodes:
1783             all_os[os_obj.name][nname] = []
1784         all_os[os_obj.name][node_name].append(os_obj)
1785     return all_os
1786
1787   def Exec(self, feedback_fn):
1788     """Compute the list of OSes.
1789
1790     """
1791     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1792     node_data = self.rpc.call_os_diagnose(valid_nodes)
1793     if node_data == False:
1794       raise errors.OpExecError("Can't gather the list of OSes")
1795     pol = self._DiagnoseByOS(valid_nodes, node_data)
1796     output = []
1797     for os_name, os_data in pol.iteritems():
1798       row = []
1799       for field in self.op.output_fields:
1800         if field == "name":
1801           val = os_name
1802         elif field == "valid":
1803           val = utils.all([osl and osl[0] for osl in os_data.values()])
1804         elif field == "node_status":
1805           val = {}
1806           for node_name, nos_list in os_data.iteritems():
1807             val[node_name] = [(v.status, v.path) for v in nos_list]
1808         else:
1809           raise errors.ParameterError(field)
1810         row.append(val)
1811       output.append(row)
1812
1813     return output
1814
1815
1816 class LURemoveNode(LogicalUnit):
1817   """Logical unit for removing a node.
1818
1819   """
1820   HPATH = "node-remove"
1821   HTYPE = constants.HTYPE_NODE
1822   _OP_REQP = ["node_name"]
1823
1824   def BuildHooksEnv(self):
1825     """Build hooks env.
1826
1827     This doesn't run on the target node in the pre phase as a failed
1828     node would then be impossible to remove.
1829
1830     """
1831     env = {
1832       "OP_TARGET": self.op.node_name,
1833       "NODE_NAME": self.op.node_name,
1834       }
1835     all_nodes = self.cfg.GetNodeList()
1836     all_nodes.remove(self.op.node_name)
1837     return env, all_nodes, all_nodes
1838
1839   def CheckPrereq(self):
1840     """Check prerequisites.
1841
1842     This checks:
1843      - the node exists in the configuration
1844      - it does not have primary or secondary instances
1845      - it's not the master
1846
1847     Any errors are signalled by raising errors.OpPrereqError.
1848
1849     """
1850     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1851     if node is None:
1852       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1853
1854     instance_list = self.cfg.GetInstanceList()
1855
1856     masternode = self.cfg.GetMasterNode()
1857     if node.name == masternode:
1858       raise errors.OpPrereqError("Node is the master node,"
1859                                  " you need to failover first.")
1860
1861     for instance_name in instance_list:
1862       instance = self.cfg.GetInstanceInfo(instance_name)
1863       if node.name in instance.all_nodes:
1864         raise errors.OpPrereqError("Instance %s is still running on the node,"
1865                                    " please remove first." % instance_name)
1866     self.op.node_name = node.name
1867     self.node = node
1868
1869   def Exec(self, feedback_fn):
1870     """Removes the node from the cluster.
1871
1872     """
1873     node = self.node
1874     logging.info("Stopping the node daemon and removing configs from node %s",
1875                  node.name)
1876
1877     self.context.RemoveNode(node.name)
1878
1879     self.rpc.call_node_leave_cluster(node.name)
1880
1881     # Promote nodes to master candidate as needed
1882     _AdjustCandidatePool(self)
1883
1884
1885 class LUQueryNodes(NoHooksLU):
1886   """Logical unit for querying nodes.
1887
1888   """
1889   _OP_REQP = ["output_fields", "names", "use_locking"]
1890   REQ_BGL = False
1891   _FIELDS_DYNAMIC = utils.FieldSet(
1892     "dtotal", "dfree",
1893     "mtotal", "mnode", "mfree",
1894     "bootid",
1895     "ctotal", "cnodes", "csockets",
1896     )
1897
1898   _FIELDS_STATIC = utils.FieldSet(
1899     "name", "pinst_cnt", "sinst_cnt",
1900     "pinst_list", "sinst_list",
1901     "pip", "sip", "tags",
1902     "serial_no",
1903     "master_candidate",
1904     "master",
1905     "offline",
1906     "drained",
1907     )
1908
1909   def ExpandNames(self):
1910     _CheckOutputFields(static=self._FIELDS_STATIC,
1911                        dynamic=self._FIELDS_DYNAMIC,
1912                        selected=self.op.output_fields)
1913
1914     self.needed_locks = {}
1915     self.share_locks[locking.LEVEL_NODE] = 1
1916
1917     if self.op.names:
1918       self.wanted = _GetWantedNodes(self, self.op.names)
1919     else:
1920       self.wanted = locking.ALL_SET
1921
1922     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1923     self.do_locking = self.do_node_query and self.op.use_locking
1924     if self.do_locking:
1925       # if we don't request only static fields, we need to lock the nodes
1926       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1927
1928
1929   def CheckPrereq(self):
1930     """Check prerequisites.
1931
1932     """
1933     # The validation of the node list is done in the _GetWantedNodes,
1934     # if non empty, and if empty, there's no validation to do
1935     pass
1936
1937   def Exec(self, feedback_fn):
1938     """Computes the list of nodes and their attributes.
1939
1940     """
1941     all_info = self.cfg.GetAllNodesInfo()
1942     if self.do_locking:
1943       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1944     elif self.wanted != locking.ALL_SET:
1945       nodenames = self.wanted
1946       missing = set(nodenames).difference(all_info.keys())
1947       if missing:
1948         raise errors.OpExecError(
1949           "Some nodes were removed before retrieving their data: %s" % missing)
1950     else:
1951       nodenames = all_info.keys()
1952
1953     nodenames = utils.NiceSort(nodenames)
1954     nodelist = [all_info[name] for name in nodenames]
1955
1956     # begin data gathering
1957
1958     if self.do_node_query:
1959       live_data = {}
1960       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1961                                           self.cfg.GetHypervisorType())
1962       for name in nodenames:
1963         nodeinfo = node_data[name]
1964         if not nodeinfo.failed and nodeinfo.data:
1965           nodeinfo = nodeinfo.data
1966           fn = utils.TryConvert
1967           live_data[name] = {
1968             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1969             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1970             "mfree": fn(int, nodeinfo.get('memory_free', None)),
1971             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1972             "dfree": fn(int, nodeinfo.get('vg_free', None)),
1973             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1974             "bootid": nodeinfo.get('bootid', None),
1975             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1976             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1977             }
1978         else:
1979           live_data[name] = {}
1980     else:
1981       live_data = dict.fromkeys(nodenames, {})
1982
1983     node_to_primary = dict([(name, set()) for name in nodenames])
1984     node_to_secondary = dict([(name, set()) for name in nodenames])
1985
1986     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1987                              "sinst_cnt", "sinst_list"))
1988     if inst_fields & frozenset(self.op.output_fields):
1989       instancelist = self.cfg.GetInstanceList()
1990
1991       for instance_name in instancelist:
1992         inst = self.cfg.GetInstanceInfo(instance_name)
1993         if inst.primary_node in node_to_primary:
1994           node_to_primary[inst.primary_node].add(inst.name)
1995         for secnode in inst.secondary_nodes:
1996           if secnode in node_to_secondary:
1997             node_to_secondary[secnode].add(inst.name)
1998
1999     master_node = self.cfg.GetMasterNode()
2000
2001     # end data gathering
2002
2003     output = []
2004     for node in nodelist:
2005       node_output = []
2006       for field in self.op.output_fields:
2007         if field == "name":
2008           val = node.name
2009         elif field == "pinst_list":
2010           val = list(node_to_primary[node.name])
2011         elif field == "sinst_list":
2012           val = list(node_to_secondary[node.name])
2013         elif field == "pinst_cnt":
2014           val = len(node_to_primary[node.name])
2015         elif field == "sinst_cnt":
2016           val = len(node_to_secondary[node.name])
2017         elif field == "pip":
2018           val = node.primary_ip
2019         elif field == "sip":
2020           val = node.secondary_ip
2021         elif field == "tags":
2022           val = list(node.GetTags())
2023         elif field == "serial_no":
2024           val = node.serial_no
2025         elif field == "master_candidate":
2026           val = node.master_candidate
2027         elif field == "master":
2028           val = node.name == master_node
2029         elif field == "offline":
2030           val = node.offline
2031         elif field == "drained":
2032           val = node.drained
2033         elif self._FIELDS_DYNAMIC.Matches(field):
2034           val = live_data[node.name].get(field, None)
2035         else:
2036           raise errors.ParameterError(field)
2037         node_output.append(val)
2038       output.append(node_output)
2039
2040     return output
2041
2042
2043 class LUQueryNodeVolumes(NoHooksLU):
2044   """Logical unit for getting volumes on node(s).
2045
2046   """
2047   _OP_REQP = ["nodes", "output_fields"]
2048   REQ_BGL = False
2049   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2050   _FIELDS_STATIC = utils.FieldSet("node")
2051
2052   def ExpandNames(self):
2053     _CheckOutputFields(static=self._FIELDS_STATIC,
2054                        dynamic=self._FIELDS_DYNAMIC,
2055                        selected=self.op.output_fields)
2056
2057     self.needed_locks = {}
2058     self.share_locks[locking.LEVEL_NODE] = 1
2059     if not self.op.nodes:
2060       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2061     else:
2062       self.needed_locks[locking.LEVEL_NODE] = \
2063         _GetWantedNodes(self, self.op.nodes)
2064
2065   def CheckPrereq(self):
2066     """Check prerequisites.
2067
2068     This checks that the fields required are valid output fields.
2069
2070     """
2071     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2072
2073   def Exec(self, feedback_fn):
2074     """Computes the list of nodes and their attributes.
2075
2076     """
2077     nodenames = self.nodes
2078     volumes = self.rpc.call_node_volumes(nodenames)
2079
2080     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2081              in self.cfg.GetInstanceList()]
2082
2083     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2084
2085     output = []
2086     for node in nodenames:
2087       if node not in volumes or volumes[node].failed or not volumes[node].data:
2088         continue
2089
2090       node_vols = volumes[node].data[:]
2091       node_vols.sort(key=lambda vol: vol['dev'])
2092
2093       for vol in node_vols:
2094         node_output = []
2095         for field in self.op.output_fields:
2096           if field == "node":
2097             val = node
2098           elif field == "phys":
2099             val = vol['dev']
2100           elif field == "vg":
2101             val = vol['vg']
2102           elif field == "name":
2103             val = vol['name']
2104           elif field == "size":
2105             val = int(float(vol['size']))
2106           elif field == "instance":
2107             for inst in ilist:
2108               if node not in lv_by_node[inst]:
2109                 continue
2110               if vol['name'] in lv_by_node[inst][node]:
2111                 val = inst.name
2112                 break
2113             else:
2114               val = '-'
2115           else:
2116             raise errors.ParameterError(field)
2117           node_output.append(str(val))
2118
2119         output.append(node_output)
2120
2121     return output
2122
2123
2124 class LUAddNode(LogicalUnit):
2125   """Logical unit for adding node to the cluster.
2126
2127   """
2128   HPATH = "node-add"
2129   HTYPE = constants.HTYPE_NODE
2130   _OP_REQP = ["node_name"]
2131
2132   def BuildHooksEnv(self):
2133     """Build hooks env.
2134
2135     This will run on all nodes before, and on all nodes + the new node after.
2136
2137     """
2138     env = {
2139       "OP_TARGET": self.op.node_name,
2140       "NODE_NAME": self.op.node_name,
2141       "NODE_PIP": self.op.primary_ip,
2142       "NODE_SIP": self.op.secondary_ip,
2143       }
2144     nodes_0 = self.cfg.GetNodeList()
2145     nodes_1 = nodes_0 + [self.op.node_name, ]
2146     return env, nodes_0, nodes_1
2147
2148   def CheckPrereq(self):
2149     """Check prerequisites.
2150
2151     This checks:
2152      - the new node is not already in the config
2153      - it is resolvable
2154      - its parameters (single/dual homed) matches the cluster
2155
2156     Any errors are signalled by raising errors.OpPrereqError.
2157
2158     """
2159     node_name = self.op.node_name
2160     cfg = self.cfg
2161
2162     dns_data = utils.HostInfo(node_name)
2163
2164     node = dns_data.name
2165     primary_ip = self.op.primary_ip = dns_data.ip
2166     secondary_ip = getattr(self.op, "secondary_ip", None)
2167     if secondary_ip is None:
2168       secondary_ip = primary_ip
2169     if not utils.IsValidIP(secondary_ip):
2170       raise errors.OpPrereqError("Invalid secondary IP given")
2171     self.op.secondary_ip = secondary_ip
2172
2173     node_list = cfg.GetNodeList()
2174     if not self.op.readd and node in node_list:
2175       raise errors.OpPrereqError("Node %s is already in the configuration" %
2176                                  node)
2177     elif self.op.readd and node not in node_list:
2178       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2179
2180     for existing_node_name in node_list:
2181       existing_node = cfg.GetNodeInfo(existing_node_name)
2182
2183       if self.op.readd and node == existing_node_name:
2184         if (existing_node.primary_ip != primary_ip or
2185             existing_node.secondary_ip != secondary_ip):
2186           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2187                                      " address configuration as before")
2188         continue
2189
2190       if (existing_node.primary_ip == primary_ip or
2191           existing_node.secondary_ip == primary_ip or
2192           existing_node.primary_ip == secondary_ip or
2193           existing_node.secondary_ip == secondary_ip):
2194         raise errors.OpPrereqError("New node ip address(es) conflict with"
2195                                    " existing node %s" % existing_node.name)
2196
2197     # check that the type of the node (single versus dual homed) is the
2198     # same as for the master
2199     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2200     master_singlehomed = myself.secondary_ip == myself.primary_ip
2201     newbie_singlehomed = secondary_ip == primary_ip
2202     if master_singlehomed != newbie_singlehomed:
2203       if master_singlehomed:
2204         raise errors.OpPrereqError("The master has no private ip but the"
2205                                    " new node has one")
2206       else:
2207         raise errors.OpPrereqError("The master has a private ip but the"
2208                                    " new node doesn't have one")
2209
2210     # checks reachablity
2211     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2212       raise errors.OpPrereqError("Node not reachable by ping")
2213
2214     if not newbie_singlehomed:
2215       # check reachability from my secondary ip to newbie's secondary ip
2216       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2217                            source=myself.secondary_ip):
2218         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2219                                    " based ping to noded port")
2220
2221     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2222     mc_now, _ = self.cfg.GetMasterCandidateStats()
2223     master_candidate = mc_now < cp_size
2224
2225     self.new_node = objects.Node(name=node,
2226                                  primary_ip=primary_ip,
2227                                  secondary_ip=secondary_ip,
2228                                  master_candidate=master_candidate,
2229                                  offline=False, drained=False)
2230
2231   def Exec(self, feedback_fn):
2232     """Adds the new node to the cluster.
2233
2234     """
2235     new_node = self.new_node
2236     node = new_node.name
2237
2238     # check connectivity
2239     result = self.rpc.call_version([node])[node]
2240     result.Raise()
2241     if result.data:
2242       if constants.PROTOCOL_VERSION == result.data:
2243         logging.info("Communication to node %s fine, sw version %s match",
2244                      node, result.data)
2245       else:
2246         raise errors.OpExecError("Version mismatch master version %s,"
2247                                  " node version %s" %
2248                                  (constants.PROTOCOL_VERSION, result.data))
2249     else:
2250       raise errors.OpExecError("Cannot get version from the new node")
2251
2252     # setup ssh on node
2253     logging.info("Copy ssh key to node %s", node)
2254     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2255     keyarray = []
2256     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2257                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2258                 priv_key, pub_key]
2259
2260     for i in keyfiles:
2261       f = open(i, 'r')
2262       try:
2263         keyarray.append(f.read())
2264       finally:
2265         f.close()
2266
2267     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2268                                     keyarray[2],
2269                                     keyarray[3], keyarray[4], keyarray[5])
2270
2271     msg = result.RemoteFailMsg()
2272     if msg:
2273       raise errors.OpExecError("Cannot transfer ssh keys to the"
2274                                " new node: %s" % msg)
2275
2276     # Add node to our /etc/hosts, and add key to known_hosts
2277     if self.cfg.GetClusterInfo().modify_etc_hosts:
2278       utils.AddHostToEtcHosts(new_node.name)
2279
2280     if new_node.secondary_ip != new_node.primary_ip:
2281       result = self.rpc.call_node_has_ip_address(new_node.name,
2282                                                  new_node.secondary_ip)
2283       if result.failed or not result.data:
2284         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2285                                  " you gave (%s). Please fix and re-run this"
2286                                  " command." % new_node.secondary_ip)
2287
2288     node_verify_list = [self.cfg.GetMasterNode()]
2289     node_verify_param = {
2290       'nodelist': [node],
2291       # TODO: do a node-net-test as well?
2292     }
2293
2294     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2295                                        self.cfg.GetClusterName())
2296     for verifier in node_verify_list:
2297       if result[verifier].failed or not result[verifier].data:
2298         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2299                                  " for remote verification" % verifier)
2300       if result[verifier].data['nodelist']:
2301         for failed in result[verifier].data['nodelist']:
2302           feedback_fn("ssh/hostname verification failed %s -> %s" %
2303                       (verifier, result[verifier].data['nodelist'][failed]))
2304         raise errors.OpExecError("ssh/hostname verification failed.")
2305
2306     if self.op.readd:
2307       _RedistributeAncillaryFiles(self)
2308       self.context.ReaddNode(new_node)
2309     else:
2310       _RedistributeAncillaryFiles(self, additional_nodes=[node])
2311       self.context.AddNode(new_node)
2312
2313
2314 class LUSetNodeParams(LogicalUnit):
2315   """Modifies the parameters of a node.
2316
2317   """
2318   HPATH = "node-modify"
2319   HTYPE = constants.HTYPE_NODE
2320   _OP_REQP = ["node_name"]
2321   REQ_BGL = False
2322
2323   def CheckArguments(self):
2324     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2325     if node_name is None:
2326       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2327     self.op.node_name = node_name
2328     _CheckBooleanOpField(self.op, 'master_candidate')
2329     _CheckBooleanOpField(self.op, 'offline')
2330     _CheckBooleanOpField(self.op, 'drained')
2331     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2332     if all_mods.count(None) == 3:
2333       raise errors.OpPrereqError("Please pass at least one modification")
2334     if all_mods.count(True) > 1:
2335       raise errors.OpPrereqError("Can't set the node into more than one"
2336                                  " state at the same time")
2337
2338   def ExpandNames(self):
2339     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2340
2341   def BuildHooksEnv(self):
2342     """Build hooks env.
2343
2344     This runs on the master node.
2345
2346     """
2347     env = {
2348       "OP_TARGET": self.op.node_name,
2349       "MASTER_CANDIDATE": str(self.op.master_candidate),
2350       "OFFLINE": str(self.op.offline),
2351       "DRAINED": str(self.op.drained),
2352       }
2353     nl = [self.cfg.GetMasterNode(),
2354           self.op.node_name]
2355     return env, nl, nl
2356
2357   def CheckPrereq(self):
2358     """Check prerequisites.
2359
2360     This only checks the instance list against the existing names.
2361
2362     """
2363     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2364
2365     if ((self.op.master_candidate == False or self.op.offline == True or
2366          self.op.drained == True) and node.master_candidate):
2367       # we will demote the node from master_candidate
2368       if self.op.node_name == self.cfg.GetMasterNode():
2369         raise errors.OpPrereqError("The master node has to be a"
2370                                    " master candidate, online and not drained")
2371       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2372       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2373       if num_candidates <= cp_size:
2374         msg = ("Not enough master candidates (desired"
2375                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2376         if self.op.force:
2377           self.LogWarning(msg)
2378         else:
2379           raise errors.OpPrereqError(msg)
2380
2381     if (self.op.master_candidate == True and
2382         ((node.offline and not self.op.offline == False) or
2383          (node.drained and not self.op.drained == False))):
2384       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2385                                  " to master_candidate" % node.name)
2386
2387     return
2388
2389   def Exec(self, feedback_fn):
2390     """Modifies a node.
2391
2392     """
2393     node = self.node
2394
2395     result = []
2396     changed_mc = False
2397
2398     if self.op.offline is not None:
2399       node.offline = self.op.offline
2400       result.append(("offline", str(self.op.offline)))
2401       if self.op.offline == True:
2402         if node.master_candidate:
2403           node.master_candidate = False
2404           changed_mc = True
2405           result.append(("master_candidate", "auto-demotion due to offline"))
2406         if node.drained:
2407           node.drained = False
2408           result.append(("drained", "clear drained status due to offline"))
2409
2410     if self.op.master_candidate is not None:
2411       node.master_candidate = self.op.master_candidate
2412       changed_mc = True
2413       result.append(("master_candidate", str(self.op.master_candidate)))
2414       if self.op.master_candidate == False:
2415         rrc = self.rpc.call_node_demote_from_mc(node.name)
2416         msg = rrc.RemoteFailMsg()
2417         if msg:
2418           self.LogWarning("Node failed to demote itself: %s" % msg)
2419
2420     if self.op.drained is not None:
2421       node.drained = self.op.drained
2422       result.append(("drained", str(self.op.drained)))
2423       if self.op.drained == True:
2424         if node.master_candidate:
2425           node.master_candidate = False
2426           changed_mc = True
2427           result.append(("master_candidate", "auto-demotion due to drain"))
2428         if node.offline:
2429           node.offline = False
2430           result.append(("offline", "clear offline status due to drain"))
2431
2432     # this will trigger configuration file update, if needed
2433     self.cfg.Update(node)
2434     # this will trigger job queue propagation or cleanup
2435     if changed_mc:
2436       self.context.ReaddNode(node)
2437
2438     return result
2439
2440
2441 class LUPowercycleNode(NoHooksLU):
2442   """Powercycles a node.
2443
2444   """
2445   _OP_REQP = ["node_name", "force"]
2446   REQ_BGL = False
2447
2448   def CheckArguments(self):
2449     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2450     if node_name is None:
2451       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2452     self.op.node_name = node_name
2453     if node_name == self.cfg.GetMasterNode() and not self.op.force:
2454       raise errors.OpPrereqError("The node is the master and the force"
2455                                  " parameter was not set")
2456
2457   def ExpandNames(self):
2458     """Locking for PowercycleNode.
2459
2460     This is a last-resource option and shouldn't block on other
2461     jobs. Therefore, we grab no locks.
2462
2463     """
2464     self.needed_locks = {}
2465
2466   def CheckPrereq(self):
2467     """Check prerequisites.
2468
2469     This LU has no prereqs.
2470
2471     """
2472     pass
2473
2474   def Exec(self, feedback_fn):
2475     """Reboots a node.
2476
2477     """
2478     result = self.rpc.call_node_powercycle(self.op.node_name,
2479                                            self.cfg.GetHypervisorType())
2480     msg = result.RemoteFailMsg()
2481     if msg:
2482       raise errors.OpExecError("Failed to schedule the reboot: %s" % msg)
2483     return result.payload
2484
2485
2486 class LUQueryClusterInfo(NoHooksLU):
2487   """Query cluster configuration.
2488
2489   """
2490   _OP_REQP = []
2491   REQ_BGL = False
2492
2493   def ExpandNames(self):
2494     self.needed_locks = {}
2495
2496   def CheckPrereq(self):
2497     """No prerequsites needed for this LU.
2498
2499     """
2500     pass
2501
2502   def Exec(self, feedback_fn):
2503     """Return cluster config.
2504
2505     """
2506     cluster = self.cfg.GetClusterInfo()
2507     result = {
2508       "software_version": constants.RELEASE_VERSION,
2509       "protocol_version": constants.PROTOCOL_VERSION,
2510       "config_version": constants.CONFIG_VERSION,
2511       "os_api_version": constants.OS_API_VERSION,
2512       "export_version": constants.EXPORT_VERSION,
2513       "architecture": (platform.architecture()[0], platform.machine()),
2514       "name": cluster.cluster_name,
2515       "master": cluster.master_node,
2516       "default_hypervisor": cluster.default_hypervisor,
2517       "enabled_hypervisors": cluster.enabled_hypervisors,
2518       "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2519                         for hypervisor in cluster.enabled_hypervisors]),
2520       "beparams": cluster.beparams,
2521       "nicparams": cluster.nicparams,
2522       "candidate_pool_size": cluster.candidate_pool_size,
2523       "default_bridge": cluster.default_bridge,
2524       "master_netdev": cluster.master_netdev,
2525       "volume_group_name": cluster.volume_group_name,
2526       "file_storage_dir": cluster.file_storage_dir,
2527       }
2528
2529     return result
2530
2531
2532 class LUQueryConfigValues(NoHooksLU):
2533   """Return configuration values.
2534
2535   """
2536   _OP_REQP = []
2537   REQ_BGL = False
2538   _FIELDS_DYNAMIC = utils.FieldSet()
2539   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2540
2541   def ExpandNames(self):
2542     self.needed_locks = {}
2543
2544     _CheckOutputFields(static=self._FIELDS_STATIC,
2545                        dynamic=self._FIELDS_DYNAMIC,
2546                        selected=self.op.output_fields)
2547
2548   def CheckPrereq(self):
2549     """No prerequisites.
2550
2551     """
2552     pass
2553
2554   def Exec(self, feedback_fn):
2555     """Dump a representation of the cluster config to the standard output.
2556
2557     """
2558     values = []
2559     for field in self.op.output_fields:
2560       if field == "cluster_name":
2561         entry = self.cfg.GetClusterName()
2562       elif field == "master_node":
2563         entry = self.cfg.GetMasterNode()
2564       elif field == "drain_flag":
2565         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2566       else:
2567         raise errors.ParameterError(field)
2568       values.append(entry)
2569     return values
2570
2571
2572 class LUActivateInstanceDisks(NoHooksLU):
2573   """Bring up an instance's disks.
2574
2575   """
2576   _OP_REQP = ["instance_name"]
2577   REQ_BGL = False
2578
2579   def ExpandNames(self):
2580     self._ExpandAndLockInstance()
2581     self.needed_locks[locking.LEVEL_NODE] = []
2582     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2583
2584   def DeclareLocks(self, level):
2585     if level == locking.LEVEL_NODE:
2586       self._LockInstancesNodes()
2587
2588   def CheckPrereq(self):
2589     """Check prerequisites.
2590
2591     This checks that the instance is in the cluster.
2592
2593     """
2594     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2595     assert self.instance is not None, \
2596       "Cannot retrieve locked instance %s" % self.op.instance_name
2597     _CheckNodeOnline(self, self.instance.primary_node)
2598
2599   def Exec(self, feedback_fn):
2600     """Activate the disks.
2601
2602     """
2603     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2604     if not disks_ok:
2605       raise errors.OpExecError("Cannot activate block devices")
2606
2607     return disks_info
2608
2609
2610 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2611   """Prepare the block devices for an instance.
2612
2613   This sets up the block devices on all nodes.
2614
2615   @type lu: L{LogicalUnit}
2616   @param lu: the logical unit on whose behalf we execute
2617   @type instance: L{objects.Instance}
2618   @param instance: the instance for whose disks we assemble
2619   @type ignore_secondaries: boolean
2620   @param ignore_secondaries: if true, errors on secondary nodes
2621       won't result in an error return from the function
2622   @return: False if the operation failed, otherwise a list of
2623       (host, instance_visible_name, node_visible_name)
2624       with the mapping from node devices to instance devices
2625
2626   """
2627   device_info = []
2628   disks_ok = True
2629   iname = instance.name
2630   # With the two passes mechanism we try to reduce the window of
2631   # opportunity for the race condition of switching DRBD to primary
2632   # before handshaking occured, but we do not eliminate it
2633
2634   # The proper fix would be to wait (with some limits) until the
2635   # connection has been made and drbd transitions from WFConnection
2636   # into any other network-connected state (Connected, SyncTarget,
2637   # SyncSource, etc.)
2638
2639   # 1st pass, assemble on all nodes in secondary mode
2640   for inst_disk in instance.disks:
2641     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2642       lu.cfg.SetDiskID(node_disk, node)
2643       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2644       msg = result.RemoteFailMsg()
2645       if msg:
2646         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2647                            " (is_primary=False, pass=1): %s",
2648                            inst_disk.iv_name, node, msg)
2649         if not ignore_secondaries:
2650           disks_ok = False
2651
2652   # FIXME: race condition on drbd migration to primary
2653
2654   # 2nd pass, do only the primary node
2655   for inst_disk in instance.disks:
2656     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2657       if node != instance.primary_node:
2658         continue
2659       lu.cfg.SetDiskID(node_disk, node)
2660       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2661       msg = result.RemoteFailMsg()
2662       if msg:
2663         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2664                            " (is_primary=True, pass=2): %s",
2665                            inst_disk.iv_name, node, msg)
2666         disks_ok = False
2667     device_info.append((instance.primary_node, inst_disk.iv_name,
2668                         result.payload))
2669
2670   # leave the disks configured for the primary node
2671   # this is a workaround that would be fixed better by
2672   # improving the logical/physical id handling
2673   for disk in instance.disks:
2674     lu.cfg.SetDiskID(disk, instance.primary_node)
2675
2676   return disks_ok, device_info
2677
2678
2679 def _StartInstanceDisks(lu, instance, force):
2680   """Start the disks of an instance.
2681
2682   """
2683   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2684                                            ignore_secondaries=force)
2685   if not disks_ok:
2686     _ShutdownInstanceDisks(lu, instance)
2687     if force is not None and not force:
2688       lu.proc.LogWarning("", hint="If the message above refers to a"
2689                          " secondary node,"
2690                          " you can retry the operation using '--force'.")
2691     raise errors.OpExecError("Disk consistency error")
2692
2693
2694 class LUDeactivateInstanceDisks(NoHooksLU):
2695   """Shutdown an instance's disks.
2696
2697   """
2698   _OP_REQP = ["instance_name"]
2699   REQ_BGL = False
2700
2701   def ExpandNames(self):
2702     self._ExpandAndLockInstance()
2703     self.needed_locks[locking.LEVEL_NODE] = []
2704     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2705
2706   def DeclareLocks(self, level):
2707     if level == locking.LEVEL_NODE:
2708       self._LockInstancesNodes()
2709
2710   def CheckPrereq(self):
2711     """Check prerequisites.
2712
2713     This checks that the instance is in the cluster.
2714
2715     """
2716     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2717     assert self.instance is not None, \
2718       "Cannot retrieve locked instance %s" % self.op.instance_name
2719
2720   def Exec(self, feedback_fn):
2721     """Deactivate the disks
2722
2723     """
2724     instance = self.instance
2725     _SafeShutdownInstanceDisks(self, instance)
2726
2727
2728 def _SafeShutdownInstanceDisks(lu, instance):
2729   """Shutdown block devices of an instance.
2730
2731   This function checks if an instance is running, before calling
2732   _ShutdownInstanceDisks.
2733
2734   """
2735   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2736                                       [instance.hypervisor])
2737   ins_l = ins_l[instance.primary_node]
2738   if ins_l.failed or not isinstance(ins_l.data, list):
2739     raise errors.OpExecError("Can't contact node '%s'" %
2740                              instance.primary_node)
2741
2742   if instance.name in ins_l.data:
2743     raise errors.OpExecError("Instance is running, can't shutdown"
2744                              " block devices.")
2745
2746   _ShutdownInstanceDisks(lu, instance)
2747
2748
2749 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2750   """Shutdown block devices of an instance.
2751
2752   This does the shutdown on all nodes of the instance.
2753
2754   If the ignore_primary is false, errors on the primary node are
2755   ignored.
2756
2757   """
2758   all_result = True
2759   for disk in instance.disks:
2760     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2761       lu.cfg.SetDiskID(top_disk, node)
2762       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2763       msg = result.RemoteFailMsg()
2764       if msg:
2765         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2766                       disk.iv_name, node, msg)
2767         if not ignore_primary or node != instance.primary_node:
2768           all_result = False
2769   return all_result
2770
2771
2772 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2773   """Checks if a node has enough free memory.
2774
2775   This function check if a given node has the needed amount of free
2776   memory. In case the node has less memory or we cannot get the
2777   information from the node, this function raise an OpPrereqError
2778   exception.
2779
2780   @type lu: C{LogicalUnit}
2781   @param lu: a logical unit from which we get configuration data
2782   @type node: C{str}
2783   @param node: the node to check
2784   @type reason: C{str}
2785   @param reason: string to use in the error message
2786   @type requested: C{int}
2787   @param requested: the amount of memory in MiB to check for
2788   @type hypervisor_name: C{str}
2789   @param hypervisor_name: the hypervisor to ask for memory stats
2790   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2791       we cannot check the node
2792
2793   """
2794   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2795   nodeinfo[node].Raise()
2796   free_mem = nodeinfo[node].data.get('memory_free')
2797   if not isinstance(free_mem, int):
2798     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2799                              " was '%s'" % (node, free_mem))
2800   if requested > free_mem:
2801     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2802                              " needed %s MiB, available %s MiB" %
2803                              (node, reason, requested, free_mem))
2804
2805
2806 class LUStartupInstance(LogicalUnit):
2807   """Starts an instance.
2808
2809   """
2810   HPATH = "instance-start"
2811   HTYPE = constants.HTYPE_INSTANCE
2812   _OP_REQP = ["instance_name", "force"]
2813   REQ_BGL = False
2814
2815   def ExpandNames(self):
2816     self._ExpandAndLockInstance()
2817
2818   def BuildHooksEnv(self):
2819     """Build hooks env.
2820
2821     This runs on master, primary and secondary nodes of the instance.
2822
2823     """
2824     env = {
2825       "FORCE": self.op.force,
2826       }
2827     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2828     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2829     return env, nl, nl
2830
2831   def CheckPrereq(self):
2832     """Check prerequisites.
2833
2834     This checks that the instance is in the cluster.
2835
2836     """
2837     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2838     assert self.instance is not None, \
2839       "Cannot retrieve locked instance %s" % self.op.instance_name
2840
2841     # extra beparams
2842     self.beparams = getattr(self.op, "beparams", {})
2843     if self.beparams:
2844       if not isinstance(self.beparams, dict):
2845         raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2846                                    " dict" % (type(self.beparams), ))
2847       # fill the beparams dict
2848       utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2849       self.op.beparams = self.beparams
2850
2851     # extra hvparams
2852     self.hvparams = getattr(self.op, "hvparams", {})
2853     if self.hvparams:
2854       if not isinstance(self.hvparams, dict):
2855         raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2856                                    " dict" % (type(self.hvparams), ))
2857
2858       # check hypervisor parameter syntax (locally)
2859       cluster = self.cfg.GetClusterInfo()
2860       utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2861       filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
2862                                     instance.hvparams)
2863       filled_hvp.update(self.hvparams)
2864       hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2865       hv_type.CheckParameterSyntax(filled_hvp)
2866       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2867       self.op.hvparams = self.hvparams
2868
2869     _CheckNodeOnline(self, instance.primary_node)
2870
2871     bep = self.cfg.GetClusterInfo().FillBE(instance)
2872     # check bridges existance
2873     _CheckInstanceBridgesExist(self, instance)
2874
2875     remote_info = self.rpc.call_instance_info(instance.primary_node,
2876                                               instance.name,
2877                                               instance.hypervisor)
2878     remote_info.Raise()
2879     if not remote_info.data:
2880       _CheckNodeFreeMemory(self, instance.primary_node,
2881                            "starting instance %s" % instance.name,
2882                            bep[constants.BE_MEMORY], instance.hypervisor)
2883
2884   def Exec(self, feedback_fn):
2885     """Start the instance.
2886
2887     """
2888     instance = self.instance
2889     force = self.op.force
2890
2891     self.cfg.MarkInstanceUp(instance.name)
2892
2893     node_current = instance.primary_node
2894
2895     _StartInstanceDisks(self, instance, force)
2896
2897     result = self.rpc.call_instance_start(node_current, instance,
2898                                           self.hvparams, self.beparams)
2899     msg = result.RemoteFailMsg()
2900     if msg:
2901       _ShutdownInstanceDisks(self, instance)
2902       raise errors.OpExecError("Could not start instance: %s" % msg)
2903
2904
2905 class LURebootInstance(LogicalUnit):
2906   """Reboot an instance.
2907
2908   """
2909   HPATH = "instance-reboot"
2910   HTYPE = constants.HTYPE_INSTANCE
2911   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2912   REQ_BGL = False
2913
2914   def ExpandNames(self):
2915     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2916                                    constants.INSTANCE_REBOOT_HARD,
2917                                    constants.INSTANCE_REBOOT_FULL]:
2918       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2919                                   (constants.INSTANCE_REBOOT_SOFT,
2920                                    constants.INSTANCE_REBOOT_HARD,
2921                                    constants.INSTANCE_REBOOT_FULL))
2922     self._ExpandAndLockInstance()
2923
2924   def BuildHooksEnv(self):
2925     """Build hooks env.
2926
2927     This runs on master, primary and secondary nodes of the instance.
2928
2929     """
2930     env = {
2931       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2932       "REBOOT_TYPE": self.op.reboot_type,
2933       }
2934     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2935     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2936     return env, nl, nl
2937
2938   def CheckPrereq(self):
2939     """Check prerequisites.
2940
2941     This checks that the instance is in the cluster.
2942
2943     """
2944     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2945     assert self.instance is not None, \
2946       "Cannot retrieve locked instance %s" % self.op.instance_name
2947
2948     _CheckNodeOnline(self, instance.primary_node)
2949
2950     # check bridges existance
2951     _CheckInstanceBridgesExist(self, instance)
2952
2953   def Exec(self, feedback_fn):
2954     """Reboot the instance.
2955
2956     """
2957     instance = self.instance
2958     ignore_secondaries = self.op.ignore_secondaries
2959     reboot_type = self.op.reboot_type
2960
2961     node_current = instance.primary_node
2962
2963     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2964                        constants.INSTANCE_REBOOT_HARD]:
2965       for disk in instance.disks:
2966         self.cfg.SetDiskID(disk, node_current)
2967       result = self.rpc.call_instance_reboot(node_current, instance,
2968                                              reboot_type)
2969       msg = result.RemoteFailMsg()
2970       if msg:
2971         raise errors.OpExecError("Could not reboot instance: %s" % msg)
2972     else:
2973       result = self.rpc.call_instance_shutdown(node_current, instance)
2974       msg = result.RemoteFailMsg()
2975       if msg:
2976         raise errors.OpExecError("Could not shutdown instance for"
2977                                  " full reboot: %s" % msg)
2978       _ShutdownInstanceDisks(self, instance)
2979       _StartInstanceDisks(self, instance, ignore_secondaries)
2980       result = self.rpc.call_instance_start(node_current, instance, None, None)
2981       msg = result.RemoteFailMsg()
2982       if msg:
2983         _ShutdownInstanceDisks(self, instance)
2984         raise errors.OpExecError("Could not start instance for"
2985                                  " full reboot: %s" % msg)
2986
2987     self.cfg.MarkInstanceUp(instance.name)
2988
2989
2990 class LUShutdownInstance(LogicalUnit):
2991   """Shutdown an instance.
2992
2993   """
2994   HPATH = "instance-stop"
2995   HTYPE = constants.HTYPE_INSTANCE
2996   _OP_REQP = ["instance_name"]
2997   REQ_BGL = False
2998
2999   def ExpandNames(self):
3000     self._ExpandAndLockInstance()
3001
3002   def BuildHooksEnv(self):
3003     """Build hooks env.
3004
3005     This runs on master, primary and secondary nodes of the instance.
3006
3007     """
3008     env = _BuildInstanceHookEnvByObject(self, self.instance)
3009     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3010     return env, nl, nl
3011
3012   def CheckPrereq(self):
3013     """Check prerequisites.
3014
3015     This checks that the instance is in the cluster.
3016
3017     """
3018     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3019     assert self.instance is not None, \
3020       "Cannot retrieve locked instance %s" % self.op.instance_name
3021     _CheckNodeOnline(self, self.instance.primary_node)
3022
3023   def Exec(self, feedback_fn):
3024     """Shutdown the instance.
3025
3026     """
3027     instance = self.instance
3028     node_current = instance.primary_node
3029     self.cfg.MarkInstanceDown(instance.name)
3030     result = self.rpc.call_instance_shutdown(node_current, instance)
3031     msg = result.RemoteFailMsg()
3032     if msg:
3033       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3034
3035     _ShutdownInstanceDisks(self, instance)
3036
3037
3038 class LUReinstallInstance(LogicalUnit):
3039   """Reinstall an instance.
3040
3041   """
3042   HPATH = "instance-reinstall"
3043   HTYPE = constants.HTYPE_INSTANCE
3044   _OP_REQP = ["instance_name"]
3045   REQ_BGL = False
3046
3047   def ExpandNames(self):
3048     self._ExpandAndLockInstance()
3049
3050   def BuildHooksEnv(self):
3051     """Build hooks env.
3052
3053     This runs on master, primary and secondary nodes of the instance.
3054
3055     """
3056     env = _BuildInstanceHookEnvByObject(self, self.instance)
3057     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3058     return env, nl, nl
3059
3060   def CheckPrereq(self):
3061     """Check prerequisites.
3062
3063     This checks that the instance is in the cluster and is not running.
3064
3065     """
3066     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3067     assert instance is not None, \
3068       "Cannot retrieve locked instance %s" % self.op.instance_name
3069     _CheckNodeOnline(self, instance.primary_node)
3070
3071     if instance.disk_template == constants.DT_DISKLESS:
3072       raise errors.OpPrereqError("Instance '%s' has no disks" %
3073                                  self.op.instance_name)
3074     if instance.admin_up:
3075       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3076                                  self.op.instance_name)
3077     remote_info = self.rpc.call_instance_info(instance.primary_node,
3078                                               instance.name,
3079                                               instance.hypervisor)
3080     remote_info.Raise()
3081     if remote_info.data:
3082       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3083                                  (self.op.instance_name,
3084                                   instance.primary_node))
3085
3086     self.op.os_type = getattr(self.op, "os_type", None)
3087     if self.op.os_type is not None:
3088       # OS verification
3089       pnode = self.cfg.GetNodeInfo(
3090         self.cfg.ExpandNodeName(instance.primary_node))
3091       if pnode is None:
3092         raise errors.OpPrereqError("Primary node '%s' is unknown" %
3093                                    self.op.pnode)
3094       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3095       result.Raise()
3096       if not isinstance(result.data, objects.OS):
3097         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3098                                    " primary node"  % self.op.os_type)
3099
3100     self.instance = instance
3101
3102   def Exec(self, feedback_fn):
3103     """Reinstall the instance.
3104
3105     """
3106     inst = self.instance
3107
3108     if self.op.os_type is not None:
3109       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3110       inst.os = self.op.os_type
3111       self.cfg.Update(inst)
3112
3113     _StartInstanceDisks(self, inst, None)
3114     try:
3115       feedback_fn("Running the instance OS create scripts...")
3116       result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3117       msg = result.RemoteFailMsg()
3118       if msg:
3119         raise errors.OpExecError("Could not install OS for instance %s"
3120                                  " on node %s: %s" %
3121                                  (inst.name, inst.primary_node, msg))
3122     finally:
3123       _ShutdownInstanceDisks(self, inst)
3124
3125
3126 class LURenameInstance(LogicalUnit):
3127   """Rename an instance.
3128
3129   """
3130   HPATH = "instance-rename"
3131   HTYPE = constants.HTYPE_INSTANCE
3132   _OP_REQP = ["instance_name", "new_name"]
3133
3134   def BuildHooksEnv(self):
3135     """Build hooks env.
3136
3137     This runs on master, primary and secondary nodes of the instance.
3138
3139     """
3140     env = _BuildInstanceHookEnvByObject(self, self.instance)
3141     env["INSTANCE_NEW_NAME"] = self.op.new_name
3142     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3143     return env, nl, nl
3144
3145   def CheckPrereq(self):
3146     """Check prerequisites.
3147
3148     This checks that the instance is in the cluster and is not running.
3149
3150     """
3151     instance = self.cfg.GetInstanceInfo(
3152       self.cfg.ExpandInstanceName(self.op.instance_name))
3153     if instance is None:
3154       raise errors.OpPrereqError("Instance '%s' not known" %
3155                                  self.op.instance_name)
3156     _CheckNodeOnline(self, instance.primary_node)
3157
3158     if instance.admin_up:
3159       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3160                                  self.op.instance_name)
3161     remote_info = self.rpc.call_instance_info(instance.primary_node,
3162                                               instance.name,
3163                                               instance.hypervisor)
3164     remote_info.Raise()
3165     if remote_info.data:
3166       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3167                                  (self.op.instance_name,
3168                                   instance.primary_node))
3169     self.instance = instance
3170
3171     # new name verification
3172     name_info = utils.HostInfo(self.op.new_name)
3173
3174     self.op.new_name = new_name = name_info.name
3175     instance_list = self.cfg.GetInstanceList()
3176     if new_name in instance_list:
3177       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3178                                  new_name)
3179
3180     if not getattr(self.op, "ignore_ip", False):
3181       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3182         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3183                                    (name_info.ip, new_name))
3184
3185
3186   def Exec(self, feedback_fn):
3187     """Reinstall the instance.
3188
3189     """
3190     inst = self.instance
3191     old_name = inst.name
3192
3193     if inst.disk_template == constants.DT_FILE:
3194       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3195
3196     self.cfg.RenameInstance(inst.name, self.op.new_name)
3197     # Change the instance lock. This is definitely safe while we hold the BGL
3198     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3199     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3200
3201     # re-read the instance from the configuration after rename
3202     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3203
3204     if inst.disk_template == constants.DT_FILE:
3205       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3206       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3207                                                      old_file_storage_dir,
3208                                                      new_file_storage_dir)
3209       result.Raise()
3210       if not result.data:
3211         raise errors.OpExecError("Could not connect to node '%s' to rename"
3212                                  " directory '%s' to '%s' (but the instance"
3213                                  " has been renamed in Ganeti)" % (
3214                                  inst.primary_node, old_file_storage_dir,
3215                                  new_file_storage_dir))
3216
3217       if not result.data[0]:
3218         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3219                                  " (but the instance has been renamed in"
3220                                  " Ganeti)" % (old_file_storage_dir,
3221                                                new_file_storage_dir))
3222
3223     _StartInstanceDisks(self, inst, None)
3224     try:
3225       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3226                                                  old_name)
3227       msg = result.RemoteFailMsg()
3228       if msg:
3229         msg = ("Could not run OS rename script for instance %s on node %s"
3230                " (but the instance has been renamed in Ganeti): %s" %
3231                (inst.name, inst.primary_node, msg))
3232         self.proc.LogWarning(msg)
3233     finally:
3234       _ShutdownInstanceDisks(self, inst)
3235
3236
3237 class LURemoveInstance(LogicalUnit):
3238   """Remove an instance.
3239
3240   """
3241   HPATH = "instance-remove"
3242   HTYPE = constants.HTYPE_INSTANCE
3243   _OP_REQP = ["instance_name", "ignore_failures"]
3244   REQ_BGL = False
3245
3246   def ExpandNames(self):
3247     self._ExpandAndLockInstance()
3248     self.needed_locks[locking.LEVEL_NODE] = []
3249     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3250
3251   def DeclareLocks(self, level):
3252     if level == locking.LEVEL_NODE:
3253       self._LockInstancesNodes()
3254
3255   def BuildHooksEnv(self):
3256     """Build hooks env.
3257
3258     This runs on master, primary and secondary nodes of the instance.
3259
3260     """
3261     env = _BuildInstanceHookEnvByObject(self, self.instance)
3262     nl = [self.cfg.GetMasterNode()]
3263     return env, nl, nl
3264
3265   def CheckPrereq(self):
3266     """Check prerequisites.
3267
3268     This checks that the instance is in the cluster.
3269
3270     """
3271     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3272     assert self.instance is not None, \
3273       "Cannot retrieve locked instance %s" % self.op.instance_name
3274
3275   def Exec(self, feedback_fn):
3276     """Remove the instance.
3277
3278     """
3279     instance = self.instance
3280     logging.info("Shutting down instance %s on node %s",
3281                  instance.name, instance.primary_node)
3282
3283     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3284     msg = result.RemoteFailMsg()
3285     if msg:
3286       if self.op.ignore_failures:
3287         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3288       else:
3289         raise errors.OpExecError("Could not shutdown instance %s on"
3290                                  " node %s: %s" %
3291                                  (instance.name, instance.primary_node, msg))
3292
3293     logging.info("Removing block devices for instance %s", instance.name)
3294
3295     if not _RemoveDisks(self, instance):
3296       if self.op.ignore_failures:
3297         feedback_fn("Warning: can't remove instance's disks")
3298       else:
3299         raise errors.OpExecError("Can't remove instance's disks")
3300
3301     logging.info("Removing instance %s out of cluster config", instance.name)
3302
3303     self.cfg.RemoveInstance(instance.name)
3304     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3305
3306
3307 class LUQueryInstances(NoHooksLU):
3308   """Logical unit for querying instances.
3309
3310   """
3311   _OP_REQP = ["output_fields", "names", "use_locking"]
3312   REQ_BGL = False
3313   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3314                                     "admin_state",
3315                                     "disk_template", "ip", "mac", "bridge",
3316                                     "sda_size", "sdb_size", "vcpus", "tags",
3317                                     "network_port", "beparams",
3318                                     r"(disk)\.(size)/([0-9]+)",
3319                                     r"(disk)\.(sizes)", "disk_usage",
3320                                     r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3321                                     r"(nic)\.(macs|ips|bridges)",
3322                                     r"(disk|nic)\.(count)",
3323                                     "serial_no", "hypervisor", "hvparams",] +
3324                                   ["hv/%s" % name
3325                                    for name in constants.HVS_PARAMETERS] +
3326                                   ["be/%s" % name
3327                                    for name in constants.BES_PARAMETERS])
3328   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3329
3330
3331   def ExpandNames(self):
3332     _CheckOutputFields(static=self._FIELDS_STATIC,
3333                        dynamic=self._FIELDS_DYNAMIC,
3334                        selected=self.op.output_fields)
3335
3336     self.needed_locks = {}
3337     self.share_locks[locking.LEVEL_INSTANCE] = 1
3338     self.share_locks[locking.LEVEL_NODE] = 1
3339
3340     if self.op.names:
3341       self.wanted = _GetWantedInstances(self, self.op.names)
3342     else:
3343       self.wanted = locking.ALL_SET
3344
3345     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3346     self.do_locking = self.do_node_query and self.op.use_locking
3347     if self.do_locking:
3348       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3349       self.needed_locks[locking.LEVEL_NODE] = []
3350       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3351
3352   def DeclareLocks(self, level):
3353     if level == locking.LEVEL_NODE and self.do_locking:
3354       self._LockInstancesNodes()
3355
3356   def CheckPrereq(self):
3357     """Check prerequisites.
3358
3359     """
3360     pass
3361
3362   def Exec(self, feedback_fn):
3363     """Computes the list of nodes and their attributes.
3364
3365     """
3366     all_info = self.cfg.GetAllInstancesInfo()
3367     if self.wanted == locking.ALL_SET:
3368       # caller didn't specify instance names, so ordering is not important
3369       if self.do_locking:
3370         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3371       else:
3372         instance_names = all_info.keys()
3373       instance_names = utils.NiceSort(instance_names)
3374     else:
3375       # caller did specify names, so we must keep the ordering
3376       if self.do_locking:
3377         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3378       else:
3379         tgt_set = all_info.keys()
3380       missing = set(self.wanted).difference(tgt_set)
3381       if missing:
3382         raise errors.OpExecError("Some instances were removed before"
3383                                  " retrieving their data: %s" % missing)
3384       instance_names = self.wanted
3385
3386     instance_list = [all_info[iname] for iname in instance_names]
3387
3388     # begin data gathering
3389
3390     nodes = frozenset([inst.primary_node for inst in instance_list])
3391     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3392
3393     bad_nodes = []
3394     off_nodes = []
3395     if self.do_node_query:
3396       live_data = {}
3397       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3398       for name in nodes:
3399         result = node_data[name]
3400         if result.offline:
3401           # offline nodes will be in both lists
3402           off_nodes.append(name)
3403         if result.failed:
3404           bad_nodes.append(name)
3405         else:
3406           if result.data:
3407             live_data.update(result.data)
3408             # else no instance is alive
3409     else:
3410       live_data = dict([(name, {}) for name in instance_names])
3411
3412     # end data gathering
3413
3414     HVPREFIX = "hv/"
3415     BEPREFIX = "be/"
3416     output = []
3417     for instance in instance_list:
3418       iout = []
3419       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3420       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3421       for field in self.op.output_fields:
3422         st_match = self._FIELDS_STATIC.Matches(field)
3423         if field == "name":
3424           val = instance.name
3425         elif field == "os":
3426           val = instance.os
3427         elif field == "pnode":
3428           val = instance.primary_node
3429         elif field == "snodes":
3430           val = list(instance.secondary_nodes)
3431         elif field == "admin_state":
3432           val = instance.admin_up
3433         elif field == "oper_state":
3434           if instance.primary_node in bad_nodes:
3435             val = None
3436           else:
3437             val = bool(live_data.get(instance.name))
3438         elif field == "status":
3439           if instance.primary_node in off_nodes:
3440             val = "ERROR_nodeoffline"
3441           elif instance.primary_node in bad_nodes:
3442             val = "ERROR_nodedown"
3443           else:
3444             running = bool(live_data.get(instance.name))
3445             if running:
3446               if instance.admin_up:
3447                 val = "running"
3448               else:
3449                 val = "ERROR_up"
3450             else:
3451               if instance.admin_up:
3452                 val = "ERROR_down"
3453               else:
3454                 val = "ADMIN_down"
3455         elif field == "oper_ram":
3456           if instance.primary_node in bad_nodes:
3457             val = None
3458           elif instance.name in live_data:
3459             val = live_data[instance.name].get("memory", "?")
3460           else:
3461             val = "-"
3462         elif field == "disk_template":
3463           val = instance.disk_template
3464         elif field == "ip":
3465           val = instance.nics[0].ip
3466         elif field == "bridge":
3467           val = instance.nics[0].bridge
3468         elif field == "mac":
3469           val = instance.nics[0].mac
3470         elif field == "sda_size" or field == "sdb_size":
3471           idx = ord(field[2]) - ord('a')
3472           try:
3473             val = instance.FindDisk(idx).size
3474           except errors.OpPrereqError:
3475             val = None
3476         elif field == "disk_usage": # total disk usage per node
3477           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3478           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3479         elif field == "tags":
3480           val = list(instance.GetTags())
3481         elif field == "serial_no":
3482           val = instance.serial_no
3483         elif field == "network_port":
3484           val = instance.network_port
3485         elif field == "hypervisor":
3486           val = instance.hypervisor
3487         elif field == "hvparams":
3488           val = i_hv
3489         elif (field.startswith(HVPREFIX) and
3490               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3491           val = i_hv.get(field[len(HVPREFIX):], None)
3492         elif field == "beparams":
3493           val = i_be
3494         elif (field.startswith(BEPREFIX) and
3495               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3496           val = i_be.get(field[len(BEPREFIX):], None)
3497         elif st_match and st_match.groups():
3498           # matches a variable list
3499           st_groups = st_match.groups()
3500           if st_groups and st_groups[0] == "disk":
3501             if st_groups[1] == "count":
3502               val = len(instance.disks)
3503             elif st_groups[1] == "sizes":
3504               val = [disk.size for disk in instance.disks]
3505             elif st_groups[1] == "size":
3506               try:
3507                 val = instance.FindDisk(st_groups[2]).size
3508               except errors.OpPrereqError:
3509                 val = None
3510             else:
3511               assert False, "Unhandled disk parameter"
3512           elif st_groups[0] == "nic":
3513             if st_groups[1] == "count":
3514               val = len(instance.nics)
3515             elif st_groups[1] == "macs":
3516               val = [nic.mac for nic in instance.nics]
3517             elif st_groups[1] == "ips":
3518               val = [nic.ip for nic in instance.nics]
3519             elif st_groups[1] == "bridges":
3520               val = [nic.bridge for nic in instance.nics]
3521             else:
3522               # index-based item
3523               nic_idx = int(st_groups[2])
3524               if nic_idx >= len(instance.nics):
3525                 val = None
3526               else:
3527                 if st_groups[1] == "mac":
3528                   val = instance.nics[nic_idx].mac
3529                 elif st_groups[1] == "ip":
3530                   val = instance.nics[nic_idx].ip
3531                 elif st_groups[1] == "bridge":
3532                   val = instance.nics[nic_idx].bridge
3533                 else:
3534                   assert False, "Unhandled NIC parameter"
3535           else:
3536             assert False, "Unhandled variable parameter"
3537         else:
3538           raise errors.ParameterError(field)
3539         iout.append(val)
3540       output.append(iout)
3541
3542     return output
3543
3544
3545 class LUFailoverInstance(LogicalUnit):
3546   """Failover an instance.
3547
3548   """
3549   HPATH = "instance-failover"
3550   HTYPE = constants.HTYPE_INSTANCE
3551   _OP_REQP = ["instance_name", "ignore_consistency"]
3552   REQ_BGL = False
3553
3554   def ExpandNames(self):
3555     self._ExpandAndLockInstance()
3556     self.needed_locks[locking.LEVEL_NODE] = []
3557     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3558
3559   def DeclareLocks(self, level):
3560     if level == locking.LEVEL_NODE:
3561       self._LockInstancesNodes()
3562
3563   def BuildHooksEnv(self):
3564     """Build hooks env.
3565
3566     This runs on master, primary and secondary nodes of the instance.
3567
3568     """
3569     env = {
3570       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3571       }
3572     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3573     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3574     return env, nl, nl
3575
3576   def CheckPrereq(self):
3577     """Check prerequisites.
3578
3579     This checks that the instance is in the cluster.
3580
3581     """
3582     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3583     assert self.instance is not None, \
3584       "Cannot retrieve locked instance %s" % self.op.instance_name
3585
3586     bep = self.cfg.GetClusterInfo().FillBE(instance)
3587     if instance.disk_template not in constants.DTS_NET_MIRROR:
3588       raise errors.OpPrereqError("Instance's disk layout is not"
3589                                  " network mirrored, cannot failover.")
3590
3591     secondary_nodes = instance.secondary_nodes
3592     if not secondary_nodes:
3593       raise errors.ProgrammerError("no secondary node but using "
3594                                    "a mirrored disk template")
3595
3596     target_node = secondary_nodes[0]
3597     _CheckNodeOnline(self, target_node)
3598     _CheckNodeNotDrained(self, target_node)
3599     # check memory requirements on the secondary node
3600     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3601                          instance.name, bep[constants.BE_MEMORY],
3602                          instance.hypervisor)
3603
3604     # check bridge existance
3605     brlist = [nic.bridge for nic in instance.nics]
3606     result = self.rpc.call_bridges_exist(target_node, brlist)
3607     result.Raise()
3608     if not result.data:
3609       raise errors.OpPrereqError("One or more target bridges %s does not"
3610                                  " exist on destination node '%s'" %
3611                                  (brlist, target_node))
3612
3613   def Exec(self, feedback_fn):
3614     """Failover an instance.
3615
3616     The failover is done by shutting it down on its present node and
3617     starting it on the secondary.
3618
3619     """
3620     instance = self.instance
3621
3622     source_node = instance.primary_node
3623     target_node = instance.secondary_nodes[0]
3624
3625     feedback_fn("* checking disk consistency between source and target")
3626     for dev in instance.disks:
3627       # for drbd, these are drbd over lvm
3628       if not _CheckDiskConsistency(self, dev, target_node, False):
3629         if instance.admin_up and not self.op.ignore_consistency:
3630           raise errors.OpExecError("Disk %s is degraded on target node,"
3631                                    " aborting failover." % dev.iv_name)
3632
3633     feedback_fn("* shutting down instance on source node")
3634     logging.info("Shutting down instance %s on node %s",
3635                  instance.name, source_node)
3636
3637     result = self.rpc.call_instance_shutdown(source_node, instance)
3638     msg = result.RemoteFailMsg()
3639     if msg:
3640       if self.op.ignore_consistency:
3641         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3642                              " Proceeding anyway. Please make sure node"
3643                              " %s is down. Error details: %s",
3644                              instance.name, source_node, source_node, msg)
3645       else:
3646         raise errors.OpExecError("Could not shutdown instance %s on"
3647                                  " node %s: %s" %
3648                                  (instance.name, source_node, msg))
3649
3650     feedback_fn("* deactivating the instance's disks on source node")
3651     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3652       raise errors.OpExecError("Can't shut down the instance's disks.")
3653
3654     instance.primary_node = target_node
3655     # distribute new instance config to the other nodes
3656     self.cfg.Update(instance)
3657
3658     # Only start the instance if it's marked as up
3659     if instance.admin_up:
3660       feedback_fn("* activating the instance's disks on target node")
3661       logging.info("Starting instance %s on node %s",
3662                    instance.name, target_node)
3663
3664       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3665                                                ignore_secondaries=True)
3666       if not disks_ok:
3667         _ShutdownInstanceDisks(self, instance)
3668         raise errors.OpExecError("Can't activate the instance's disks")
3669
3670       feedback_fn("* starting the instance on the target node")
3671       result = self.rpc.call_instance_start(target_node, instance, None, None)
3672       msg = result.RemoteFailMsg()
3673       if msg:
3674         _ShutdownInstanceDisks(self, instance)
3675         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3676                                  (instance.name, target_node, msg))
3677
3678
3679 class LUMigrateInstance(LogicalUnit):
3680   """Migrate an instance.
3681
3682   This is migration without shutting down, compared to the failover,
3683   which is done with shutdown.
3684
3685   """
3686   HPATH = "instance-migrate"
3687   HTYPE = constants.HTYPE_INSTANCE
3688   _OP_REQP = ["instance_name", "live", "cleanup"]
3689
3690   REQ_BGL = False
3691
3692   def ExpandNames(self):
3693     self._ExpandAndLockInstance()
3694     self.needed_locks[locking.LEVEL_NODE] = []
3695     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3696
3697   def DeclareLocks(self, level):
3698     if level == locking.LEVEL_NODE:
3699       self._LockInstancesNodes()
3700
3701   def BuildHooksEnv(self):
3702     """Build hooks env.
3703
3704     This runs on master, primary and secondary nodes of the instance.
3705
3706     """
3707     env = _BuildInstanceHookEnvByObject(self, self.instance)
3708     env["MIGRATE_LIVE"] = self.op.live
3709     env["MIGRATE_CLEANUP"] = self.op.cleanup
3710     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3711     return env, nl, nl
3712
3713   def CheckPrereq(self):
3714     """Check prerequisites.
3715
3716     This checks that the instance is in the cluster.
3717
3718     """
3719     instance = self.cfg.GetInstanceInfo(
3720       self.cfg.ExpandInstanceName(self.op.instance_name))
3721     if instance is None:
3722       raise errors.OpPrereqError("Instance '%s' not known" %
3723                                  self.op.instance_name)
3724
3725     if instance.disk_template != constants.DT_DRBD8:
3726       raise errors.OpPrereqError("Instance's disk layout is not"
3727                                  " drbd8, cannot migrate.")
3728
3729     secondary_nodes = instance.secondary_nodes
3730     if not secondary_nodes:
3731       raise errors.ConfigurationError("No secondary node but using"
3732                                       " drbd8 disk template")
3733
3734     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3735
3736     target_node = secondary_nodes[0]
3737     # check memory requirements on the secondary node
3738     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3739                          instance.name, i_be[constants.BE_MEMORY],
3740                          instance.hypervisor)
3741
3742     # check bridge existance
3743     brlist = [nic.bridge for nic in instance.nics]
3744     result = self.rpc.call_bridges_exist(target_node, brlist)
3745     if result.failed or not result.data:
3746       raise errors.OpPrereqError("One or more target bridges %s does not"
3747                                  " exist on destination node '%s'" %
3748                                  (brlist, target_node))
3749
3750     if not self.op.cleanup:
3751       _CheckNodeNotDrained(self, target_node)
3752       result = self.rpc.call_instance_migratable(instance.primary_node,
3753                                                  instance)
3754       msg = result.RemoteFailMsg()
3755       if msg:
3756         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3757                                    msg)
3758
3759     self.instance = instance
3760
3761   def _WaitUntilSync(self):
3762     """Poll with custom rpc for disk sync.
3763
3764     This uses our own step-based rpc call.
3765
3766     """
3767     self.feedback_fn("* wait until resync is done")
3768     all_done = False
3769     while not all_done:
3770       all_done = True
3771       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3772                                             self.nodes_ip,
3773                                             self.instance.disks)
3774       min_percent = 100
3775       for node, nres in result.items():
3776         msg = nres.RemoteFailMsg()
3777         if msg:
3778           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3779                                    (node, msg))
3780         node_done, node_percent = nres.payload
3781         all_done = all_done and node_done
3782         if node_percent is not None:
3783           min_percent = min(min_percent, node_percent)
3784       if not all_done:
3785         if min_percent < 100:
3786           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3787         time.sleep(2)
3788
3789   def _EnsureSecondary(self, node):
3790     """Demote a node to secondary.
3791
3792     """
3793     self.feedback_fn("* switching node %s to secondary mode" % node)
3794
3795     for dev in self.instance.disks:
3796       self.cfg.SetDiskID(dev, node)
3797
3798     result = self.rpc.call_blockdev_close(node, self.instance.name,
3799                                           self.instance.disks)
3800     msg = result.RemoteFailMsg()
3801     if msg:
3802       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3803                                " error %s" % (node, msg))
3804
3805   def _GoStandalone(self):
3806     """Disconnect from the network.
3807
3808     """
3809     self.feedback_fn("* changing into standalone mode")
3810     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3811                                                self.instance.disks)
3812     for node, nres in result.items():
3813       msg = nres.RemoteFailMsg()
3814       if msg:
3815         raise errors.OpExecError("Cannot disconnect disks node %s,"
3816                                  " error %s" % (node, msg))
3817
3818   def _GoReconnect(self, multimaster):
3819     """Reconnect to the network.
3820
3821     """
3822     if multimaster:
3823       msg = "dual-master"
3824     else:
3825       msg = "single-master"
3826     self.feedback_fn("* changing disks into %s mode" % msg)
3827     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3828                                            self.instance.disks,
3829                                            self.instance.name, multimaster)
3830     for node, nres in result.items():
3831       msg = nres.RemoteFailMsg()
3832       if msg:
3833         raise errors.OpExecError("Cannot change disks config on node %s,"
3834                                  " error: %s" % (node, msg))
3835
3836   def _ExecCleanup(self):
3837     """Try to cleanup after a failed migration.
3838
3839     The cleanup is done by:
3840       - check that the instance is running only on one node
3841         (and update the config if needed)
3842       - change disks on its secondary node to secondary
3843       - wait until disks are fully synchronized
3844       - disconnect from the network
3845       - change disks into single-master mode
3846       - wait again until disks are fully synchronized
3847
3848     """
3849     instance = self.instance
3850     target_node = self.target_node
3851     source_node = self.source_node
3852
3853     # check running on only one node
3854     self.feedback_fn("* checking where the instance actually runs"
3855                      " (if this hangs, the hypervisor might be in"
3856                      " a bad state)")
3857     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3858     for node, result in ins_l.items():
3859       result.Raise()
3860       if not isinstance(result.data, list):
3861         raise errors.OpExecError("Can't contact node '%s'" % node)
3862
3863     runningon_source = instance.name in ins_l[source_node].data
3864     runningon_target = instance.name in ins_l[target_node].data
3865
3866     if runningon_source and runningon_target:
3867       raise errors.OpExecError("Instance seems to be running on two nodes,"
3868                                " or the hypervisor is confused. You will have"
3869                                " to ensure manually that it runs only on one"
3870                                " and restart this operation.")
3871
3872     if not (runningon_source or runningon_target):
3873       raise errors.OpExecError("Instance does not seem to be running at all."
3874                                " In this case, it's safer to repair by"
3875                                " running 'gnt-instance stop' to ensure disk"
3876                                " shutdown, and then restarting it.")
3877
3878     if runningon_target:
3879       # the migration has actually succeeded, we need to update the config
3880       self.feedback_fn("* instance running on secondary node (%s),"
3881                        " updating config" % target_node)
3882       instance.primary_node = target_node
3883       self.cfg.Update(instance)
3884       demoted_node = source_node
3885     else:
3886       self.feedback_fn("* instance confirmed to be running on its"
3887                        " primary node (%s)" % source_node)
3888       demoted_node = target_node
3889
3890     self._EnsureSecondary(demoted_node)
3891     try:
3892       self._WaitUntilSync()
3893     except errors.OpExecError:
3894       # we ignore here errors, since if the device is standalone, it
3895       # won't be able to sync
3896       pass
3897     self._GoStandalone()
3898     self._GoReconnect(False)
3899     self._WaitUntilSync()
3900
3901     self.feedback_fn("* done")
3902
3903   def _RevertDiskStatus(self):
3904     """Try to revert the disk status after a failed migration.
3905
3906     """
3907     target_node = self.target_node
3908     try:
3909       self._EnsureSecondary(target_node)
3910       self._GoStandalone()
3911       self._GoReconnect(False)
3912       self._WaitUntilSync()
3913     except errors.OpExecError, err:
3914       self.LogWarning("Migration failed and I can't reconnect the"
3915                       " drives: error '%s'\n"
3916                       "Please look and recover the instance status" %
3917                       str(err))
3918
3919   def _AbortMigration(self):
3920     """Call the hypervisor code to abort a started migration.
3921
3922     """
3923     instance = self.instance
3924     target_node = self.target_node
3925     migration_info = self.migration_info
3926
3927     abort_result = self.rpc.call_finalize_migration(target_node,
3928                                                     instance,
3929                                                     migration_info,
3930                                                     False)
3931     abort_msg = abort_result.RemoteFailMsg()
3932     if abort_msg:
3933       logging.error("Aborting migration failed on target node %s: %s" %
3934                     (target_node, abort_msg))
3935       # Don't raise an exception here, as we stil have to try to revert the
3936       # disk status, even if this step failed.
3937
3938   def _ExecMigration(self):
3939     """Migrate an instance.
3940
3941     The migrate is done by:
3942       - change the disks into dual-master mode
3943       - wait until disks are fully synchronized again
3944       - migrate the instance
3945       - change disks on the new secondary node (the old primary) to secondary
3946       - wait until disks are fully synchronized
3947       - change disks into single-master mode
3948
3949     """
3950     instance = self.instance
3951     target_node = self.target_node
3952     source_node = self.source_node
3953
3954     self.feedback_fn("* checking disk consistency between source and target")
3955     for dev in instance.disks:
3956       if not _CheckDiskConsistency(self, dev, target_node, False):
3957         raise errors.OpExecError("Disk %s is degraded or not fully"
3958                                  " synchronized on target node,"
3959                                  " aborting migrate." % dev.iv_name)
3960
3961     # First get the migration information from the remote node
3962     result = self.rpc.call_migration_info(source_node, instance)
3963     msg = result.RemoteFailMsg()
3964     if msg:
3965       log_err = ("Failed fetching source migration information from %s: %s" %
3966                  (source_node, msg))
3967       logging.error(log_err)
3968       raise errors.OpExecError(log_err)
3969
3970     self.migration_info = migration_info = result.payload
3971
3972     # Then switch the disks to master/master mode
3973     self._EnsureSecondary(target_node)
3974     self._GoStandalone()
3975     self._GoReconnect(True)
3976     self._WaitUntilSync()
3977
3978     self.feedback_fn("* preparing %s to accept the instance" % target_node)
3979     result = self.rpc.call_accept_instance(target_node,
3980                                            instance,
3981                                            migration_info,
3982                                            self.nodes_ip[target_node])
3983
3984     msg = result.RemoteFailMsg()
3985     if msg:
3986       logging.error("Instance pre-migration failed, trying to revert"
3987                     " disk status: %s", msg)
3988       self._AbortMigration()
3989       self._RevertDiskStatus()
3990       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3991                                (instance.name, msg))
3992
3993     self.feedback_fn("* migrating instance to %s" % target_node)
3994     time.sleep(10)
3995     result = self.rpc.call_instance_migrate(source_node, instance,
3996                                             self.nodes_ip[target_node],
3997                                             self.op.live)
3998     msg = result.RemoteFailMsg()
3999     if msg:
4000       logging.error("Instance migration failed, trying to revert"
4001                     " disk status: %s", msg)
4002       self._AbortMigration()
4003       self._RevertDiskStatus()
4004       raise errors.OpExecError("Could not migrate instance %s: %s" %
4005                                (instance.name, msg))
4006     time.sleep(10)
4007
4008     instance.primary_node = target_node
4009     # distribute new instance config to the other nodes
4010     self.cfg.Update(instance)
4011
4012     result = self.rpc.call_finalize_migration(target_node,
4013                                               instance,
4014                                               migration_info,
4015                                               True)
4016     msg = result.RemoteFailMsg()
4017     if msg:
4018       logging.error("Instance migration succeeded, but finalization failed:"
4019                     " %s" % msg)
4020       raise errors.OpExecError("Could not finalize instance migration: %s" %
4021                                msg)
4022
4023     self._EnsureSecondary(source_node)
4024     self._WaitUntilSync()
4025     self._GoStandalone()
4026     self._GoReconnect(False)
4027     self._WaitUntilSync()
4028
4029     self.feedback_fn("* done")
4030
4031   def Exec(self, feedback_fn):
4032     """Perform the migration.
4033
4034     """
4035     self.feedback_fn = feedback_fn
4036
4037     self.source_node = self.instance.primary_node
4038     self.target_node = self.instance.secondary_nodes[0]
4039     self.all_nodes = [self.source_node, self.target_node]
4040     self.nodes_ip = {
4041       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4042       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4043       }
4044     if self.op.cleanup:
4045       return self._ExecCleanup()
4046     else:
4047       return self._ExecMigration()
4048
4049
4050 def _CreateBlockDev(lu, node, instance, device, force_create,
4051                     info, force_open):
4052   """Create a tree of block devices on a given node.
4053
4054   If this device type has to be created on secondaries, create it and
4055   all its children.
4056
4057   If not, just recurse to children keeping the same 'force' value.
4058
4059   @param lu: the lu on whose behalf we execute
4060   @param node: the node on which to create the device
4061   @type instance: L{objects.Instance}
4062   @param instance: the instance which owns the device
4063   @type device: L{objects.Disk}
4064   @param device: the device to create
4065   @type force_create: boolean
4066   @param force_create: whether to force creation of this device; this
4067       will be change to True whenever we find a device which has
4068       CreateOnSecondary() attribute
4069   @param info: the extra 'metadata' we should attach to the device
4070       (this will be represented as a LVM tag)
4071   @type force_open: boolean
4072   @param force_open: this parameter will be passes to the
4073       L{backend.BlockdevCreate} function where it specifies
4074       whether we run on primary or not, and it affects both
4075       the child assembly and the device own Open() execution
4076
4077   """
4078   if device.CreateOnSecondary():
4079     force_create = True
4080
4081   if device.children:
4082     for child in device.children:
4083       _CreateBlockDev(lu, node, instance, child, force_create,
4084                       info, force_open)
4085
4086   if not force_create:
4087     return
4088
4089   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4090
4091
4092 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4093   """Create a single block device on a given node.
4094
4095   This will not recurse over children of the device, so they must be
4096   created in advance.
4097
4098   @param lu: the lu on whose behalf we execute
4099   @param node: the node on which to create the device
4100   @type instance: L{objects.Instance}
4101   @param instance: the instance which owns the device
4102   @type device: L{objects.Disk}
4103   @param device: the device to create
4104   @param info: the extra 'metadata' we should attach to the device
4105       (this will be represented as a LVM tag)
4106   @type force_open: boolean
4107   @param force_open: this parameter will be passes to the
4108       L{backend.BlockdevCreate} function where it specifies
4109       whether we run on primary or not, and it affects both
4110       the child assembly and the device own Open() execution
4111
4112   """
4113   lu.cfg.SetDiskID(device, node)
4114   result = lu.rpc.call_blockdev_create(node, device, device.size,
4115                                        instance.name, force_open, info)
4116   msg = result.RemoteFailMsg()
4117   if msg:
4118     raise errors.OpExecError("Can't create block device %s on"
4119                              " node %s for instance %s: %s" %
4120                              (device, node, instance.name, msg))
4121   if device.physical_id is None:
4122     device.physical_id = result.payload
4123
4124
4125 def _GenerateUniqueNames(lu, exts):
4126   """Generate a suitable LV name.
4127
4128   This will generate a logical volume name for the given instance.
4129
4130   """
4131   results = []
4132   for val in exts:
4133     new_id = lu.cfg.GenerateUniqueID()
4134     results.append("%s%s" % (new_id, val))
4135   return results
4136
4137
4138 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4139                          p_minor, s_minor):
4140   """Generate a drbd8 device complete with its children.
4141
4142   """
4143   port = lu.cfg.AllocatePort()
4144   vgname = lu.cfg.GetVGName()
4145   shared_secret = lu.cfg.GenerateDRBDSecret()
4146   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4147                           logical_id=(vgname, names[0]))
4148   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4149                           logical_id=(vgname, names[1]))
4150   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4151                           logical_id=(primary, secondary, port,
4152                                       p_minor, s_minor,
4153                                       shared_secret),
4154                           children=[dev_data, dev_meta],
4155                           iv_name=iv_name)
4156   return drbd_dev
4157
4158
4159 def _GenerateDiskTemplate(lu, template_name,
4160                           instance_name, primary_node,
4161                           secondary_nodes, disk_info,
4162                           file_storage_dir, file_driver,
4163                           base_index):
4164   """Generate the entire disk layout for a given template type.
4165
4166   """
4167   #TODO: compute space requirements
4168
4169   vgname = lu.cfg.GetVGName()
4170   disk_count = len(disk_info)
4171   disks = []
4172   if template_name == constants.DT_DISKLESS:
4173     pass
4174   elif template_name == constants.DT_PLAIN:
4175     if len(secondary_nodes) != 0:
4176       raise errors.ProgrammerError("Wrong template configuration")
4177
4178     names = _GenerateUniqueNames(lu, [".disk%d" % i
4179                                       for i in range(disk_count)])
4180     for idx, disk in enumerate(disk_info):
4181       disk_index = idx + base_index
4182       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4183                               logical_id=(vgname, names[idx]),
4184                               iv_name="disk/%d" % disk_index,
4185                               mode=disk["mode"])
4186       disks.append(disk_dev)
4187   elif template_name == constants.DT_DRBD8:
4188     if len(secondary_nodes) != 1:
4189       raise errors.ProgrammerError("Wrong template configuration")
4190     remote_node = secondary_nodes[0]
4191     minors = lu.cfg.AllocateDRBDMinor(
4192       [primary_node, remote_node] * len(disk_info), instance_name)
4193
4194     names = []
4195     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4196                                                for i in range(disk_count)]):
4197       names.append(lv_prefix + "_data")
4198       names.append(lv_prefix + "_meta")
4199     for idx, disk in enumerate(disk_info):
4200       disk_index = idx + base_index
4201       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4202                                       disk["size"], names[idx*2:idx*2+2],
4203                                       "disk/%d" % disk_index,
4204                                       minors[idx*2], minors[idx*2+1])
4205       disk_dev.mode = disk["mode"]
4206       disks.append(disk_dev)
4207   elif template_name == constants.DT_FILE:
4208     if len(secondary_nodes) != 0:
4209       raise errors.ProgrammerError("Wrong template configuration")
4210
4211     for idx, disk in enumerate(disk_info):
4212       disk_index = idx + base_index
4213       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4214                               iv_name="disk/%d" % disk_index,
4215                               logical_id=(file_driver,
4216                                           "%s/disk%d" % (file_storage_dir,
4217                                                          disk_index)),
4218                               mode=disk["mode"])
4219       disks.append(disk_dev)
4220   else:
4221     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4222   return disks
4223
4224
4225 def _GetInstanceInfoText(instance):
4226   """Compute that text that should be added to the disk's metadata.
4227
4228   """
4229   return "originstname+%s" % instance.name
4230
4231
4232 def _CreateDisks(lu, instance):
4233   """Create all disks for an instance.
4234
4235   This abstracts away some work from AddInstance.
4236
4237   @type lu: L{LogicalUnit}
4238   @param lu: the logical unit on whose behalf we execute
4239   @type instance: L{objects.Instance}
4240   @param instance: the instance whose disks we should create
4241   @rtype: boolean
4242   @return: the success of the creation
4243
4244   """
4245   info = _GetInstanceInfoText(instance)
4246   pnode = instance.primary_node
4247
4248   if instance.disk_template == constants.DT_FILE:
4249     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4250     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4251
4252     if result.failed or not result.data:
4253       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4254
4255     if not result.data[0]:
4256       raise errors.OpExecError("Failed to create directory '%s'" %
4257                                file_storage_dir)
4258
4259   # Note: this needs to be kept in sync with adding of disks in
4260   # LUSetInstanceParams
4261   for device in instance.disks:
4262     logging.info("Creating volume %s for instance %s",
4263                  device.iv_name, instance.name)
4264     #HARDCODE
4265     for node in instance.all_nodes:
4266       f_create = node == pnode
4267       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4268
4269
4270 def _RemoveDisks(lu, instance):
4271   """Remove all disks for an instance.
4272
4273   This abstracts away some work from `AddInstance()` and
4274   `RemoveInstance()`. Note that in case some of the devices couldn't
4275   be removed, the removal will continue with the other ones (compare
4276   with `_CreateDisks()`).
4277
4278   @type lu: L{LogicalUnit}
4279   @param lu: the logical unit on whose behalf we execute
4280   @type instance: L{objects.Instance}
4281   @param instance: the instance whose disks we should remove
4282   @rtype: boolean
4283   @return: the success of the removal
4284
4285   """
4286   logging.info("Removing block devices for instance %s", instance.name)
4287
4288   all_result = True
4289   for device in instance.disks:
4290     for node, disk in device.ComputeNodeTree(instance.primary_node):
4291       lu.cfg.SetDiskID(disk, node)
4292       msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4293       if msg:
4294         lu.LogWarning("Could not remove block device %s on node %s,"
4295                       " continuing anyway: %s", device.iv_name, node, msg)
4296         all_result = False
4297
4298   if instance.disk_template == constants.DT_FILE:
4299     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4300     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4301                                                  file_storage_dir)
4302     if result.failed or not result.data:
4303       logging.error("Could not remove directory '%s'", file_storage_dir)
4304       all_result = False
4305
4306   return all_result
4307
4308
4309 def _ComputeDiskSize(disk_template, disks):
4310   """Compute disk size requirements in the volume group
4311
4312   """
4313   # Required free disk space as a function of disk and swap space
4314   req_size_dict = {
4315     constants.DT_DISKLESS: None,
4316     constants.DT_PLAIN: sum(d["size"] for d in disks),
4317     # 128 MB are added for drbd metadata for each disk
4318     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4319     constants.DT_FILE: None,
4320   }
4321
4322   if disk_template not in req_size_dict:
4323     raise errors.ProgrammerError("Disk template '%s' size requirement"
4324                                  " is unknown" %  disk_template)
4325
4326   return req_size_dict[disk_template]
4327
4328
4329 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4330   """Hypervisor parameter validation.
4331
4332   This function abstract the hypervisor parameter validation to be
4333   used in both instance create and instance modify.
4334
4335   @type lu: L{LogicalUnit}
4336   @param lu: the logical unit for which we check
4337   @type nodenames: list
4338   @param nodenames: the list of nodes on which we should check
4339   @type hvname: string
4340   @param hvname: the name of the hypervisor we should use
4341   @type hvparams: dict
4342   @param hvparams: the parameters which we need to check
4343   @raise errors.OpPrereqError: if the parameters are not valid
4344
4345   """
4346   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4347                                                   hvname,
4348                                                   hvparams)
4349   for node in nodenames:
4350     info = hvinfo[node]
4351     if info.offline:
4352       continue
4353     msg = info.RemoteFailMsg()
4354     if msg:
4355       raise errors.OpPrereqError("Hypervisor parameter validation"
4356                                  " failed on node %s: %s" % (node, msg))
4357
4358
4359 class LUCreateInstance(LogicalUnit):
4360   """Create an instance.
4361
4362   """
4363   HPATH = "instance-add"
4364   HTYPE = constants.HTYPE_INSTANCE
4365   _OP_REQP = ["instance_name", "disks", "disk_template",
4366               "mode", "start",
4367               "wait_for_sync", "ip_check", "nics",
4368               "hvparams", "beparams"]
4369   REQ_BGL = False
4370
4371   def _ExpandNode(self, node):
4372     """Expands and checks one node name.
4373
4374     """
4375     node_full = self.cfg.ExpandNodeName(node)
4376     if node_full is None:
4377       raise errors.OpPrereqError("Unknown node %s" % node)
4378     return node_full
4379
4380   def ExpandNames(self):
4381     """ExpandNames for CreateInstance.
4382
4383     Figure out the right locks for instance creation.
4384
4385     """
4386     self.needed_locks = {}
4387
4388     # set optional parameters to none if they don't exist
4389     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4390       if not hasattr(self.op, attr):
4391         setattr(self.op, attr, None)
4392
4393     # cheap checks, mostly valid constants given
4394
4395     # verify creation mode
4396     if self.op.mode not in (constants.INSTANCE_CREATE,
4397                             constants.INSTANCE_IMPORT):
4398       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4399                                  self.op.mode)
4400
4401     # disk template and mirror node verification
4402     if self.op.disk_template not in constants.DISK_TEMPLATES:
4403       raise errors.OpPrereqError("Invalid disk template name")
4404
4405     if self.op.hypervisor is None:
4406       self.op.hypervisor = self.cfg.GetHypervisorType()
4407
4408     cluster = self.cfg.GetClusterInfo()
4409     enabled_hvs = cluster.enabled_hypervisors
4410     if self.op.hypervisor not in enabled_hvs:
4411       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4412                                  " cluster (%s)" % (self.op.hypervisor,
4413                                   ",".join(enabled_hvs)))
4414
4415     # check hypervisor parameter syntax (locally)
4416     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4417     filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
4418                                   self.op.hvparams)
4419     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4420     hv_type.CheckParameterSyntax(filled_hvp)
4421
4422     # fill and remember the beparams dict
4423     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4424     self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
4425                                     self.op.beparams)
4426
4427     #### instance parameters check
4428
4429     # instance name verification
4430     hostname1 = utils.HostInfo(self.op.instance_name)
4431     self.op.instance_name = instance_name = hostname1.name
4432
4433     # this is just a preventive check, but someone might still add this
4434     # instance in the meantime, and creation will fail at lock-add time
4435     if instance_name in self.cfg.GetInstanceList():
4436       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4437                                  instance_name)
4438
4439     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4440
4441     # NIC buildup
4442     self.nics = []
4443     for idx, nic in enumerate(self.op.nics):
4444       nic_mode_req = nic.get("mode", None)
4445       nic_mode = nic_mode_req
4446       if nic_mode is None:
4447         nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
4448
4449       # in routed mode, for the first nic, the default ip is 'auto'
4450       if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
4451         default_ip_mode = constants.VALUE_AUTO
4452       else:
4453         default_ip_mode = constants.VALUE_NONE
4454
4455       # ip validity checks
4456       ip = nic.get("ip", default_ip_mode)
4457       if ip is None or ip.lower() == constants.VALUE_NONE:
4458         nic_ip = None
4459       elif ip.lower() == constants.VALUE_AUTO:
4460         nic_ip = hostname1.ip
4461       else:
4462         if not utils.IsValidIP(ip):
4463           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4464                                      " like a valid IP" % ip)
4465         nic_ip = ip
4466
4467       # TODO: check the ip for uniqueness !!
4468       if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
4469         raise errors.OpPrereqError("Routed nic mode requires an ip address")
4470
4471       # MAC address verification
4472       mac = nic.get("mac", constants.VALUE_AUTO)
4473       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4474         if not utils.IsValidMac(mac.lower()):
4475           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4476                                      mac)
4477       # bridge verification
4478       bridge = nic.get("bridge", None)
4479       link = nic.get("link", None)
4480       if bridge and link:
4481         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
4482       elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
4483         raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
4484       elif bridge:
4485         link = bridge
4486
4487       nicparams = {}
4488       if nic_mode_req:
4489         nicparams[constants.NIC_MODE] = nic_mode_req
4490       if link:
4491         nicparams[constants.NIC_LINK] = link
4492
4493       check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4494                                       nicparams)
4495       objects.NIC.CheckParameterSyntax(check_params)
4496       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
4497
4498     # disk checks/pre-build
4499     self.disks = []
4500     for disk in self.op.disks:
4501       mode = disk.get("mode", constants.DISK_RDWR)
4502       if mode not in constants.DISK_ACCESS_SET:
4503         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4504                                    mode)
4505       size = disk.get("size", None)
4506       if size is None:
4507         raise errors.OpPrereqError("Missing disk size")
4508       try:
4509         size = int(size)
4510       except ValueError:
4511         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4512       self.disks.append({"size": size, "mode": mode})
4513
4514     # used in CheckPrereq for ip ping check
4515     self.check_ip = hostname1.ip
4516
4517     # file storage checks
4518     if (self.op.file_driver and
4519         not self.op.file_driver in constants.FILE_DRIVER):
4520       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4521                                  self.op.file_driver)
4522
4523     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4524       raise errors.OpPrereqError("File storage directory path not absolute")
4525
4526     ### Node/iallocator related checks
4527     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4528       raise errors.OpPrereqError("One and only one of iallocator and primary"
4529                                  " node must be given")
4530
4531     if self.op.iallocator:
4532       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4533     else:
4534       self.op.pnode = self._ExpandNode(self.op.pnode)
4535       nodelist = [self.op.pnode]
4536       if self.op.snode is not None:
4537         self.op.snode = self._ExpandNode(self.op.snode)
4538         nodelist.append(self.op.snode)
4539       self.needed_locks[locking.LEVEL_NODE] = nodelist
4540
4541     # in case of import lock the source node too
4542     if self.op.mode == constants.INSTANCE_IMPORT:
4543       src_node = getattr(self.op, "src_node", None)
4544       src_path = getattr(self.op, "src_path", None)
4545
4546       if src_path is None:
4547         self.op.src_path = src_path = self.op.instance_name
4548
4549       if src_node is None:
4550         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4551         self.op.src_node = None
4552         if os.path.isabs(src_path):
4553           raise errors.OpPrereqError("Importing an instance from an absolute"
4554                                      " path requires a source node option.")
4555       else:
4556         self.op.src_node = src_node = self._ExpandNode(src_node)
4557         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4558           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4559         if not os.path.isabs(src_path):
4560           self.op.src_path = src_path = \
4561             os.path.join(constants.EXPORT_DIR, src_path)
4562
4563     else: # INSTANCE_CREATE
4564       if getattr(self.op, "os_type", None) is None:
4565         raise errors.OpPrereqError("No guest OS specified")
4566
4567   def _RunAllocator(self):
4568     """Run the allocator based on input opcode.
4569
4570     """
4571     nics = [n.ToDict() for n in self.nics]
4572     ial = IAllocator(self,
4573                      mode=constants.IALLOCATOR_MODE_ALLOC,
4574                      name=self.op.instance_name,
4575                      disk_template=self.op.disk_template,
4576                      tags=[],
4577                      os=self.op.os_type,
4578                      vcpus=self.be_full[constants.BE_VCPUS],
4579                      mem_size=self.be_full[constants.BE_MEMORY],
4580                      disks=self.disks,
4581                      nics=nics,
4582                      hypervisor=self.op.hypervisor,
4583                      )
4584
4585     ial.Run(self.op.iallocator)
4586
4587     if not ial.success:
4588       raise errors.OpPrereqError("Can't compute nodes using"
4589                                  " iallocator '%s': %s" % (self.op.iallocator,
4590                                                            ial.info))
4591     if len(ial.nodes) != ial.required_nodes:
4592       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4593                                  " of nodes (%s), required %s" %
4594                                  (self.op.iallocator, len(ial.nodes),
4595                                   ial.required_nodes))
4596     self.op.pnode = ial.nodes[0]
4597     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4598                  self.op.instance_name, self.op.iallocator,
4599                  ", ".join(ial.nodes))
4600     if ial.required_nodes == 2:
4601       self.op.snode = ial.nodes[1]
4602
4603   def BuildHooksEnv(self):
4604     """Build hooks env.
4605
4606     This runs on master, primary and secondary nodes of the instance.
4607
4608     """
4609     env = {
4610       "ADD_MODE": self.op.mode,
4611       }
4612     if self.op.mode == constants.INSTANCE_IMPORT:
4613       env["SRC_NODE"] = self.op.src_node
4614       env["SRC_PATH"] = self.op.src_path
4615       env["SRC_IMAGES"] = self.src_images
4616
4617     env.update(_BuildInstanceHookEnv(
4618       name=self.op.instance_name,
4619       primary_node=self.op.pnode,
4620       secondary_nodes=self.secondaries,
4621       status=self.op.start,
4622       os_type=self.op.os_type,
4623       memory=self.be_full[constants.BE_MEMORY],
4624       vcpus=self.be_full[constants.BE_VCPUS],
4625       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4626       disk_template=self.op.disk_template,
4627       disks=[(d["size"], d["mode"]) for d in self.disks],
4628     ))
4629
4630     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4631           self.secondaries)
4632     return env, nl, nl
4633
4634
4635   def CheckPrereq(self):
4636     """Check prerequisites.
4637
4638     """
4639     if (not self.cfg.GetVGName() and
4640         self.op.disk_template not in constants.DTS_NOT_LVM):
4641       raise errors.OpPrereqError("Cluster does not support lvm-based"
4642                                  " instances")
4643
4644     if self.op.mode == constants.INSTANCE_IMPORT:
4645       src_node = self.op.src_node
4646       src_path = self.op.src_path
4647
4648       if src_node is None:
4649         exp_list = self.rpc.call_export_list(
4650           self.acquired_locks[locking.LEVEL_NODE])
4651         found = False
4652         for node in exp_list:
4653           if not exp_list[node].failed and src_path in exp_list[node].data:
4654             found = True
4655             self.op.src_node = src_node = node
4656             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4657                                                        src_path)
4658             break
4659         if not found:
4660           raise errors.OpPrereqError("No export found for relative path %s" %
4661                                       src_path)
4662
4663       _CheckNodeOnline(self, src_node)
4664       result = self.rpc.call_export_info(src_node, src_path)
4665       result.Raise()
4666       if not result.data:
4667         raise errors.OpPrereqError("No export found in dir %s" % src_path)
4668
4669       export_info = result.data
4670       if not export_info.has_section(constants.INISECT_EXP):
4671         raise errors.ProgrammerError("Corrupted export config")
4672
4673       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4674       if (int(ei_version) != constants.EXPORT_VERSION):
4675         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4676                                    (ei_version, constants.EXPORT_VERSION))
4677
4678       # Check that the new instance doesn't have less disks than the export
4679       instance_disks = len(self.disks)
4680       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4681       if instance_disks < export_disks:
4682         raise errors.OpPrereqError("Not enough disks to import."
4683                                    " (instance: %d, export: %d)" %
4684                                    (instance_disks, export_disks))
4685
4686       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4687       disk_images = []
4688       for idx in range(export_disks):
4689         option = 'disk%d_dump' % idx
4690         if export_info.has_option(constants.INISECT_INS, option):
4691           # FIXME: are the old os-es, disk sizes, etc. useful?
4692           export_name = export_info.get(constants.INISECT_INS, option)
4693           image = os.path.join(src_path, export_name)
4694           disk_images.append(image)
4695         else:
4696           disk_images.append(False)
4697
4698       self.src_images = disk_images
4699
4700       old_name = export_info.get(constants.INISECT_INS, 'name')
4701       # FIXME: int() here could throw a ValueError on broken exports
4702       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4703       if self.op.instance_name == old_name:
4704         for idx, nic in enumerate(self.nics):
4705           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4706             nic_mac_ini = 'nic%d_mac' % idx
4707             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4708
4709     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4710     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4711     if self.op.start and not self.op.ip_check:
4712       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4713                                  " adding an instance in start mode")
4714
4715     if self.op.ip_check:
4716       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4717         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4718                                    (self.check_ip, self.op.instance_name))
4719
4720     #### mac address generation
4721     # By generating here the mac address both the allocator and the hooks get
4722     # the real final mac address rather than the 'auto' or 'generate' value.
4723     # There is a race condition between the generation and the instance object
4724     # creation, which means that we know the mac is valid now, but we're not
4725     # sure it will be when we actually add the instance. If things go bad
4726     # adding the instance will abort because of a duplicate mac, and the
4727     # creation job will fail.
4728     for nic in self.nics:
4729       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4730         nic.mac = self.cfg.GenerateMAC()
4731
4732     #### allocator run
4733
4734     if self.op.iallocator is not None:
4735       self._RunAllocator()
4736
4737     #### node related checks
4738
4739     # check primary node
4740     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4741     assert self.pnode is not None, \
4742       "Cannot retrieve locked node %s" % self.op.pnode
4743     if pnode.offline:
4744       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4745                                  pnode.name)
4746     if pnode.drained:
4747       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4748                                  pnode.name)
4749
4750     self.secondaries = []
4751
4752     # mirror node verification
4753     if self.op.disk_template in constants.DTS_NET_MIRROR:
4754       if self.op.snode is None:
4755         raise errors.OpPrereqError("The networked disk templates need"
4756                                    " a mirror node")
4757       if self.op.snode == pnode.name:
4758         raise errors.OpPrereqError("The secondary node cannot be"
4759                                    " the primary node.")
4760       _CheckNodeOnline(self, self.op.snode)
4761       _CheckNodeNotDrained(self, self.op.snode)
4762       self.secondaries.append(self.op.snode)
4763
4764     nodenames = [pnode.name] + self.secondaries
4765
4766     req_size = _ComputeDiskSize(self.op.disk_template,
4767                                 self.disks)
4768
4769     # Check lv size requirements
4770     if req_size is not None:
4771       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4772                                          self.op.hypervisor)
4773       for node in nodenames:
4774         info = nodeinfo[node]
4775         info.Raise()
4776         info = info.data
4777         if not info:
4778           raise errors.OpPrereqError("Cannot get current information"
4779                                      " from node '%s'" % node)
4780         vg_free = info.get('vg_free', None)
4781         if not isinstance(vg_free, int):
4782           raise errors.OpPrereqError("Can't compute free disk space on"
4783                                      " node %s" % node)
4784         if req_size > info['vg_free']:
4785           raise errors.OpPrereqError("Not enough disk space on target node %s."
4786                                      " %d MB available, %d MB required" %
4787                                      (node, info['vg_free'], req_size))
4788
4789     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4790
4791     # os verification
4792     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4793     result.Raise()
4794     if not isinstance(result.data, objects.OS):
4795       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4796                                  " primary node"  % self.op.os_type)
4797
4798     # bridge check on primary node
4799     bridges = [n.bridge for n in self.nics]
4800     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4801     result.Raise()
4802     if not result.data:
4803       raise errors.OpPrereqError("One of the target bridges '%s' does not"
4804                                  " exist on destination node '%s'" %
4805                                  (",".join(bridges), pnode.name))
4806
4807     # memory check on primary node
4808     if self.op.start:
4809       _CheckNodeFreeMemory(self, self.pnode.name,
4810                            "creating instance %s" % self.op.instance_name,
4811                            self.be_full[constants.BE_MEMORY],
4812                            self.op.hypervisor)
4813
4814   def Exec(self, feedback_fn):
4815     """Create and add the instance to the cluster.
4816
4817     """
4818     instance = self.op.instance_name
4819     pnode_name = self.pnode.name
4820
4821     ht_kind = self.op.hypervisor
4822     if ht_kind in constants.HTS_REQ_PORT:
4823       network_port = self.cfg.AllocatePort()
4824     else:
4825       network_port = None
4826
4827     ##if self.op.vnc_bind_address is None:
4828     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4829
4830     # this is needed because os.path.join does not accept None arguments
4831     if self.op.file_storage_dir is None:
4832       string_file_storage_dir = ""
4833     else:
4834       string_file_storage_dir = self.op.file_storage_dir
4835
4836     # build the full file storage dir path
4837     file_storage_dir = os.path.normpath(os.path.join(
4838                                         self.cfg.GetFileStorageDir(),
4839                                         string_file_storage_dir, instance))
4840
4841
4842     disks = _GenerateDiskTemplate(self,
4843                                   self.op.disk_template,
4844                                   instance, pnode_name,
4845                                   self.secondaries,
4846                                   self.disks,
4847                                   file_storage_dir,
4848                                   self.op.file_driver,
4849                                   0)
4850
4851     iobj = objects.Instance(name=instance, os=self.op.os_type,
4852                             primary_node=pnode_name,
4853                             nics=self.nics, disks=disks,
4854                             disk_template=self.op.disk_template,
4855                             admin_up=False,
4856                             network_port=network_port,
4857                             beparams=self.op.beparams,
4858                             hvparams=self.op.hvparams,
4859                             hypervisor=self.op.hypervisor,
4860                             )
4861
4862     feedback_fn("* creating instance disks...")
4863     try:
4864       _CreateDisks(self, iobj)
4865     except errors.OpExecError:
4866       self.LogWarning("Device creation failed, reverting...")
4867       try:
4868         _RemoveDisks(self, iobj)
4869       finally:
4870         self.cfg.ReleaseDRBDMinors(instance)
4871         raise
4872
4873     feedback_fn("adding instance %s to cluster config" % instance)
4874
4875     self.cfg.AddInstance(iobj)
4876     # Declare that we don't want to remove the instance lock anymore, as we've
4877     # added the instance to the config
4878     del self.remove_locks[locking.LEVEL_INSTANCE]
4879     # Unlock all the nodes
4880     if self.op.mode == constants.INSTANCE_IMPORT:
4881       nodes_keep = [self.op.src_node]
4882       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4883                        if node != self.op.src_node]
4884       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4885       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4886     else:
4887       self.context.glm.release(locking.LEVEL_NODE)
4888       del self.acquired_locks[locking.LEVEL_NODE]
4889
4890     if self.op.wait_for_sync:
4891       disk_abort = not _WaitForSync(self, iobj)
4892     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4893       # make sure the disks are not degraded (still sync-ing is ok)
4894       time.sleep(15)
4895       feedback_fn("* checking mirrors status")
4896       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4897     else:
4898       disk_abort = False
4899
4900     if disk_abort:
4901       _RemoveDisks(self, iobj)
4902       self.cfg.RemoveInstance(iobj.name)
4903       # Make sure the instance lock gets removed
4904       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4905       raise errors.OpExecError("There are some degraded disks for"
4906                                " this instance")
4907
4908     feedback_fn("creating os for instance %s on node %s" %
4909                 (instance, pnode_name))
4910
4911     if iobj.disk_template != constants.DT_DISKLESS:
4912       if self.op.mode == constants.INSTANCE_CREATE:
4913         feedback_fn("* running the instance OS create scripts...")
4914         result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
4915         msg = result.RemoteFailMsg()
4916         if msg:
4917           raise errors.OpExecError("Could not add os for instance %s"
4918                                    " on node %s: %s" %
4919                                    (instance, pnode_name, msg))
4920
4921       elif self.op.mode == constants.INSTANCE_IMPORT:
4922         feedback_fn("* running the instance OS import scripts...")
4923         src_node = self.op.src_node
4924         src_images = self.src_images
4925         cluster_name = self.cfg.GetClusterName()
4926         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4927                                                          src_node, src_images,
4928                                                          cluster_name)
4929         import_result.Raise()
4930         for idx, result in enumerate(import_result.data):
4931           if not result:
4932             self.LogWarning("Could not import the image %s for instance"
4933                             " %s, disk %d, on node %s" %
4934                             (src_images[idx], instance, idx, pnode_name))
4935       else:
4936         # also checked in the prereq part
4937         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4938                                      % self.op.mode)
4939
4940     if self.op.start:
4941       iobj.admin_up = True
4942       self.cfg.Update(iobj)
4943       logging.info("Starting instance %s on node %s", instance, pnode_name)
4944       feedback_fn("* starting instance...")
4945       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4946       msg = result.RemoteFailMsg()
4947       if msg:
4948         raise errors.OpExecError("Could not start instance: %s" % msg)
4949
4950
4951 class LUConnectConsole(NoHooksLU):
4952   """Connect to an instance's console.
4953
4954   This is somewhat special in that it returns the command line that
4955   you need to run on the master node in order to connect to the
4956   console.
4957
4958   """
4959   _OP_REQP = ["instance_name"]
4960   REQ_BGL = False
4961
4962   def ExpandNames(self):
4963     self._ExpandAndLockInstance()
4964
4965   def CheckPrereq(self):
4966     """Check prerequisites.
4967
4968     This checks that the instance is in the cluster.
4969
4970     """
4971     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4972     assert self.instance is not None, \
4973       "Cannot retrieve locked instance %s" % self.op.instance_name
4974     _CheckNodeOnline(self, self.instance.primary_node)
4975
4976   def Exec(self, feedback_fn):
4977     """Connect to the console of an instance
4978
4979     """
4980     instance = self.instance
4981     node = instance.primary_node
4982
4983     node_insts = self.rpc.call_instance_list([node],
4984                                              [instance.hypervisor])[node]
4985     node_insts.Raise()
4986
4987     if instance.name not in node_insts.data:
4988       raise errors.OpExecError("Instance %s is not running." % instance.name)
4989
4990     logging.debug("Connecting to console of %s on %s", instance.name, node)
4991
4992     hyper = hypervisor.GetHypervisor(instance.hypervisor)
4993     cluster = self.cfg.GetClusterInfo()
4994     # beparams and hvparams are passed separately, to avoid editing the
4995     # instance and then saving the defaults in the instance itself.
4996     hvparams = cluster.FillHV(instance)
4997     beparams = cluster.FillBE(instance)
4998     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
4999
5000     # build ssh cmdline
5001     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
5002
5003
5004 class LUReplaceDisks(LogicalUnit):
5005   """Replace the disks of an instance.
5006
5007   """
5008   HPATH = "mirrors-replace"
5009   HTYPE = constants.HTYPE_INSTANCE
5010   _OP_REQP = ["instance_name", "mode", "disks"]
5011   REQ_BGL = False
5012
5013   def CheckArguments(self):
5014     if not hasattr(self.op, "remote_node"):
5015       self.op.remote_node = None
5016     if not hasattr(self.op, "iallocator"):
5017       self.op.iallocator = None
5018
5019     # check for valid parameter combination
5020     cnt = [self.op.remote_node, self.op.iallocator].count(None)
5021     if self.op.mode == constants.REPLACE_DISK_CHG:
5022       if cnt == 2:
5023         raise errors.OpPrereqError("When changing the secondary either an"
5024                                    " iallocator script must be used or the"
5025                                    " new node given")
5026       elif cnt == 0:
5027         raise errors.OpPrereqError("Give either the iallocator or the new"
5028                                    " secondary, not both")
5029     else: # not replacing the secondary
5030       if cnt != 2:
5031         raise errors.OpPrereqError("The iallocator and new node options can"
5032                                    " be used only when changing the"
5033                                    " secondary node")
5034
5035   def ExpandNames(self):
5036     self._ExpandAndLockInstance()
5037
5038     if self.op.iallocator is not None:
5039       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5040     elif self.op.remote_node is not None:
5041       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5042       if remote_node is None:
5043         raise errors.OpPrereqError("Node '%s' not known" %
5044                                    self.op.remote_node)
5045       self.op.remote_node = remote_node
5046       # Warning: do not remove the locking of the new secondary here
5047       # unless DRBD8.AddChildren is changed to work in parallel;
5048       # currently it doesn't since parallel invocations of
5049       # FindUnusedMinor will conflict
5050       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5051       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5052     else:
5053       self.needed_locks[locking.LEVEL_NODE] = []
5054       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5055
5056   def DeclareLocks(self, level):
5057     # If we're not already locking all nodes in the set we have to declare the
5058     # instance's primary/secondary nodes.
5059     if (level == locking.LEVEL_NODE and
5060         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5061       self._LockInstancesNodes()
5062
5063   def _RunAllocator(self):
5064     """Compute a new secondary node using an IAllocator.
5065
5066     """
5067     ial = IAllocator(self,
5068                      mode=constants.IALLOCATOR_MODE_RELOC,
5069                      name=self.op.instance_name,
5070                      relocate_from=[self.sec_node])
5071
5072     ial.Run(self.op.iallocator)
5073
5074     if not ial.success:
5075       raise errors.OpPrereqError("Can't compute nodes using"
5076                                  " iallocator '%s': %s" % (self.op.iallocator,
5077                                                            ial.info))
5078     if len(ial.nodes) != ial.required_nodes:
5079       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5080                                  " of nodes (%s), required %s" %
5081                                  (len(ial.nodes), ial.required_nodes))
5082     self.op.remote_node = ial.nodes[0]
5083     self.LogInfo("Selected new secondary for the instance: %s",
5084                  self.op.remote_node)
5085
5086   def BuildHooksEnv(self):
5087     """Build hooks env.
5088
5089     This runs on the master, the primary and all the secondaries.
5090
5091     """
5092     env = {
5093       "MODE": self.op.mode,
5094       "NEW_SECONDARY": self.op.remote_node,
5095       "OLD_SECONDARY": self.instance.secondary_nodes[0],
5096       }
5097     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5098     nl = [
5099       self.cfg.GetMasterNode(),
5100       self.instance.primary_node,
5101       ]
5102     if self.op.remote_node is not None:
5103       nl.append(self.op.remote_node)
5104     return env, nl, nl
5105
5106   def CheckPrereq(self):
5107     """Check prerequisites.
5108
5109     This checks that the instance is in the cluster.
5110
5111     """
5112     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5113     assert instance is not None, \
5114       "Cannot retrieve locked instance %s" % self.op.instance_name
5115     self.instance = instance
5116
5117     if instance.disk_template != constants.DT_DRBD8:
5118       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5119                                  " instances")
5120
5121     if len(instance.secondary_nodes) != 1:
5122       raise errors.OpPrereqError("The instance has a strange layout,"
5123                                  " expected one secondary but found %d" %
5124                                  len(instance.secondary_nodes))
5125
5126     self.sec_node = instance.secondary_nodes[0]
5127
5128     if self.op.iallocator is not None:
5129       self._RunAllocator()
5130
5131     remote_node = self.op.remote_node
5132     if remote_node is not None:
5133       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5134       assert self.remote_node_info is not None, \
5135         "Cannot retrieve locked node %s" % remote_node
5136     else:
5137       self.remote_node_info = None
5138     if remote_node == instance.primary_node:
5139       raise errors.OpPrereqError("The specified node is the primary node of"
5140                                  " the instance.")
5141     elif remote_node == self.sec_node:
5142       raise errors.OpPrereqError("The specified node is already the"
5143                                  " secondary node of the instance.")
5144
5145     if self.op.mode == constants.REPLACE_DISK_PRI:
5146       n1 = self.tgt_node = instance.primary_node
5147       n2 = self.oth_node = self.sec_node
5148     elif self.op.mode == constants.REPLACE_DISK_SEC:
5149       n1 = self.tgt_node = self.sec_node
5150       n2 = self.oth_node = instance.primary_node
5151     elif self.op.mode == constants.REPLACE_DISK_CHG:
5152       n1 = self.new_node = remote_node
5153       n2 = self.oth_node = instance.primary_node
5154       self.tgt_node = self.sec_node
5155       _CheckNodeNotDrained(self, remote_node)
5156     else:
5157       raise errors.ProgrammerError("Unhandled disk replace mode")
5158
5159     _CheckNodeOnline(self, n1)
5160     _CheckNodeOnline(self, n2)
5161
5162     if not self.op.disks:
5163       self.op.disks = range(len(instance.disks))
5164
5165     for disk_idx in self.op.disks:
5166       instance.FindDisk(disk_idx)
5167
5168   def _ExecD8DiskOnly(self, feedback_fn):
5169     """Replace a disk on the primary or secondary for dbrd8.
5170
5171     The algorithm for replace is quite complicated:
5172
5173       1. for each disk to be replaced:
5174
5175         1. create new LVs on the target node with unique names
5176         1. detach old LVs from the drbd device
5177         1. rename old LVs to name_replaced.<time_t>
5178         1. rename new LVs to old LVs
5179         1. attach the new LVs (with the old names now) to the drbd device
5180
5181       1. wait for sync across all devices
5182
5183       1. for each modified disk:
5184
5185         1. remove old LVs (which have the name name_replaces.<time_t>)
5186
5187     Failures are not very well handled.
5188
5189     """
5190     steps_total = 6
5191     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5192     instance = self.instance
5193     iv_names = {}
5194     vgname = self.cfg.GetVGName()
5195     # start of work
5196     cfg = self.cfg
5197     tgt_node = self.tgt_node
5198     oth_node = self.oth_node
5199
5200     # Step: check device activation
5201     self.proc.LogStep(1, steps_total, "check device existence")
5202     info("checking volume groups")
5203     my_vg = cfg.GetVGName()
5204     results = self.rpc.call_vg_list([oth_node, tgt_node])
5205     if not results:
5206       raise errors.OpExecError("Can't list volume groups on the nodes")
5207     for node in oth_node, tgt_node:
5208       res = results[node]
5209       if res.failed or not res.data or my_vg not in res.data:
5210         raise errors.OpExecError("Volume group '%s' not found on %s" %
5211                                  (my_vg, node))
5212     for idx, dev in enumerate(instance.disks):
5213       if idx not in self.op.disks:
5214         continue
5215       for node in tgt_node, oth_node:
5216         info("checking disk/%d on %s" % (idx, node))
5217         cfg.SetDiskID(dev, node)
5218         result = self.rpc.call_blockdev_find(node, dev)
5219         msg = result.RemoteFailMsg()
5220         if not msg and not result.payload:
5221           msg = "disk not found"
5222         if msg:
5223           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5224                                    (idx, node, msg))
5225
5226     # Step: check other node consistency
5227     self.proc.LogStep(2, steps_total, "check peer consistency")
5228     for idx, dev in enumerate(instance.disks):
5229       if idx not in self.op.disks:
5230         continue
5231       info("checking disk/%d consistency on %s" % (idx, oth_node))
5232       if not _CheckDiskConsistency(self, dev, oth_node,
5233                                    oth_node==instance.primary_node):
5234         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5235                                  " to replace disks on this node (%s)" %
5236                                  (oth_node, tgt_node))
5237
5238     # Step: create new storage
5239     self.proc.LogStep(3, steps_total, "allocate new storage")
5240     for idx, dev in enumerate(instance.disks):
5241       if idx not in self.op.disks:
5242         continue
5243       size = dev.size
5244       cfg.SetDiskID(dev, tgt_node)
5245       lv_names = [".disk%d_%s" % (idx, suf)
5246                   for suf in ["data", "meta"]]
5247       names = _GenerateUniqueNames(self, lv_names)
5248       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5249                              logical_id=(vgname, names[0]))
5250       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5251                              logical_id=(vgname, names[1]))
5252       new_lvs = [lv_data, lv_meta]
5253       old_lvs = dev.children
5254       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5255       info("creating new local storage on %s for %s" %
5256            (tgt_node, dev.iv_name))
5257       # we pass force_create=True to force the LVM creation
5258       for new_lv in new_lvs:
5259         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5260                         _GetInstanceInfoText(instance), False)
5261
5262     # Step: for each lv, detach+rename*2+attach
5263     self.proc.LogStep(4, steps_total, "change drbd configuration")
5264     for dev, old_lvs, new_lvs in iv_names.itervalues():
5265       info("detaching %s drbd from local storage" % dev.iv_name)
5266       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5267       msg = result.RemoteFailMsg()
5268       if msg:
5269         raise errors.OpExecError("Can't detach drbd from local storage on node"
5270                                  " %s for device %s: %s" %
5271                                  (tgt_node, dev.iv_name, msg))
5272       #dev.children = []
5273       #cfg.Update(instance)
5274
5275       # ok, we created the new LVs, so now we know we have the needed
5276       # storage; as such, we proceed on the target node to rename
5277       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5278       # using the assumption that logical_id == physical_id (which in
5279       # turn is the unique_id on that node)
5280
5281       # FIXME(iustin): use a better name for the replaced LVs
5282       temp_suffix = int(time.time())
5283       ren_fn = lambda d, suff: (d.physical_id[0],
5284                                 d.physical_id[1] + "_replaced-%s" % suff)
5285       # build the rename list based on what LVs exist on the node
5286       rlist = []
5287       for to_ren in old_lvs:
5288         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5289         if not result.RemoteFailMsg() and result.payload:
5290           # device exists
5291           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5292
5293       info("renaming the old LVs on the target node")
5294       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5295       msg = result.RemoteFailMsg()
5296       if msg:
5297         raise errors.OpExecError("Can't rename old LVs on node %s: %s" %
5298                                  (tgt_node, msg))
5299       # now we rename the new LVs to the old LVs
5300       info("renaming the new LVs on the target node")
5301       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5302       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5303       msg = result.RemoteFailMsg()
5304       if msg:
5305         raise errors.OpExecError("Can't rename new LVs on node %s: %s" %
5306                                  (tgt_node, msg))
5307
5308       for old, new in zip(old_lvs, new_lvs):
5309         new.logical_id = old.logical_id
5310         cfg.SetDiskID(new, tgt_node)
5311
5312       for disk in old_lvs:
5313         disk.logical_id = ren_fn(disk, temp_suffix)
5314         cfg.SetDiskID(disk, tgt_node)
5315
5316       # now that the new lvs have the old name, we can add them to the device
5317       info("adding new mirror component on %s" % tgt_node)
5318       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5319       msg = result.RemoteFailMsg()
5320       if msg:
5321         for new_lv in new_lvs:
5322           msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5323           if msg:
5324             warning("Can't rollback device %s: %s", dev, msg,
5325                     hint="cleanup manually the unused logical volumes")
5326         raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
5327
5328       dev.children = new_lvs
5329       cfg.Update(instance)
5330
5331     # Step: wait for sync
5332
5333     # this can fail as the old devices are degraded and _WaitForSync
5334     # does a combined result over all disks, so we don't check its
5335     # return value
5336     self.proc.LogStep(5, steps_total, "sync devices")
5337     _WaitForSync(self, instance, unlock=True)
5338
5339     # so check manually all the devices
5340     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5341       cfg.SetDiskID(dev, instance.primary_node)
5342       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5343       msg = result.RemoteFailMsg()
5344       if not msg and not result.payload:
5345         msg = "disk not found"
5346       if msg:
5347         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5348                                  (name, msg))
5349       if result.payload[5]:
5350         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5351
5352     # Step: remove old storage
5353     self.proc.LogStep(6, steps_total, "removing old storage")
5354     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5355       info("remove logical volumes for %s" % name)
5356       for lv in old_lvs:
5357         cfg.SetDiskID(lv, tgt_node)
5358         msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5359         if msg:
5360           warning("Can't remove old LV: %s" % msg,
5361                   hint="manually remove unused LVs")
5362           continue
5363
5364   def _ExecD8Secondary(self, feedback_fn):
5365     """Replace the secondary node for drbd8.
5366
5367     The algorithm for replace is quite complicated:
5368       - for all disks of the instance:
5369         - create new LVs on the new node with same names
5370         - shutdown the drbd device on the old secondary
5371         - disconnect the drbd network on the primary
5372         - create the drbd device on the new secondary
5373         - network attach the drbd on the primary, using an artifice:
5374           the drbd code for Attach() will connect to the network if it
5375           finds a device which is connected to the good local disks but
5376           not network enabled
5377       - wait for sync across all devices
5378       - remove all disks from the old secondary
5379
5380     Failures are not very well handled.
5381
5382     """
5383     steps_total = 6
5384     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5385     instance = self.instance
5386     iv_names = {}
5387     # start of work
5388     cfg = self.cfg
5389     old_node = self.tgt_node
5390     new_node = self.new_node
5391     pri_node = instance.primary_node
5392     nodes_ip = {
5393       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5394       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5395       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5396       }
5397
5398     # Step: check device activation
5399     self.proc.LogStep(1, steps_total, "check device existence")
5400     info("checking volume groups")
5401     my_vg = cfg.GetVGName()
5402     results = self.rpc.call_vg_list([pri_node, new_node])
5403     for node in pri_node, new_node:
5404       res = results[node]
5405       if res.failed or not res.data or my_vg not in res.data:
5406         raise errors.OpExecError("Volume group '%s' not found on %s" %
5407                                  (my_vg, node))
5408     for idx, dev in enumerate(instance.disks):
5409       if idx not in self.op.disks:
5410         continue
5411       info("checking disk/%d on %s" % (idx, pri_node))
5412       cfg.SetDiskID(dev, pri_node)
5413       result = self.rpc.call_blockdev_find(pri_node, dev)
5414       msg = result.RemoteFailMsg()
5415       if not msg and not result.payload:
5416         msg = "disk not found"
5417       if msg:
5418         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5419                                  (idx, pri_node, msg))
5420
5421     # Step: check other node consistency
5422     self.proc.LogStep(2, steps_total, "check peer consistency")
5423     for idx, dev in enumerate(instance.disks):
5424       if idx not in self.op.disks:
5425         continue
5426       info("checking disk/%d consistency on %s" % (idx, pri_node))
5427       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5428         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5429                                  " unsafe to replace the secondary" %
5430                                  pri_node)
5431
5432     # Step: create new storage
5433     self.proc.LogStep(3, steps_total, "allocate new storage")
5434     for idx, dev in enumerate(instance.disks):
5435       info("adding new local storage on %s for disk/%d" %
5436            (new_node, idx))
5437       # we pass force_create=True to force LVM creation
5438       for new_lv in dev.children:
5439         _CreateBlockDev(self, new_node, instance, new_lv, True,
5440                         _GetInstanceInfoText(instance), False)
5441
5442     # Step 4: dbrd minors and drbd setups changes
5443     # after this, we must manually remove the drbd minors on both the
5444     # error and the success paths
5445     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5446                                    instance.name)
5447     logging.debug("Allocated minors %s" % (minors,))
5448     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5449     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5450       size = dev.size
5451       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5452       # create new devices on new_node; note that we create two IDs:
5453       # one without port, so the drbd will be activated without
5454       # networking information on the new node at this stage, and one
5455       # with network, for the latter activation in step 4
5456       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5457       if pri_node == o_node1:
5458         p_minor = o_minor1
5459       else:
5460         p_minor = o_minor2
5461
5462       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5463       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5464
5465       iv_names[idx] = (dev, dev.children, new_net_id)
5466       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5467                     new_net_id)
5468       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5469                               logical_id=new_alone_id,
5470                               children=dev.children)
5471       try:
5472         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5473                               _GetInstanceInfoText(instance), False)
5474       except errors.GenericError:
5475         self.cfg.ReleaseDRBDMinors(instance.name)
5476         raise
5477
5478     for idx, dev in enumerate(instance.disks):
5479       # we have new devices, shutdown the drbd on the old secondary
5480       info("shutting down drbd for disk/%d on old node" % idx)
5481       cfg.SetDiskID(dev, old_node)
5482       msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5483       if msg:
5484         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5485                 (idx, msg),
5486                 hint="Please cleanup this device manually as soon as possible")
5487
5488     info("detaching primary drbds from the network (=> standalone)")
5489     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5490                                                instance.disks)[pri_node]
5491
5492     msg = result.RemoteFailMsg()
5493     if msg:
5494       # detaches didn't succeed (unlikely)
5495       self.cfg.ReleaseDRBDMinors(instance.name)
5496       raise errors.OpExecError("Can't detach the disks from the network on"
5497                                " old node: %s" % (msg,))
5498
5499     # if we managed to detach at least one, we update all the disks of
5500     # the instance to point to the new secondary
5501     info("updating instance configuration")
5502     for dev, _, new_logical_id in iv_names.itervalues():
5503       dev.logical_id = new_logical_id
5504       cfg.SetDiskID(dev, pri_node)
5505     cfg.Update(instance)
5506
5507     # and now perform the drbd attach
5508     info("attaching primary drbds to new secondary (standalone => connected)")
5509     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5510                                            instance.disks, instance.name,
5511                                            False)
5512     for to_node, to_result in result.items():
5513       msg = to_result.RemoteFailMsg()
5514       if msg:
5515         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5516                 hint="please do a gnt-instance info to see the"
5517                 " status of disks")
5518
5519     # this can fail as the old devices are degraded and _WaitForSync
5520     # does a combined result over all disks, so we don't check its
5521     # return value
5522     self.proc.LogStep(5, steps_total, "sync devices")
5523     _WaitForSync(self, instance, unlock=True)
5524
5525     # so check manually all the devices
5526     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5527       cfg.SetDiskID(dev, pri_node)
5528       result = self.rpc.call_blockdev_find(pri_node, dev)
5529       msg = result.RemoteFailMsg()
5530       if not msg and not result.payload:
5531         msg = "disk not found"
5532       if msg:
5533         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5534                                  (idx, msg))
5535       if result.payload[5]:
5536         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5537
5538     self.proc.LogStep(6, steps_total, "removing old storage")
5539     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5540       info("remove logical volumes for disk/%d" % idx)
5541       for lv in old_lvs:
5542         cfg.SetDiskID(lv, old_node)
5543         msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5544         if msg:
5545           warning("Can't remove LV on old secondary: %s", msg,
5546                   hint="Cleanup stale volumes by hand")
5547
5548   def Exec(self, feedback_fn):
5549     """Execute disk replacement.
5550
5551     This dispatches the disk replacement to the appropriate handler.
5552
5553     """
5554     instance = self.instance
5555
5556     # Activate the instance disks if we're replacing them on a down instance
5557     if not instance.admin_up:
5558       _StartInstanceDisks(self, instance, True)
5559
5560     if self.op.mode == constants.REPLACE_DISK_CHG:
5561       fn = self._ExecD8Secondary
5562     else:
5563       fn = self._ExecD8DiskOnly
5564
5565     ret = fn(feedback_fn)
5566
5567     # Deactivate the instance disks if we're replacing them on a down instance
5568     if not instance.admin_up:
5569       _SafeShutdownInstanceDisks(self, instance)
5570
5571     return ret
5572
5573
5574 class LUGrowDisk(LogicalUnit):
5575   """Grow a disk of an instance.
5576
5577   """
5578   HPATH = "disk-grow"
5579   HTYPE = constants.HTYPE_INSTANCE
5580   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5581   REQ_BGL = False
5582
5583   def ExpandNames(self):
5584     self._ExpandAndLockInstance()
5585     self.needed_locks[locking.LEVEL_NODE] = []
5586     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5587
5588   def DeclareLocks(self, level):
5589     if level == locking.LEVEL_NODE:
5590       self._LockInstancesNodes()
5591
5592   def BuildHooksEnv(self):
5593     """Build hooks env.
5594
5595     This runs on the master, the primary and all the secondaries.
5596
5597     """
5598     env = {
5599       "DISK": self.op.disk,
5600       "AMOUNT": self.op.amount,
5601       }
5602     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5603     nl = [
5604       self.cfg.GetMasterNode(),
5605       self.instance.primary_node,
5606       ]
5607     return env, nl, nl
5608
5609   def CheckPrereq(self):
5610     """Check prerequisites.
5611
5612     This checks that the instance is in the cluster.
5613
5614     """
5615     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5616     assert instance is not None, \
5617       "Cannot retrieve locked instance %s" % self.op.instance_name
5618     nodenames = list(instance.all_nodes)
5619     for node in nodenames:
5620       _CheckNodeOnline(self, node)
5621
5622
5623     self.instance = instance
5624
5625     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5626       raise errors.OpPrereqError("Instance's disk layout does not support"
5627                                  " growing.")
5628
5629     self.disk = instance.FindDisk(self.op.disk)
5630
5631     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5632                                        instance.hypervisor)
5633     for node in nodenames:
5634       info = nodeinfo[node]
5635       if info.failed or not info.data:
5636         raise errors.OpPrereqError("Cannot get current information"
5637                                    " from node '%s'" % node)
5638       vg_free = info.data.get('vg_free', None)
5639       if not isinstance(vg_free, int):
5640         raise errors.OpPrereqError("Can't compute free disk space on"
5641                                    " node %s" % node)
5642       if self.op.amount > vg_free:
5643         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5644                                    " %d MiB available, %d MiB required" %
5645                                    (node, vg_free, self.op.amount))
5646
5647   def Exec(self, feedback_fn):
5648     """Execute disk grow.
5649
5650     """
5651     instance = self.instance
5652     disk = self.disk
5653     for node in instance.all_nodes:
5654       self.cfg.SetDiskID(disk, node)
5655       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5656       msg = result.RemoteFailMsg()
5657       if msg:
5658         raise errors.OpExecError("Grow request failed to node %s: %s" %
5659                                  (node, msg))
5660     disk.RecordGrow(self.op.amount)
5661     self.cfg.Update(instance)
5662     if self.op.wait_for_sync:
5663       disk_abort = not _WaitForSync(self, instance)
5664       if disk_abort:
5665         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5666                              " status.\nPlease check the instance.")
5667
5668
5669 class LUQueryInstanceData(NoHooksLU):
5670   """Query runtime instance data.
5671
5672   """
5673   _OP_REQP = ["instances", "static"]
5674   REQ_BGL = False
5675
5676   def ExpandNames(self):
5677     self.needed_locks = {}
5678     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5679
5680     if not isinstance(self.op.instances, list):
5681       raise errors.OpPrereqError("Invalid argument type 'instances'")
5682
5683     if self.op.instances:
5684       self.wanted_names = []
5685       for name in self.op.instances:
5686         full_name = self.cfg.ExpandInstanceName(name)
5687         if full_name is None:
5688           raise errors.OpPrereqError("Instance '%s' not known" % name)
5689         self.wanted_names.append(full_name)
5690       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5691     else:
5692       self.wanted_names = None
5693       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5694
5695     self.needed_locks[locking.LEVEL_NODE] = []
5696     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5697
5698   def DeclareLocks(self, level):
5699     if level == locking.LEVEL_NODE:
5700       self._LockInstancesNodes()
5701
5702   def CheckPrereq(self):
5703     """Check prerequisites.
5704
5705     This only checks the optional instance list against the existing names.
5706
5707     """
5708     if self.wanted_names is None:
5709       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5710
5711     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5712                              in self.wanted_names]
5713     return
5714
5715   def _ComputeDiskStatus(self, instance, snode, dev):
5716     """Compute block device status.
5717
5718     """
5719     static = self.op.static
5720     if not static:
5721       self.cfg.SetDiskID(dev, instance.primary_node)
5722       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5723       if dev_pstatus.offline:
5724         dev_pstatus = None
5725       else:
5726         msg = dev_pstatus.RemoteFailMsg()
5727         if msg:
5728           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5729                                    (instance.name, msg))
5730         dev_pstatus = dev_pstatus.payload
5731     else:
5732       dev_pstatus = None
5733
5734     if dev.dev_type in constants.LDS_DRBD:
5735       # we change the snode then (otherwise we use the one passed in)
5736       if dev.logical_id[0] == instance.primary_node:
5737         snode = dev.logical_id[1]
5738       else:
5739         snode = dev.logical_id[0]
5740
5741     if snode and not static:
5742       self.cfg.SetDiskID(dev, snode)
5743       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5744       if dev_sstatus.offline:
5745         dev_sstatus = None
5746       else:
5747         msg = dev_sstatus.RemoteFailMsg()
5748         if msg:
5749           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5750                                    (instance.name, msg))
5751         dev_sstatus = dev_sstatus.payload
5752     else:
5753       dev_sstatus = None
5754
5755     if dev.children:
5756       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5757                       for child in dev.children]
5758     else:
5759       dev_children = []
5760
5761     data = {
5762       "iv_name": dev.iv_name,
5763       "dev_type": dev.dev_type,
5764       "logical_id": dev.logical_id,
5765       "physical_id": dev.physical_id,
5766       "pstatus": dev_pstatus,
5767       "sstatus": dev_sstatus,
5768       "children": dev_children,
5769       "mode": dev.mode,
5770       }
5771
5772     return data
5773
5774   def Exec(self, feedback_fn):
5775     """Gather and return data"""
5776     result = {}
5777
5778     cluster = self.cfg.GetClusterInfo()
5779
5780     for instance in self.wanted_instances:
5781       if not self.op.static:
5782         remote_info = self.rpc.call_instance_info(instance.primary_node,
5783                                                   instance.name,
5784                                                   instance.hypervisor)
5785         remote_info.Raise()
5786         remote_info = remote_info.data
5787         if remote_info and "state" in remote_info:
5788           remote_state = "up"
5789         else:
5790           remote_state = "down"
5791       else:
5792         remote_state = None
5793       if instance.admin_up:
5794         config_state = "up"
5795       else:
5796         config_state = "down"
5797
5798       disks = [self._ComputeDiskStatus(instance, None, device)
5799                for device in instance.disks]
5800
5801       idict = {
5802         "name": instance.name,
5803         "config_state": config_state,
5804         "run_state": remote_state,
5805         "pnode": instance.primary_node,
5806         "snodes": instance.secondary_nodes,
5807         "os": instance.os,
5808         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5809         "disks": disks,
5810         "hypervisor": instance.hypervisor,
5811         "network_port": instance.network_port,
5812         "hv_instance": instance.hvparams,
5813         "hv_actual": cluster.FillHV(instance),
5814         "be_instance": instance.beparams,
5815         "be_actual": cluster.FillBE(instance),
5816         }
5817
5818       result[instance.name] = idict
5819
5820     return result
5821
5822
5823 class LUSetInstanceParams(LogicalUnit):
5824   """Modifies an instances's parameters.
5825
5826   """
5827   HPATH = "instance-modify"
5828   HTYPE = constants.HTYPE_INSTANCE
5829   _OP_REQP = ["instance_name"]
5830   REQ_BGL = False
5831
5832   def CheckArguments(self):
5833     if not hasattr(self.op, 'nics'):
5834       self.op.nics = []
5835     if not hasattr(self.op, 'disks'):
5836       self.op.disks = []
5837     if not hasattr(self.op, 'beparams'):
5838       self.op.beparams = {}
5839     if not hasattr(self.op, 'hvparams'):
5840       self.op.hvparams = {}
5841     self.op.force = getattr(self.op, "force", False)
5842     if not (self.op.nics or self.op.disks or
5843             self.op.hvparams or self.op.beparams):
5844       raise errors.OpPrereqError("No changes submitted")
5845
5846     # Disk validation
5847     disk_addremove = 0
5848     for disk_op, disk_dict in self.op.disks:
5849       if disk_op == constants.DDM_REMOVE:
5850         disk_addremove += 1
5851         continue
5852       elif disk_op == constants.DDM_ADD:
5853         disk_addremove += 1
5854       else:
5855         if not isinstance(disk_op, int):
5856           raise errors.OpPrereqError("Invalid disk index")
5857       if disk_op == constants.DDM_ADD:
5858         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5859         if mode not in constants.DISK_ACCESS_SET:
5860           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5861         size = disk_dict.get('size', None)
5862         if size is None:
5863           raise errors.OpPrereqError("Required disk parameter size missing")
5864         try:
5865           size = int(size)
5866         except ValueError, err:
5867           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5868                                      str(err))
5869         disk_dict['size'] = size
5870       else:
5871         # modification of disk
5872         if 'size' in disk_dict:
5873           raise errors.OpPrereqError("Disk size change not possible, use"
5874                                      " grow-disk")
5875
5876     if disk_addremove > 1:
5877       raise errors.OpPrereqError("Only one disk add or remove operation"
5878                                  " supported at a time")
5879
5880     # NIC validation
5881     nic_addremove = 0
5882     for nic_op, nic_dict in self.op.nics:
5883       if nic_op == constants.DDM_REMOVE:
5884         nic_addremove += 1
5885         continue
5886       elif nic_op == constants.DDM_ADD:
5887         nic_addremove += 1
5888       else:
5889         if not isinstance(nic_op, int):
5890           raise errors.OpPrereqError("Invalid nic index")
5891
5892       # nic_dict should be a dict
5893       nic_ip = nic_dict.get('ip', None)
5894       if nic_ip is not None:
5895         if nic_ip.lower() == constants.VALUE_NONE:
5896           nic_dict['ip'] = None
5897         else:
5898           if not utils.IsValidIP(nic_ip):
5899             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5900
5901       if nic_op == constants.DDM_ADD:
5902         nic_bridge = nic_dict.get('bridge', None)
5903         if nic_bridge is None:
5904           nic_dict['bridge'] = self.cfg.GetDefBridge()
5905         nic_mac = nic_dict.get('mac', None)
5906         if nic_mac is None:
5907           nic_dict['mac'] = constants.VALUE_AUTO
5908
5909       if 'mac' in nic_dict:
5910         nic_mac = nic_dict['mac']
5911         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5912           if not utils.IsValidMac(nic_mac):
5913             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5914         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5915           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5916                                      " modifying an existing nic")
5917
5918     if nic_addremove > 1:
5919       raise errors.OpPrereqError("Only one NIC add or remove operation"
5920                                  " supported at a time")
5921
5922   def ExpandNames(self):
5923     self._ExpandAndLockInstance()
5924     self.needed_locks[locking.LEVEL_NODE] = []
5925     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5926
5927   def DeclareLocks(self, level):
5928     if level == locking.LEVEL_NODE:
5929       self._LockInstancesNodes()
5930
5931   def BuildHooksEnv(self):
5932     """Build hooks env.
5933
5934     This runs on the master, primary and secondaries.
5935
5936     """
5937     args = dict()
5938     if constants.BE_MEMORY in self.be_new:
5939       args['memory'] = self.be_new[constants.BE_MEMORY]
5940     if constants.BE_VCPUS in self.be_new:
5941       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5942     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5943     # information at all.
5944     if self.op.nics:
5945       args['nics'] = []
5946       nic_override = dict(self.op.nics)
5947       for idx, nic in enumerate(self.instance.nics):
5948         if idx in nic_override:
5949           this_nic_override = nic_override[idx]
5950         else:
5951           this_nic_override = {}
5952         if 'ip' in this_nic_override:
5953           ip = this_nic_override['ip']
5954         else:
5955           ip = nic.ip
5956         if 'bridge' in this_nic_override:
5957           bridge = this_nic_override['bridge']
5958         else:
5959           bridge = nic.bridge
5960         if 'mac' in this_nic_override:
5961           mac = this_nic_override['mac']
5962         else:
5963           mac = nic.mac
5964         args['nics'].append((ip, bridge, mac))
5965       if constants.DDM_ADD in nic_override:
5966         ip = nic_override[constants.DDM_ADD].get('ip', None)
5967         bridge = nic_override[constants.DDM_ADD]['bridge']
5968         mac = nic_override[constants.DDM_ADD]['mac']
5969         args['nics'].append((ip, bridge, mac))
5970       elif constants.DDM_REMOVE in nic_override:
5971         del args['nics'][-1]
5972
5973     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5974     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5975     return env, nl, nl
5976
5977   def _GetUpdatedParams(self, old_params, update_dict,
5978                         default_values, parameter_types):
5979     """Return the new params dict for the given params.
5980
5981     @type old_params: dict
5982     @type old_params: old parameters
5983     @type update_dict: dict
5984     @type update_dict: dict containing new parameter values,
5985                        or constants.VALUE_DEFAULT to reset the
5986                        parameter to its default value
5987     @type default_values: dict
5988     @param default_values: default values for the filled parameters
5989     @type parameter_types: dict
5990     @param parameter_types: dict mapping target dict keys to types
5991                             in constants.ENFORCEABLE_TYPES
5992     @rtype: (dict, dict)
5993     @return: (new_parameters, filled_parameters)
5994
5995     """
5996     params_copy = copy.deepcopy(old_params)
5997     for key, val in update_dict.iteritems():
5998       if val == constants.VALUE_DEFAULT:
5999         try:
6000           del params_copy[key]
6001         except KeyError:
6002           pass
6003       else:
6004         params_copy[key] = val
6005     utils.ForceDictType(params_copy, parameter_types)
6006     params_filled = objects.FillDict(default_values, params_copy)
6007     return (params_copy, params_filled)
6008
6009   def CheckPrereq(self):
6010     """Check prerequisites.
6011
6012     This only checks the instance list against the existing names.
6013
6014     """
6015     force = self.force = self.op.force
6016
6017     # checking the new params on the primary/secondary nodes
6018
6019     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6020     cluster = self.cluster = self.cfg.GetClusterInfo()
6021     assert self.instance is not None, \
6022       "Cannot retrieve locked instance %s" % self.op.instance_name
6023     pnode = instance.primary_node
6024     nodelist = list(instance.all_nodes)
6025
6026     # hvparams processing
6027     if self.op.hvparams:
6028       i_hvdict, hv_new = self._GetUpdatedParams(
6029                              instance.hvparams, self.op.hvparams,
6030                              cluster.hvparams[instance.hypervisor],
6031                              constants.HVS_PARAMETER_TYPES)
6032       # local check
6033       hypervisor.GetHypervisor(
6034         instance.hypervisor).CheckParameterSyntax(hv_new)
6035       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6036       self.hv_new = hv_new # the new actual values
6037       self.hv_inst = i_hvdict # the new dict (without defaults)
6038     else:
6039       self.hv_new = self.hv_inst = {}
6040
6041     # beparams processing
6042     if self.op.beparams:
6043       i_bedict, be_new = self._GetUpdatedParams(
6044                              instance.beparams, self.op.beparams,
6045                              cluster.beparams[constants.PP_DEFAULT],
6046                              constants.BES_PARAMETER_TYPES)
6047       self.be_new = be_new # the new actual values
6048       self.be_inst = i_bedict # the new dict (without defaults)
6049     else:
6050       self.be_new = self.be_inst = {}
6051
6052     self.warn = []
6053
6054     if constants.BE_MEMORY in self.op.beparams and not self.force:
6055       mem_check_list = [pnode]
6056       if be_new[constants.BE_AUTO_BALANCE]:
6057         # either we changed auto_balance to yes or it was from before
6058         mem_check_list.extend(instance.secondary_nodes)
6059       instance_info = self.rpc.call_instance_info(pnode, instance.name,
6060                                                   instance.hypervisor)
6061       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6062                                          instance.hypervisor)
6063       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
6064         # Assume the primary node is unreachable and go ahead
6065         self.warn.append("Can't get info from primary node %s" % pnode)
6066       else:
6067         if not instance_info.failed and instance_info.data:
6068           current_mem = int(instance_info.data['memory'])
6069         else:
6070           # Assume instance not running
6071           # (there is a slight race condition here, but it's not very probable,
6072           # and we have no other way to check)
6073           current_mem = 0
6074         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6075                     nodeinfo[pnode].data['memory_free'])
6076         if miss_mem > 0:
6077           raise errors.OpPrereqError("This change will prevent the instance"
6078                                      " from starting, due to %d MB of memory"
6079                                      " missing on its primary node" % miss_mem)
6080
6081       if be_new[constants.BE_AUTO_BALANCE]:
6082         for node, nres in nodeinfo.iteritems():
6083           if node not in instance.secondary_nodes:
6084             continue
6085           if nres.failed or not isinstance(nres.data, dict):
6086             self.warn.append("Can't get info from secondary node %s" % node)
6087           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
6088             self.warn.append("Not enough memory to failover instance to"
6089                              " secondary node %s" % node)
6090
6091     # NIC processing
6092     for nic_op, nic_dict in self.op.nics:
6093       if nic_op == constants.DDM_REMOVE:
6094         if not instance.nics:
6095           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6096         continue
6097       if nic_op != constants.DDM_ADD:
6098         # an existing nic
6099         if nic_op < 0 or nic_op >= len(instance.nics):
6100           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6101                                      " are 0 to %d" %
6102                                      (nic_op, len(instance.nics)))
6103       if 'bridge' in nic_dict:
6104         nic_bridge = nic_dict['bridge']
6105         if nic_bridge is None:
6106           raise errors.OpPrereqError('Cannot set the nic bridge to None')
6107         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
6108           msg = ("Bridge '%s' doesn't exist on one of"
6109                  " the instance nodes" % nic_bridge)
6110           if self.force:
6111             self.warn.append(msg)
6112           else:
6113             raise errors.OpPrereqError(msg)
6114       if 'mac' in nic_dict:
6115         nic_mac = nic_dict['mac']
6116         if nic_mac is None:
6117           raise errors.OpPrereqError('Cannot set the nic mac to None')
6118         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6119           # otherwise generate the mac
6120           nic_dict['mac'] = self.cfg.GenerateMAC()
6121         else:
6122           # or validate/reserve the current one
6123           if self.cfg.IsMacInUse(nic_mac):
6124             raise errors.OpPrereqError("MAC address %s already in use"
6125                                        " in cluster" % nic_mac)
6126
6127     # DISK processing
6128     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6129       raise errors.OpPrereqError("Disk operations not supported for"
6130                                  " diskless instances")
6131     for disk_op, disk_dict in self.op.disks:
6132       if disk_op == constants.DDM_REMOVE:
6133         if len(instance.disks) == 1:
6134           raise errors.OpPrereqError("Cannot remove the last disk of"
6135                                      " an instance")
6136         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6137         ins_l = ins_l[pnode]
6138         if ins_l.failed or not isinstance(ins_l.data, list):
6139           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
6140         if instance.name in ins_l.data:
6141           raise errors.OpPrereqError("Instance is running, can't remove"
6142                                      " disks.")
6143
6144       if (disk_op == constants.DDM_ADD and
6145           len(instance.nics) >= constants.MAX_DISKS):
6146         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6147                                    " add more" % constants.MAX_DISKS)
6148       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6149         # an existing disk
6150         if disk_op < 0 or disk_op >= len(instance.disks):
6151           raise errors.OpPrereqError("Invalid disk index %s, valid values"
6152                                      " are 0 to %d" %
6153                                      (disk_op, len(instance.disks)))
6154
6155     return
6156
6157   def Exec(self, feedback_fn):
6158     """Modifies an instance.
6159
6160     All parameters take effect only at the next restart of the instance.
6161
6162     """
6163     # Process here the warnings from CheckPrereq, as we don't have a
6164     # feedback_fn there.
6165     for warn in self.warn:
6166       feedback_fn("WARNING: %s" % warn)
6167
6168     result = []
6169     instance = self.instance
6170     # disk changes
6171     for disk_op, disk_dict in self.op.disks:
6172       if disk_op == constants.DDM_REMOVE:
6173         # remove the last disk
6174         device = instance.disks.pop()
6175         device_idx = len(instance.disks)
6176         for node, disk in device.ComputeNodeTree(instance.primary_node):
6177           self.cfg.SetDiskID(disk, node)
6178           msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
6179           if msg:
6180             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6181                             " continuing anyway", device_idx, node, msg)
6182         result.append(("disk/%d" % device_idx, "remove"))
6183       elif disk_op == constants.DDM_ADD:
6184         # add a new disk
6185         if instance.disk_template == constants.DT_FILE:
6186           file_driver, file_path = instance.disks[0].logical_id
6187           file_path = os.path.dirname(file_path)
6188         else:
6189           file_driver = file_path = None
6190         disk_idx_base = len(instance.disks)
6191         new_disk = _GenerateDiskTemplate(self,
6192                                          instance.disk_template,
6193                                          instance.name, instance.primary_node,
6194                                          instance.secondary_nodes,
6195                                          [disk_dict],
6196                                          file_path,
6197                                          file_driver,
6198                                          disk_idx_base)[0]
6199         instance.disks.append(new_disk)
6200         info = _GetInstanceInfoText(instance)
6201
6202         logging.info("Creating volume %s for instance %s",
6203                      new_disk.iv_name, instance.name)
6204         # Note: this needs to be kept in sync with _CreateDisks
6205         #HARDCODE
6206         for node in instance.all_nodes:
6207           f_create = node == instance.primary_node
6208           try:
6209             _CreateBlockDev(self, node, instance, new_disk,
6210                             f_create, info, f_create)
6211           except errors.OpExecError, err:
6212             self.LogWarning("Failed to create volume %s (%s) on"
6213                             " node %s: %s",
6214                             new_disk.iv_name, new_disk, node, err)
6215         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6216                        (new_disk.size, new_disk.mode)))
6217       else:
6218         # change a given disk
6219         instance.disks[disk_op].mode = disk_dict['mode']
6220         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6221     # NIC changes
6222     for nic_op, nic_dict in self.op.nics:
6223       if nic_op == constants.DDM_REMOVE:
6224         # remove the last nic
6225         del instance.nics[-1]
6226         result.append(("nic.%d" % len(instance.nics), "remove"))
6227       elif nic_op == constants.DDM_ADD:
6228         # mac and bridge should be set, by now
6229         mac = nic_dict['mac']
6230         bridge = nic_dict['bridge']
6231         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
6232                               bridge=bridge)
6233         instance.nics.append(new_nic)
6234         result.append(("nic.%d" % (len(instance.nics) - 1),
6235                        "add:mac=%s,ip=%s,bridge=%s" %
6236                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
6237       else:
6238         # change a given nic
6239         for key in 'mac', 'ip', 'bridge':
6240           if key in nic_dict:
6241             setattr(instance.nics[nic_op], key, nic_dict[key])
6242             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
6243
6244     # hvparams changes
6245     if self.op.hvparams:
6246       instance.hvparams = self.hv_inst
6247       for key, val in self.op.hvparams.iteritems():
6248         result.append(("hv/%s" % key, val))
6249
6250     # beparams changes
6251     if self.op.beparams:
6252       instance.beparams = self.be_inst
6253       for key, val in self.op.beparams.iteritems():
6254         result.append(("be/%s" % key, val))
6255
6256     self.cfg.Update(instance)
6257
6258     return result
6259
6260
6261 class LUQueryExports(NoHooksLU):
6262   """Query the exports list
6263
6264   """
6265   _OP_REQP = ['nodes']
6266   REQ_BGL = False
6267
6268   def ExpandNames(self):
6269     self.needed_locks = {}
6270     self.share_locks[locking.LEVEL_NODE] = 1
6271     if not self.op.nodes:
6272       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6273     else:
6274       self.needed_locks[locking.LEVEL_NODE] = \
6275         _GetWantedNodes(self, self.op.nodes)
6276
6277   def CheckPrereq(self):
6278     """Check prerequisites.
6279
6280     """
6281     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6282
6283   def Exec(self, feedback_fn):
6284     """Compute the list of all the exported system images.
6285
6286     @rtype: dict
6287     @return: a dictionary with the structure node->(export-list)
6288         where export-list is a list of the instances exported on
6289         that node.
6290
6291     """
6292     rpcresult = self.rpc.call_export_list(self.nodes)
6293     result = {}
6294     for node in rpcresult:
6295       if rpcresult[node].failed:
6296         result[node] = False
6297       else:
6298         result[node] = rpcresult[node].data
6299
6300     return result
6301
6302
6303 class LUExportInstance(LogicalUnit):
6304   """Export an instance to an image in the cluster.
6305
6306   """
6307   HPATH = "instance-export"
6308   HTYPE = constants.HTYPE_INSTANCE
6309   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6310   REQ_BGL = False
6311
6312   def ExpandNames(self):
6313     self._ExpandAndLockInstance()
6314     # FIXME: lock only instance primary and destination node
6315     #
6316     # Sad but true, for now we have do lock all nodes, as we don't know where
6317     # the previous export might be, and and in this LU we search for it and
6318     # remove it from its current node. In the future we could fix this by:
6319     #  - making a tasklet to search (share-lock all), then create the new one,
6320     #    then one to remove, after
6321     #  - removing the removal operation altoghether
6322     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6323
6324   def DeclareLocks(self, level):
6325     """Last minute lock declaration."""
6326     # All nodes are locked anyway, so nothing to do here.
6327
6328   def BuildHooksEnv(self):
6329     """Build hooks env.
6330
6331     This will run on the master, primary node and target node.
6332
6333     """
6334     env = {
6335       "EXPORT_NODE": self.op.target_node,
6336       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6337       }
6338     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6339     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6340           self.op.target_node]
6341     return env, nl, nl
6342
6343   def CheckPrereq(self):
6344     """Check prerequisites.
6345
6346     This checks that the instance and node names are valid.
6347
6348     """
6349     instance_name = self.op.instance_name
6350     self.instance = self.cfg.GetInstanceInfo(instance_name)
6351     assert self.instance is not None, \
6352           "Cannot retrieve locked instance %s" % self.op.instance_name
6353     _CheckNodeOnline(self, self.instance.primary_node)
6354
6355     self.dst_node = self.cfg.GetNodeInfo(
6356       self.cfg.ExpandNodeName(self.op.target_node))
6357
6358     if self.dst_node is None:
6359       # This is wrong node name, not a non-locked node
6360       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6361     _CheckNodeOnline(self, self.dst_node.name)
6362     _CheckNodeNotDrained(self, self.dst_node.name)
6363
6364     # instance disk type verification
6365     for disk in self.instance.disks:
6366       if disk.dev_type == constants.LD_FILE:
6367         raise errors.OpPrereqError("Export not supported for instances with"
6368                                    " file-based disks")
6369
6370   def Exec(self, feedback_fn):
6371     """Export an instance to an image in the cluster.
6372
6373     """
6374     instance = self.instance
6375     dst_node = self.dst_node
6376     src_node = instance.primary_node
6377     if self.op.shutdown:
6378       # shutdown the instance, but not the disks
6379       result = self.rpc.call_instance_shutdown(src_node, instance)
6380       msg = result.RemoteFailMsg()
6381       if msg:
6382         raise errors.OpExecError("Could not shutdown instance %s on"
6383                                  " node %s: %s" %
6384                                  (instance.name, src_node, msg))
6385
6386     vgname = self.cfg.GetVGName()
6387
6388     snap_disks = []
6389
6390     # set the disks ID correctly since call_instance_start needs the
6391     # correct drbd minor to create the symlinks
6392     for disk in instance.disks:
6393       self.cfg.SetDiskID(disk, src_node)
6394
6395     try:
6396       for disk in instance.disks:
6397         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6398         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6399         if new_dev_name.failed or not new_dev_name.data:
6400           self.LogWarning("Could not snapshot block device %s on node %s",
6401                           disk.logical_id[1], src_node)
6402           snap_disks.append(False)
6403         else:
6404           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6405                                  logical_id=(vgname, new_dev_name.data),
6406                                  physical_id=(vgname, new_dev_name.data),
6407                                  iv_name=disk.iv_name)
6408           snap_disks.append(new_dev)
6409
6410     finally:
6411       if self.op.shutdown and instance.admin_up:
6412         result = self.rpc.call_instance_start(src_node, instance, None, None)
6413         msg = result.RemoteFailMsg()
6414         if msg:
6415           _ShutdownInstanceDisks(self, instance)
6416           raise errors.OpExecError("Could not start instance: %s" % msg)
6417
6418     # TODO: check for size
6419
6420     cluster_name = self.cfg.GetClusterName()
6421     for idx, dev in enumerate(snap_disks):
6422       if dev:
6423         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6424                                                instance, cluster_name, idx)
6425         if result.failed or not result.data:
6426           self.LogWarning("Could not export block device %s from node %s to"
6427                           " node %s", dev.logical_id[1], src_node,
6428                           dst_node.name)
6429         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6430         if msg:
6431           self.LogWarning("Could not remove snapshot block device %s from node"
6432                           " %s: %s", dev.logical_id[1], src_node, msg)
6433
6434     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6435     if result.failed or not result.data:
6436       self.LogWarning("Could not finalize export for instance %s on node %s",
6437                       instance.name, dst_node.name)
6438
6439     nodelist = self.cfg.GetNodeList()
6440     nodelist.remove(dst_node.name)
6441
6442     # on one-node clusters nodelist will be empty after the removal
6443     # if we proceed the backup would be removed because OpQueryExports
6444     # substitutes an empty list with the full cluster node list.
6445     if nodelist:
6446       exportlist = self.rpc.call_export_list(nodelist)
6447       for node in exportlist:
6448         if exportlist[node].failed:
6449           continue
6450         if instance.name in exportlist[node].data:
6451           if not self.rpc.call_export_remove(node, instance.name):
6452             self.LogWarning("Could not remove older export for instance %s"
6453                             " on node %s", instance.name, node)
6454
6455
6456 class LURemoveExport(NoHooksLU):
6457   """Remove exports related to the named instance.
6458
6459   """
6460   _OP_REQP = ["instance_name"]
6461   REQ_BGL = False
6462
6463   def ExpandNames(self):
6464     self.needed_locks = {}
6465     # We need all nodes to be locked in order for RemoveExport to work, but we
6466     # don't need to lock the instance itself, as nothing will happen to it (and
6467     # we can remove exports also for a removed instance)
6468     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6469
6470   def CheckPrereq(self):
6471     """Check prerequisites.
6472     """
6473     pass
6474
6475   def Exec(self, feedback_fn):
6476     """Remove any export.
6477
6478     """
6479     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6480     # If the instance was not found we'll try with the name that was passed in.
6481     # This will only work if it was an FQDN, though.
6482     fqdn_warn = False
6483     if not instance_name:
6484       fqdn_warn = True
6485       instance_name = self.op.instance_name
6486
6487     exportlist = self.rpc.call_export_list(self.acquired_locks[
6488       locking.LEVEL_NODE])
6489     found = False
6490     for node in exportlist:
6491       if exportlist[node].failed:
6492         self.LogWarning("Failed to query node %s, continuing" % node)
6493         continue
6494       if instance_name in exportlist[node].data:
6495         found = True
6496         result = self.rpc.call_export_remove(node, instance_name)
6497         if result.failed or not result.data:
6498           logging.error("Could not remove export for instance %s"
6499                         " on node %s", instance_name, node)
6500
6501     if fqdn_warn and not found:
6502       feedback_fn("Export not found. If trying to remove an export belonging"
6503                   " to a deleted instance please use its Fully Qualified"
6504                   " Domain Name.")
6505
6506
6507 class TagsLU(NoHooksLU):
6508   """Generic tags LU.
6509
6510   This is an abstract class which is the parent of all the other tags LUs.
6511
6512   """
6513
6514   def ExpandNames(self):
6515     self.needed_locks = {}
6516     if self.op.kind == constants.TAG_NODE:
6517       name = self.cfg.ExpandNodeName(self.op.name)
6518       if name is None:
6519         raise errors.OpPrereqError("Invalid node name (%s)" %
6520                                    (self.op.name,))
6521       self.op.name = name
6522       self.needed_locks[locking.LEVEL_NODE] = name
6523     elif self.op.kind == constants.TAG_INSTANCE:
6524       name = self.cfg.ExpandInstanceName(self.op.name)
6525       if name is None:
6526         raise errors.OpPrereqError("Invalid instance name (%s)" %
6527                                    (self.op.name,))
6528       self.op.name = name
6529       self.needed_locks[locking.LEVEL_INSTANCE] = name
6530
6531   def CheckPrereq(self):
6532     """Check prerequisites.
6533
6534     """
6535     if self.op.kind == constants.TAG_CLUSTER:
6536       self.target = self.cfg.GetClusterInfo()
6537     elif self.op.kind == constants.TAG_NODE:
6538       self.target = self.cfg.GetNodeInfo(self.op.name)
6539     elif self.op.kind == constants.TAG_INSTANCE:
6540       self.target = self.cfg.GetInstanceInfo(self.op.name)
6541     else:
6542       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6543                                  str(self.op.kind))
6544
6545
6546 class LUGetTags(TagsLU):
6547   """Returns the tags of a given object.
6548
6549   """
6550   _OP_REQP = ["kind", "name"]
6551   REQ_BGL = False
6552
6553   def Exec(self, feedback_fn):
6554     """Returns the tag list.
6555
6556     """
6557     return list(self.target.GetTags())
6558
6559
6560 class LUSearchTags(NoHooksLU):
6561   """Searches the tags for a given pattern.
6562
6563   """
6564   _OP_REQP = ["pattern"]
6565   REQ_BGL = False
6566
6567   def ExpandNames(self):
6568     self.needed_locks = {}
6569
6570   def CheckPrereq(self):
6571     """Check prerequisites.
6572
6573     This checks the pattern passed for validity by compiling it.
6574
6575     """
6576     try:
6577       self.re = re.compile(self.op.pattern)
6578     except re.error, err:
6579       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6580                                  (self.op.pattern, err))
6581
6582   def Exec(self, feedback_fn):
6583     """Returns the tag list.
6584
6585     """
6586     cfg = self.cfg
6587     tgts = [("/cluster", cfg.GetClusterInfo())]
6588     ilist = cfg.GetAllInstancesInfo().values()
6589     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6590     nlist = cfg.GetAllNodesInfo().values()
6591     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6592     results = []
6593     for path, target in tgts:
6594       for tag in target.GetTags():
6595         if self.re.search(tag):
6596           results.append((path, tag))
6597     return results
6598
6599
6600 class LUAddTags(TagsLU):
6601   """Sets a tag on a given object.
6602
6603   """
6604   _OP_REQP = ["kind", "name", "tags"]
6605   REQ_BGL = False
6606
6607   def CheckPrereq(self):
6608     """Check prerequisites.
6609
6610     This checks the type and length of the tag name and value.
6611
6612     """
6613     TagsLU.CheckPrereq(self)
6614     for tag in self.op.tags:
6615       objects.TaggableObject.ValidateTag(tag)
6616
6617   def Exec(self, feedback_fn):
6618     """Sets the tag.
6619
6620     """
6621     try:
6622       for tag in self.op.tags:
6623         self.target.AddTag(tag)
6624     except errors.TagError, err:
6625       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6626     try:
6627       self.cfg.Update(self.target)
6628     except errors.ConfigurationError:
6629       raise errors.OpRetryError("There has been a modification to the"
6630                                 " config file and the operation has been"
6631                                 " aborted. Please retry.")
6632
6633
6634 class LUDelTags(TagsLU):
6635   """Delete a list of tags from a given object.
6636
6637   """
6638   _OP_REQP = ["kind", "name", "tags"]
6639   REQ_BGL = False
6640
6641   def CheckPrereq(self):
6642     """Check prerequisites.
6643
6644     This checks that we have the given tag.
6645
6646     """
6647     TagsLU.CheckPrereq(self)
6648     for tag in self.op.tags:
6649       objects.TaggableObject.ValidateTag(tag)
6650     del_tags = frozenset(self.op.tags)
6651     cur_tags = self.target.GetTags()
6652     if not del_tags <= cur_tags:
6653       diff_tags = del_tags - cur_tags
6654       diff_names = ["'%s'" % tag for tag in diff_tags]
6655       diff_names.sort()
6656       raise errors.OpPrereqError("Tag(s) %s not found" %
6657                                  (",".join(diff_names)))
6658
6659   def Exec(self, feedback_fn):
6660     """Remove the tag from the object.
6661
6662     """
6663     for tag in self.op.tags:
6664       self.target.RemoveTag(tag)
6665     try:
6666       self.cfg.Update(self.target)
6667     except errors.ConfigurationError:
6668       raise errors.OpRetryError("There has been a modification to the"
6669                                 " config file and the operation has been"
6670                                 " aborted. Please retry.")
6671
6672
6673 class LUTestDelay(NoHooksLU):
6674   """Sleep for a specified amount of time.
6675
6676   This LU sleeps on the master and/or nodes for a specified amount of
6677   time.
6678
6679   """
6680   _OP_REQP = ["duration", "on_master", "on_nodes"]
6681   REQ_BGL = False
6682
6683   def ExpandNames(self):
6684     """Expand names and set required locks.
6685
6686     This expands the node list, if any.
6687
6688     """
6689     self.needed_locks = {}
6690     if self.op.on_nodes:
6691       # _GetWantedNodes can be used here, but is not always appropriate to use
6692       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6693       # more information.
6694       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6695       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6696
6697   def CheckPrereq(self):
6698     """Check prerequisites.
6699
6700     """
6701
6702   def Exec(self, feedback_fn):
6703     """Do the actual sleep.
6704
6705     """
6706     if self.op.on_master:
6707       if not utils.TestDelay(self.op.duration):
6708         raise errors.OpExecError("Error during master delay test")
6709     if self.op.on_nodes:
6710       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6711       if not result:
6712         raise errors.OpExecError("Complete failure from rpc call")
6713       for node, node_result in result.items():
6714         node_result.Raise()
6715         if not node_result.data:
6716           raise errors.OpExecError("Failure during rpc call to node %s,"
6717                                    " result: %s" % (node, node_result.data))
6718
6719
6720 class IAllocator(object):
6721   """IAllocator framework.
6722
6723   An IAllocator instance has three sets of attributes:
6724     - cfg that is needed to query the cluster
6725     - input data (all members of the _KEYS class attribute are required)
6726     - four buffer attributes (in|out_data|text), that represent the
6727       input (to the external script) in text and data structure format,
6728       and the output from it, again in two formats
6729     - the result variables from the script (success, info, nodes) for
6730       easy usage
6731
6732   """
6733   _ALLO_KEYS = [
6734     "mem_size", "disks", "disk_template",
6735     "os", "tags", "nics", "vcpus", "hypervisor",
6736     ]
6737   _RELO_KEYS = [
6738     "relocate_from",
6739     ]
6740
6741   def __init__(self, lu, mode, name, **kwargs):
6742     self.lu = lu
6743     # init buffer variables
6744     self.in_text = self.out_text = self.in_data = self.out_data = None
6745     # init all input fields so that pylint is happy
6746     self.mode = mode
6747     self.name = name
6748     self.mem_size = self.disks = self.disk_template = None
6749     self.os = self.tags = self.nics = self.vcpus = None
6750     self.hypervisor = None
6751     self.relocate_from = None
6752     # computed fields
6753     self.required_nodes = None
6754     # init result fields
6755     self.success = self.info = self.nodes = None
6756     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6757       keyset = self._ALLO_KEYS
6758     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6759       keyset = self._RELO_KEYS
6760     else:
6761       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6762                                    " IAllocator" % self.mode)
6763     for key in kwargs:
6764       if key not in keyset:
6765         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6766                                      " IAllocator" % key)
6767       setattr(self, key, kwargs[key])
6768     for key in keyset:
6769       if key not in kwargs:
6770         raise errors.ProgrammerError("Missing input parameter '%s' to"
6771                                      " IAllocator" % key)
6772     self._BuildInputData()
6773
6774   def _ComputeClusterData(self):
6775     """Compute the generic allocator input data.
6776
6777     This is the data that is independent of the actual operation.
6778
6779     """
6780     cfg = self.lu.cfg
6781     cluster_info = cfg.GetClusterInfo()
6782     # cluster data
6783     data = {
6784       "version": constants.IALLOCATOR_VERSION,
6785       "cluster_name": cfg.GetClusterName(),
6786       "cluster_tags": list(cluster_info.GetTags()),
6787       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6788       # we don't have job IDs
6789       }
6790     iinfo = cfg.GetAllInstancesInfo().values()
6791     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6792
6793     # node data
6794     node_results = {}
6795     node_list = cfg.GetNodeList()
6796
6797     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6798       hypervisor_name = self.hypervisor
6799     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6800       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6801
6802     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6803                                            hypervisor_name)
6804     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6805                        cluster_info.enabled_hypervisors)
6806     for nname, nresult in node_data.items():
6807       # first fill in static (config-based) values
6808       ninfo = cfg.GetNodeInfo(nname)
6809       pnr = {
6810         "tags": list(ninfo.GetTags()),
6811         "primary_ip": ninfo.primary_ip,
6812         "secondary_ip": ninfo.secondary_ip,
6813         "offline": ninfo.offline,
6814         "drained": ninfo.drained,
6815         "master_candidate": ninfo.master_candidate,
6816         }
6817
6818       if not ninfo.offline:
6819         nresult.Raise()
6820         if not isinstance(nresult.data, dict):
6821           raise errors.OpExecError("Can't get data for node %s" % nname)
6822         remote_info = nresult.data
6823         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6824                      'vg_size', 'vg_free', 'cpu_total']:
6825           if attr not in remote_info:
6826             raise errors.OpExecError("Node '%s' didn't return attribute"
6827                                      " '%s'" % (nname, attr))
6828           try:
6829             remote_info[attr] = int(remote_info[attr])
6830           except ValueError, err:
6831             raise errors.OpExecError("Node '%s' returned invalid value"
6832                                      " for '%s': %s" % (nname, attr, err))
6833         # compute memory used by primary instances
6834         i_p_mem = i_p_up_mem = 0
6835         for iinfo, beinfo in i_list:
6836           if iinfo.primary_node == nname:
6837             i_p_mem += beinfo[constants.BE_MEMORY]
6838             if iinfo.name not in node_iinfo[nname].data:
6839               i_used_mem = 0
6840             else:
6841               i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6842             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6843             remote_info['memory_free'] -= max(0, i_mem_diff)
6844
6845             if iinfo.admin_up:
6846               i_p_up_mem += beinfo[constants.BE_MEMORY]
6847
6848         # compute memory used by instances
6849         pnr_dyn = {
6850           "total_memory": remote_info['memory_total'],
6851           "reserved_memory": remote_info['memory_dom0'],
6852           "free_memory": remote_info['memory_free'],
6853           "total_disk": remote_info['vg_size'],
6854           "free_disk": remote_info['vg_free'],
6855           "total_cpus": remote_info['cpu_total'],
6856           "i_pri_memory": i_p_mem,
6857           "i_pri_up_memory": i_p_up_mem,
6858           }
6859         pnr.update(pnr_dyn)
6860
6861       node_results[nname] = pnr
6862     data["nodes"] = node_results
6863
6864     # instance data
6865     instance_data = {}
6866     for iinfo, beinfo in i_list:
6867       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6868                   for n in iinfo.nics]
6869       pir = {
6870         "tags": list(iinfo.GetTags()),
6871         "admin_up": iinfo.admin_up,
6872         "vcpus": beinfo[constants.BE_VCPUS],
6873         "memory": beinfo[constants.BE_MEMORY],
6874         "os": iinfo.os,
6875         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6876         "nics": nic_data,
6877         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6878         "disk_template": iinfo.disk_template,
6879         "hypervisor": iinfo.hypervisor,
6880         }
6881       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
6882                                                  pir["disks"])
6883       instance_data[iinfo.name] = pir
6884
6885     data["instances"] = instance_data
6886
6887     self.in_data = data
6888
6889   def _AddNewInstance(self):
6890     """Add new instance data to allocator structure.
6891
6892     This in combination with _AllocatorGetClusterData will create the
6893     correct structure needed as input for the allocator.
6894
6895     The checks for the completeness of the opcode must have already been
6896     done.
6897
6898     """
6899     data = self.in_data
6900
6901     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6902
6903     if self.disk_template in constants.DTS_NET_MIRROR:
6904       self.required_nodes = 2
6905     else:
6906       self.required_nodes = 1
6907     request = {
6908       "type": "allocate",
6909       "name": self.name,
6910       "disk_template": self.disk_template,
6911       "tags": self.tags,
6912       "os": self.os,
6913       "vcpus": self.vcpus,
6914       "memory": self.mem_size,
6915       "disks": self.disks,
6916       "disk_space_total": disk_space,
6917       "nics": self.nics,
6918       "required_nodes": self.required_nodes,
6919       }
6920     data["request"] = request
6921
6922   def _AddRelocateInstance(self):
6923     """Add relocate instance data to allocator structure.
6924
6925     This in combination with _IAllocatorGetClusterData will create the
6926     correct structure needed as input for the allocator.
6927
6928     The checks for the completeness of the opcode must have already been
6929     done.
6930
6931     """
6932     instance = self.lu.cfg.GetInstanceInfo(self.name)
6933     if instance is None:
6934       raise errors.ProgrammerError("Unknown instance '%s' passed to"
6935                                    " IAllocator" % self.name)
6936
6937     if instance.disk_template not in constants.DTS_NET_MIRROR:
6938       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6939
6940     if len(instance.secondary_nodes) != 1:
6941       raise errors.OpPrereqError("Instance has not exactly one secondary node")
6942
6943     self.required_nodes = 1
6944     disk_sizes = [{'size': disk.size} for disk in instance.disks]
6945     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6946
6947     request = {
6948       "type": "relocate",
6949       "name": self.name,
6950       "disk_space_total": disk_space,
6951       "required_nodes": self.required_nodes,
6952       "relocate_from": self.relocate_from,
6953       }
6954     self.in_data["request"] = request
6955
6956   def _BuildInputData(self):
6957     """Build input data structures.
6958
6959     """
6960     self._ComputeClusterData()
6961
6962     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6963       self._AddNewInstance()
6964     else:
6965       self._AddRelocateInstance()
6966
6967     self.in_text = serializer.Dump(self.in_data)
6968
6969   def Run(self, name, validate=True, call_fn=None):
6970     """Run an instance allocator and return the results.
6971
6972     """
6973     if call_fn is None:
6974       call_fn = self.lu.rpc.call_iallocator_runner
6975     data = self.in_text
6976
6977     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6978     result.Raise()
6979
6980     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6981       raise errors.OpExecError("Invalid result from master iallocator runner")
6982
6983     rcode, stdout, stderr, fail = result.data
6984
6985     if rcode == constants.IARUN_NOTFOUND:
6986       raise errors.OpExecError("Can't find allocator '%s'" % name)
6987     elif rcode == constants.IARUN_FAILURE:
6988       raise errors.OpExecError("Instance allocator call failed: %s,"
6989                                " output: %s" % (fail, stdout+stderr))
6990     self.out_text = stdout
6991     if validate:
6992       self._ValidateResult()
6993
6994   def _ValidateResult(self):
6995     """Process the allocator results.
6996
6997     This will process and if successful save the result in
6998     self.out_data and the other parameters.
6999
7000     """
7001     try:
7002       rdict = serializer.Load(self.out_text)
7003     except Exception, err:
7004       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
7005
7006     if not isinstance(rdict, dict):
7007       raise errors.OpExecError("Can't parse iallocator results: not a dict")
7008
7009     for key in "success", "info", "nodes":
7010       if key not in rdict:
7011         raise errors.OpExecError("Can't parse iallocator results:"
7012                                  " missing key '%s'" % key)
7013       setattr(self, key, rdict[key])
7014
7015     if not isinstance(rdict["nodes"], list):
7016       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
7017                                " is not a list")
7018     self.out_data = rdict
7019
7020
7021 class LUTestAllocator(NoHooksLU):
7022   """Run allocator tests.
7023
7024   This LU runs the allocator tests
7025
7026   """
7027   _OP_REQP = ["direction", "mode", "name"]
7028
7029   def CheckPrereq(self):
7030     """Check prerequisites.
7031
7032     This checks the opcode parameters depending on the director and mode test.
7033
7034     """
7035     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7036       for attr in ["name", "mem_size", "disks", "disk_template",
7037                    "os", "tags", "nics", "vcpus"]:
7038         if not hasattr(self.op, attr):
7039           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7040                                      attr)
7041       iname = self.cfg.ExpandInstanceName(self.op.name)
7042       if iname is not None:
7043         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7044                                    iname)
7045       if not isinstance(self.op.nics, list):
7046         raise errors.OpPrereqError("Invalid parameter 'nics'")
7047       for row in self.op.nics:
7048         if (not isinstance(row, dict) or
7049             "mac" not in row or
7050             "ip" not in row or
7051             "bridge" not in row):
7052           raise errors.OpPrereqError("Invalid contents of the"
7053                                      " 'nics' parameter")
7054       if not isinstance(self.op.disks, list):
7055         raise errors.OpPrereqError("Invalid parameter 'disks'")
7056       for row in self.op.disks:
7057         if (not isinstance(row, dict) or
7058             "size" not in row or
7059             not isinstance(row["size"], int) or
7060             "mode" not in row or
7061             row["mode"] not in ['r', 'w']):
7062           raise errors.OpPrereqError("Invalid contents of the"
7063                                      " 'disks' parameter")
7064       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7065         self.op.hypervisor = self.cfg.GetHypervisorType()
7066     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7067       if not hasattr(self.op, "name"):
7068         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7069       fname = self.cfg.ExpandInstanceName(self.op.name)
7070       if fname is None:
7071         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7072                                    self.op.name)
7073       self.op.name = fname
7074       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7075     else:
7076       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7077                                  self.op.mode)
7078
7079     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7080       if not hasattr(self.op, "allocator") or self.op.allocator is None:
7081         raise errors.OpPrereqError("Missing allocator name")
7082     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7083       raise errors.OpPrereqError("Wrong allocator test '%s'" %
7084                                  self.op.direction)
7085
7086   def Exec(self, feedback_fn):
7087     """Run the allocator test.
7088
7089     """
7090     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7091       ial = IAllocator(self,
7092                        mode=self.op.mode,
7093                        name=self.op.name,
7094                        mem_size=self.op.mem_size,
7095                        disks=self.op.disks,
7096                        disk_template=self.op.disk_template,
7097                        os=self.op.os,
7098                        tags=self.op.tags,
7099                        nics=self.op.nics,
7100                        vcpus=self.op.vcpus,
7101                        hypervisor=self.op.hypervisor,
7102                        )
7103     else:
7104       ial = IAllocator(self,
7105                        mode=self.op.mode,
7106                        name=self.op.name,
7107                        relocate_from=list(self.relocate_from),
7108                        )
7109
7110     if self.op.direction == constants.IALLOCATOR_DIR_IN:
7111       result = ial.in_text
7112     else:
7113       ial.Run(self.op.allocator, validate=False)
7114       result = ial.out_text
7115     return result