Iallocator: NIC parameters
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import tempfile
30 import re
31 import platform
32 import logging
33 import copy
34 import random
35
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45 from ganeti import ssconf
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59
60   Note that all commands require root permissions.
61
62   """
63   HPATH = None
64   HTYPE = None
65   _OP_REQP = []
66   REQ_BGL = True
67
68   def __init__(self, processor, op, context, rpc):
69     """Constructor for LogicalUnit.
70
71     This needs to be overriden in derived classes in order to check op
72     validity.
73
74     """
75     self.proc = processor
76     self.op = op
77     self.cfg = context.cfg
78     self.context = context
79     self.rpc = rpc
80     # Dicts used to declare locking needs to mcpu
81     self.needed_locks = None
82     self.acquired_locks = {}
83     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
84     self.add_locks = {}
85     self.remove_locks = {}
86     # Used to force good behavior when calling helper functions
87     self.recalculate_locks = {}
88     self.__ssh = None
89     # logging
90     self.LogWarning = processor.LogWarning
91     self.LogInfo = processor.LogInfo
92
93     for attr_name in self._OP_REQP:
94       attr_val = getattr(op, attr_name, None)
95       if attr_val is None:
96         raise errors.OpPrereqError("Required parameter '%s' missing" %
97                                    attr_name)
98     self.CheckArguments()
99
100   def __GetSSH(self):
101     """Returns the SshRunner object
102
103     """
104     if not self.__ssh:
105       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
106     return self.__ssh
107
108   ssh = property(fget=__GetSSH)
109
110   def CheckArguments(self):
111     """Check syntactic validity for the opcode arguments.
112
113     This method is for doing a simple syntactic check and ensure
114     validity of opcode parameters, without any cluster-related
115     checks. While the same can be accomplished in ExpandNames and/or
116     CheckPrereq, doing these separate is better because:
117
118       - ExpandNames is left as as purely a lock-related function
119       - CheckPrereq is run after we have aquired locks (and possible
120         waited for them)
121
122     The function is allowed to change the self.op attribute so that
123     later methods can no longer worry about missing parameters.
124
125     """
126     pass
127
128   def ExpandNames(self):
129     """Expand names for this LU.
130
131     This method is called before starting to execute the opcode, and it should
132     update all the parameters of the opcode to their canonical form (e.g. a
133     short node name must be fully expanded after this method has successfully
134     completed). This way locking, hooks, logging, ecc. can work correctly.
135
136     LUs which implement this method must also populate the self.needed_locks
137     member, as a dict with lock levels as keys, and a list of needed lock names
138     as values. Rules:
139
140       - use an empty dict if you don't need any lock
141       - if you don't need any lock at a particular level omit that level
142       - don't put anything for the BGL level
143       - if you want all locks at a level use locking.ALL_SET as a value
144
145     If you need to share locks (rather than acquire them exclusively) at one
146     level you can modify self.share_locks, setting a true value (usually 1) for
147     that level. By default locks are not shared.
148
149     Examples::
150
151       # Acquire all nodes and one instance
152       self.needed_locks = {
153         locking.LEVEL_NODE: locking.ALL_SET,
154         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
155       }
156       # Acquire just two nodes
157       self.needed_locks = {
158         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
159       }
160       # Acquire no locks
161       self.needed_locks = {} # No, you can't leave it to the default value None
162
163     """
164     # The implementation of this method is mandatory only if the new LU is
165     # concurrent, so that old LUs don't need to be changed all at the same
166     # time.
167     if self.REQ_BGL:
168       self.needed_locks = {} # Exclusive LUs don't need locks.
169     else:
170       raise NotImplementedError
171
172   def DeclareLocks(self, level):
173     """Declare LU locking needs for a level
174
175     While most LUs can just declare their locking needs at ExpandNames time,
176     sometimes there's the need to calculate some locks after having acquired
177     the ones before. This function is called just before acquiring locks at a
178     particular level, but after acquiring the ones at lower levels, and permits
179     such calculations. It can be used to modify self.needed_locks, and by
180     default it does nothing.
181
182     This function is only called if you have something already set in
183     self.needed_locks for the level.
184
185     @param level: Locking level which is going to be locked
186     @type level: member of ganeti.locking.LEVELS
187
188     """
189
190   def CheckPrereq(self):
191     """Check prerequisites for this LU.
192
193     This method should check that the prerequisites for the execution
194     of this LU are fulfilled. It can do internode communication, but
195     it should be idempotent - no cluster or system changes are
196     allowed.
197
198     The method should raise errors.OpPrereqError in case something is
199     not fulfilled. Its return value is ignored.
200
201     This method should also update all the parameters of the opcode to
202     their canonical form if it hasn't been done by ExpandNames before.
203
204     """
205     raise NotImplementedError
206
207   def Exec(self, feedback_fn):
208     """Execute the LU.
209
210     This method should implement the actual work. It should raise
211     errors.OpExecError for failures that are somewhat dealt with in
212     code, or expected.
213
214     """
215     raise NotImplementedError
216
217   def BuildHooksEnv(self):
218     """Build hooks environment for this LU.
219
220     This method should return a three-node tuple consisting of: a dict
221     containing the environment that will be used for running the
222     specific hook for this LU, a list of node names on which the hook
223     should run before the execution, and a list of node names on which
224     the hook should run after the execution.
225
226     The keys of the dict must not have 'GANETI_' prefixed as this will
227     be handled in the hooks runner. Also note additional keys will be
228     added by the hooks runner. If the LU doesn't define any
229     environment, an empty dict (and not None) should be returned.
230
231     No nodes should be returned as an empty list (and not None).
232
233     Note that if the HPATH for a LU class is None, this function will
234     not be called.
235
236     """
237     raise NotImplementedError
238
239   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240     """Notify the LU about the results of its hooks.
241
242     This method is called every time a hooks phase is executed, and notifies
243     the Logical Unit about the hooks' result. The LU can then use it to alter
244     its result based on the hooks.  By default the method does nothing and the
245     previous result is passed back unchanged but any LU can define it if it
246     wants to use the local cluster hook-scripts somehow.
247
248     @param phase: one of L{constants.HOOKS_PHASE_POST} or
249         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250     @param hook_results: the results of the multi-node hooks rpc call
251     @param feedback_fn: function used send feedback back to the caller
252     @param lu_result: the previous Exec result this LU had, or None
253         in the PRE phase
254     @return: the new Exec result, based on the previous result
255         and hook results
256
257     """
258     return lu_result
259
260   def _ExpandAndLockInstance(self):
261     """Helper function to expand and lock an instance.
262
263     Many LUs that work on an instance take its name in self.op.instance_name
264     and need to expand it and then declare the expanded name for locking. This
265     function does it, and then updates self.op.instance_name to the expanded
266     name. It also initializes needed_locks as a dict, if this hasn't been done
267     before.
268
269     """
270     if self.needed_locks is None:
271       self.needed_locks = {}
272     else:
273       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274         "_ExpandAndLockInstance called with instance-level locks set"
275     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276     if expanded_name is None:
277       raise errors.OpPrereqError("Instance '%s' not known" %
278                                   self.op.instance_name)
279     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280     self.op.instance_name = expanded_name
281
282   def _LockInstancesNodes(self, primary_only=False):
283     """Helper function to declare instances' nodes for locking.
284
285     This function should be called after locking one or more instances to lock
286     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287     with all primary or secondary nodes for instances already locked and
288     present in self.needed_locks[locking.LEVEL_INSTANCE].
289
290     It should be called from DeclareLocks, and for safety only works if
291     self.recalculate_locks[locking.LEVEL_NODE] is set.
292
293     In the future it may grow parameters to just lock some instance's nodes, or
294     to just lock primaries or secondary nodes, if needed.
295
296     If should be called in DeclareLocks in a way similar to::
297
298       if level == locking.LEVEL_NODE:
299         self._LockInstancesNodes()
300
301     @type primary_only: boolean
302     @param primary_only: only lock primary nodes of locked instances
303
304     """
305     assert locking.LEVEL_NODE in self.recalculate_locks, \
306       "_LockInstancesNodes helper function called with no nodes to recalculate"
307
308     # TODO: check if we're really been called with the instance locks held
309
310     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311     # future we might want to have different behaviors depending on the value
312     # of self.recalculate_locks[locking.LEVEL_NODE]
313     wanted_nodes = []
314     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315       instance = self.context.cfg.GetInstanceInfo(instance_name)
316       wanted_nodes.append(instance.primary_node)
317       if not primary_only:
318         wanted_nodes.extend(instance.secondary_nodes)
319
320     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
324
325     del self.recalculate_locks[locking.LEVEL_NODE]
326
327
328 class NoHooksLU(LogicalUnit):
329   """Simple LU which runs no hooks.
330
331   This LU is intended as a parent for other LogicalUnits which will
332   run no hooks, in order to reduce duplicate code.
333
334   """
335   HPATH = None
336   HTYPE = None
337
338
339 def _GetWantedNodes(lu, nodes):
340   """Returns list of checked and expanded node names.
341
342   @type lu: L{LogicalUnit}
343   @param lu: the logical unit on whose behalf we execute
344   @type nodes: list
345   @param nodes: list of node names or None for all nodes
346   @rtype: list
347   @return: the list of nodes, sorted
348   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
349
350   """
351   if not isinstance(nodes, list):
352     raise errors.OpPrereqError("Invalid argument type 'nodes'")
353
354   if not nodes:
355     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356       " non-empty list of nodes whose name is to be expanded.")
357
358   wanted = []
359   for name in nodes:
360     node = lu.cfg.ExpandNodeName(name)
361     if node is None:
362       raise errors.OpPrereqError("No such node name '%s'" % name)
363     wanted.append(node)
364
365   return utils.NiceSort(wanted)
366
367
368 def _GetWantedInstances(lu, instances):
369   """Returns list of checked and expanded instance names.
370
371   @type lu: L{LogicalUnit}
372   @param lu: the logical unit on whose behalf we execute
373   @type instances: list
374   @param instances: list of instance names or None for all instances
375   @rtype: list
376   @return: the list of instances, sorted
377   @raise errors.OpPrereqError: if the instances parameter is wrong type
378   @raise errors.OpPrereqError: if any of the passed instances is not found
379
380   """
381   if not isinstance(instances, list):
382     raise errors.OpPrereqError("Invalid argument type 'instances'")
383
384   if instances:
385     wanted = []
386
387     for name in instances:
388       instance = lu.cfg.ExpandInstanceName(name)
389       if instance is None:
390         raise errors.OpPrereqError("No such instance name '%s'" % name)
391       wanted.append(instance)
392
393   else:
394     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
395   return wanted
396
397
398 def _CheckOutputFields(static, dynamic, selected):
399   """Checks whether all selected fields are valid.
400
401   @type static: L{utils.FieldSet}
402   @param static: static fields set
403   @type dynamic: L{utils.FieldSet}
404   @param dynamic: dynamic fields set
405
406   """
407   f = utils.FieldSet()
408   f.Extend(static)
409   f.Extend(dynamic)
410
411   delta = f.NonMatching(selected)
412   if delta:
413     raise errors.OpPrereqError("Unknown output fields selected: %s"
414                                % ",".join(delta))
415
416
417 def _CheckBooleanOpField(op, name):
418   """Validates boolean opcode parameters.
419
420   This will ensure that an opcode parameter is either a boolean value,
421   or None (but that it always exists).
422
423   """
424   val = getattr(op, name, None)
425   if not (val is None or isinstance(val, bool)):
426     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
427                                (name, str(val)))
428   setattr(op, name, val)
429
430
431 def _CheckNodeOnline(lu, node):
432   """Ensure that a given node is online.
433
434   @param lu: the LU on behalf of which we make the check
435   @param node: the node to check
436   @raise errors.OpPrereqError: if the node is offline
437
438   """
439   if lu.cfg.GetNodeInfo(node).offline:
440     raise errors.OpPrereqError("Can't use offline node %s" % node)
441
442
443 def _CheckNodeNotDrained(lu, node):
444   """Ensure that a given node is not drained.
445
446   @param lu: the LU on behalf of which we make the check
447   @param node: the node to check
448   @raise errors.OpPrereqError: if the node is drained
449
450   """
451   if lu.cfg.GetNodeInfo(node).drained:
452     raise errors.OpPrereqError("Can't use drained node %s" % node)
453
454
455 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456                           memory, vcpus, nics, disk_template, disks):
457   """Builds instance related env variables for hooks
458
459   This builds the hook environment from individual variables.
460
461   @type name: string
462   @param name: the name of the instance
463   @type primary_node: string
464   @param primary_node: the name of the instance's primary node
465   @type secondary_nodes: list
466   @param secondary_nodes: list of secondary nodes as strings
467   @type os_type: string
468   @param os_type: the name of the instance's OS
469   @type status: boolean
470   @param status: the should_run status of the instance
471   @type memory: string
472   @param memory: the memory size of the instance
473   @type vcpus: string
474   @param vcpus: the count of VCPUs the instance has
475   @type nics: list
476   @param nics: list of tuples (ip, bridge, mac) representing
477       the NICs the instance  has
478   @type disk_template: string
479   @param disk_template: the distk template of the instance
480   @type disks: list
481   @param disks: the list of (size, mode) pairs
482   @rtype: dict
483   @return: the hook environment for this instance
484
485   """
486   if status:
487     str_status = "up"
488   else:
489     str_status = "down"
490   env = {
491     "OP_TARGET": name,
492     "INSTANCE_NAME": name,
493     "INSTANCE_PRIMARY": primary_node,
494     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
495     "INSTANCE_OS_TYPE": os_type,
496     "INSTANCE_STATUS": str_status,
497     "INSTANCE_MEMORY": memory,
498     "INSTANCE_VCPUS": vcpus,
499     "INSTANCE_DISK_TEMPLATE": disk_template,
500   }
501
502   if nics:
503     nic_count = len(nics)
504     for idx, (ip, mac, mode, link) in enumerate(nics):
505       if ip is None:
506         ip = ""
507       env["INSTANCE_NIC%d_IP" % idx] = ip
508       env["INSTANCE_NIC%d_MAC" % idx] = mac
509       env["INSTANCE_NIC%d_MODE" % idx] = mode
510       env["INSTANCE_NIC%d_LINK" % idx] = link
511       if mode == constants.NIC_MODE_BRIDGED:
512         env["INSTANCE_NIC%d_BRIDGE" % idx] = link
513   else:
514     nic_count = 0
515
516   env["INSTANCE_NIC_COUNT"] = nic_count
517
518   if disks:
519     disk_count = len(disks)
520     for idx, (size, mode) in enumerate(disks):
521       env["INSTANCE_DISK%d_SIZE" % idx] = size
522       env["INSTANCE_DISK%d_MODE" % idx] = mode
523   else:
524     disk_count = 0
525
526   env["INSTANCE_DISK_COUNT"] = disk_count
527
528   return env
529
530 def _PreBuildNICHooksList(lu, nics):
531   """Build a list of nic information tuples.
532
533   This list is suitable to be passed to _BuildInstanceHookEnv.
534
535   @type lu:  L{LogicalUnit}
536   @param lu: the logical unit on whose behalf we execute
537   @type nics: list of L{objects.NIC}
538   @param nics: list of nics to convert to hooks tuples
539
540   """
541   hooks_nics = []
542   c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
543   for nic in nics:
544     ip = nic.ip
545     mac = nic.mac
546     filled_params = objects.FillDict(c_nicparams, nic.nicparams)
547     mode = filled_params[constants.NIC_MODE]
548     link = filled_params[constants.NIC_LINK]
549     hooks_nics.append((ip, mac, mode, link))
550   return hooks_nics
551
552 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
553   """Builds instance related env variables for hooks from an object.
554
555   @type lu: L{LogicalUnit}
556   @param lu: the logical unit on whose behalf we execute
557   @type instance: L{objects.Instance}
558   @param instance: the instance for which we should build the
559       environment
560   @type override: dict
561   @param override: dictionary with key/values that will override
562       our values
563   @rtype: dict
564   @return: the hook environment dictionary
565
566   """
567   bep = lu.cfg.GetClusterInfo().FillBE(instance)
568   args = {
569     'name': instance.name,
570     'primary_node': instance.primary_node,
571     'secondary_nodes': instance.secondary_nodes,
572     'os_type': instance.os,
573     'status': instance.admin_up,
574     'memory': bep[constants.BE_MEMORY],
575     'vcpus': bep[constants.BE_VCPUS],
576     'nics': _PreBuildNICHooksList(lu, instance.nics),
577     'disk_template': instance.disk_template,
578     'disks': [(disk.size, disk.mode) for disk in instance.disks],
579   }
580   if override:
581     args.update(override)
582   return _BuildInstanceHookEnv(**args)
583
584
585 def _AdjustCandidatePool(lu):
586   """Adjust the candidate pool after node operations.
587
588   """
589   mod_list = lu.cfg.MaintainCandidatePool()
590   if mod_list:
591     lu.LogInfo("Promoted nodes to master candidate role: %s",
592                ", ".join(node.name for node in mod_list))
593     for name in mod_list:
594       lu.context.ReaddNode(name)
595   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
596   if mc_now > mc_max:
597     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
598                (mc_now, mc_max))
599
600
601 def _CheckNicsBridgesExist(lu, target_nics, target_node,
602                                profile=constants.PP_DEFAULT):
603   """Check that the brigdes needed by a list of nics exist.
604
605   """
606   c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
607   paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
608                 for nic in target_nics]
609   brlist = [params[constants.NIC_LINK] for params in paramslist
610             if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
611   if brlist:
612     result = lu.rpc.call_bridges_exist(target_node, brlist)
613     result.Raise()
614     if not result.data:
615       raise errors.OpPrereqError("One or more target bridges %s does not"
616                                  " exist on destination node '%s'" %
617                                  (brlist, target_node))
618
619
620 def _CheckInstanceBridgesExist(lu, instance, node=None):
621   """Check that the brigdes needed by an instance exist.
622
623   """
624   if node is None:
625     node=instance.primary_node
626   _CheckNicsBridgesExist(lu, instance.nics, node)
627
628
629 class LUDestroyCluster(NoHooksLU):
630   """Logical unit for destroying the cluster.
631
632   """
633   _OP_REQP = []
634
635   def CheckPrereq(self):
636     """Check prerequisites.
637
638     This checks whether the cluster is empty.
639
640     Any errors are signalled by raising errors.OpPrereqError.
641
642     """
643     master = self.cfg.GetMasterNode()
644
645     nodelist = self.cfg.GetNodeList()
646     if len(nodelist) != 1 or nodelist[0] != master:
647       raise errors.OpPrereqError("There are still %d node(s) in"
648                                  " this cluster." % (len(nodelist) - 1))
649     instancelist = self.cfg.GetInstanceList()
650     if instancelist:
651       raise errors.OpPrereqError("There are still %d instance(s) in"
652                                  " this cluster." % len(instancelist))
653
654   def Exec(self, feedback_fn):
655     """Destroys the cluster.
656
657     """
658     master = self.cfg.GetMasterNode()
659     result = self.rpc.call_node_stop_master(master, False)
660     result.Raise()
661     if not result.data:
662       raise errors.OpExecError("Could not disable the master role")
663     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
664     utils.CreateBackup(priv_key)
665     utils.CreateBackup(pub_key)
666     return master
667
668
669 class LUVerifyCluster(LogicalUnit):
670   """Verifies the cluster status.
671
672   """
673   HPATH = "cluster-verify"
674   HTYPE = constants.HTYPE_CLUSTER
675   _OP_REQP = ["skip_checks"]
676   REQ_BGL = False
677
678   def ExpandNames(self):
679     self.needed_locks = {
680       locking.LEVEL_NODE: locking.ALL_SET,
681       locking.LEVEL_INSTANCE: locking.ALL_SET,
682     }
683     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
684
685   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
686                   node_result, feedback_fn, master_files,
687                   drbd_map, vg_name):
688     """Run multiple tests against a node.
689
690     Test list:
691
692       - compares ganeti version
693       - checks vg existance and size > 20G
694       - checks config file checksum
695       - checks ssh to other nodes
696
697     @type nodeinfo: L{objects.Node}
698     @param nodeinfo: the node to check
699     @param file_list: required list of files
700     @param local_cksum: dictionary of local files and their checksums
701     @param node_result: the results from the node
702     @param feedback_fn: function used to accumulate results
703     @param master_files: list of files that only masters should have
704     @param drbd_map: the useddrbd minors for this node, in
705         form of minor: (instance, must_exist) which correspond to instances
706         and their running status
707     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
708
709     """
710     node = nodeinfo.name
711
712     # main result, node_result should be a non-empty dict
713     if not node_result or not isinstance(node_result, dict):
714       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
715       return True
716
717     # compares ganeti version
718     local_version = constants.PROTOCOL_VERSION
719     remote_version = node_result.get('version', None)
720     if not (remote_version and isinstance(remote_version, (list, tuple)) and
721             len(remote_version) == 2):
722       feedback_fn("  - ERROR: connection to %s failed" % (node))
723       return True
724
725     if local_version != remote_version[0]:
726       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
727                   " node %s %s" % (local_version, node, remote_version[0]))
728       return True
729
730     # node seems compatible, we can actually try to look into its results
731
732     bad = False
733
734     # full package version
735     if constants.RELEASE_VERSION != remote_version[1]:
736       feedback_fn("  - WARNING: software version mismatch: master %s,"
737                   " node %s %s" %
738                   (constants.RELEASE_VERSION, node, remote_version[1]))
739
740     # checks vg existence and size > 20G
741     if vg_name is not None:
742       vglist = node_result.get(constants.NV_VGLIST, None)
743       if not vglist:
744         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
745                         (node,))
746         bad = True
747       else:
748         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
749                                               constants.MIN_VG_SIZE)
750         if vgstatus:
751           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
752           bad = True
753
754     # checks config file checksum
755
756     remote_cksum = node_result.get(constants.NV_FILELIST, None)
757     if not isinstance(remote_cksum, dict):
758       bad = True
759       feedback_fn("  - ERROR: node hasn't returned file checksum data")
760     else:
761       for file_name in file_list:
762         node_is_mc = nodeinfo.master_candidate
763         must_have_file = file_name not in master_files
764         if file_name not in remote_cksum:
765           if node_is_mc or must_have_file:
766             bad = True
767             feedback_fn("  - ERROR: file '%s' missing" % file_name)
768         elif remote_cksum[file_name] != local_cksum[file_name]:
769           if node_is_mc or must_have_file:
770             bad = True
771             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
772           else:
773             # not candidate and this is not a must-have file
774             bad = True
775             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
776                         " '%s'" % file_name)
777         else:
778           # all good, except non-master/non-must have combination
779           if not node_is_mc and not must_have_file:
780             feedback_fn("  - ERROR: file '%s' should not exist on non master"
781                         " candidates" % file_name)
782
783     # checks ssh to any
784
785     if constants.NV_NODELIST not in node_result:
786       bad = True
787       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
788     else:
789       if node_result[constants.NV_NODELIST]:
790         bad = True
791         for node in node_result[constants.NV_NODELIST]:
792           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
793                           (node, node_result[constants.NV_NODELIST][node]))
794
795     if constants.NV_NODENETTEST not in node_result:
796       bad = True
797       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
798     else:
799       if node_result[constants.NV_NODENETTEST]:
800         bad = True
801         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
802         for node in nlist:
803           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
804                           (node, node_result[constants.NV_NODENETTEST][node]))
805
806     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
807     if isinstance(hyp_result, dict):
808       for hv_name, hv_result in hyp_result.iteritems():
809         if hv_result is not None:
810           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
811                       (hv_name, hv_result))
812
813     # check used drbd list
814     if vg_name is not None:
815       used_minors = node_result.get(constants.NV_DRBDLIST, [])
816       if not isinstance(used_minors, (tuple, list)):
817         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
818                     str(used_minors))
819       else:
820         for minor, (iname, must_exist) in drbd_map.items():
821           if minor not in used_minors and must_exist:
822             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
823                         " not active" % (minor, iname))
824             bad = True
825         for minor in used_minors:
826           if minor not in drbd_map:
827             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
828                         minor)
829             bad = True
830
831     return bad
832
833   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
834                       node_instance, feedback_fn, n_offline):
835     """Verify an instance.
836
837     This function checks to see if the required block devices are
838     available on the instance's node.
839
840     """
841     bad = False
842
843     node_current = instanceconfig.primary_node
844
845     node_vol_should = {}
846     instanceconfig.MapLVsByNode(node_vol_should)
847
848     for node in node_vol_should:
849       if node in n_offline:
850         # ignore missing volumes on offline nodes
851         continue
852       for volume in node_vol_should[node]:
853         if node not in node_vol_is or volume not in node_vol_is[node]:
854           feedback_fn("  - ERROR: volume %s missing on node %s" %
855                           (volume, node))
856           bad = True
857
858     if instanceconfig.admin_up:
859       if ((node_current not in node_instance or
860           not instance in node_instance[node_current]) and
861           node_current not in n_offline):
862         feedback_fn("  - ERROR: instance %s not running on node %s" %
863                         (instance, node_current))
864         bad = True
865
866     for node in node_instance:
867       if (not node == node_current):
868         if instance in node_instance[node]:
869           feedback_fn("  - ERROR: instance %s should not run on node %s" %
870                           (instance, node))
871           bad = True
872
873     return bad
874
875   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
876     """Verify if there are any unknown volumes in the cluster.
877
878     The .os, .swap and backup volumes are ignored. All other volumes are
879     reported as unknown.
880
881     """
882     bad = False
883
884     for node in node_vol_is:
885       for volume in node_vol_is[node]:
886         if node not in node_vol_should or volume not in node_vol_should[node]:
887           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
888                       (volume, node))
889           bad = True
890     return bad
891
892   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
893     """Verify the list of running instances.
894
895     This checks what instances are running but unknown to the cluster.
896
897     """
898     bad = False
899     for node in node_instance:
900       for runninginstance in node_instance[node]:
901         if runninginstance not in instancelist:
902           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
903                           (runninginstance, node))
904           bad = True
905     return bad
906
907   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
908     """Verify N+1 Memory Resilience.
909
910     Check that if one single node dies we can still start all the instances it
911     was primary for.
912
913     """
914     bad = False
915
916     for node, nodeinfo in node_info.iteritems():
917       # This code checks that every node which is now listed as secondary has
918       # enough memory to host all instances it is supposed to should a single
919       # other node in the cluster fail.
920       # FIXME: not ready for failover to an arbitrary node
921       # FIXME: does not support file-backed instances
922       # WARNING: we currently take into account down instances as well as up
923       # ones, considering that even if they're down someone might want to start
924       # them even in the event of a node failure.
925       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
926         needed_mem = 0
927         for instance in instances:
928           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
929           if bep[constants.BE_AUTO_BALANCE]:
930             needed_mem += bep[constants.BE_MEMORY]
931         if nodeinfo['mfree'] < needed_mem:
932           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
933                       " failovers should node %s fail" % (node, prinode))
934           bad = True
935     return bad
936
937   def CheckPrereq(self):
938     """Check prerequisites.
939
940     Transform the list of checks we're going to skip into a set and check that
941     all its members are valid.
942
943     """
944     self.skip_set = frozenset(self.op.skip_checks)
945     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
946       raise errors.OpPrereqError("Invalid checks to be skipped specified")
947
948   def BuildHooksEnv(self):
949     """Build hooks env.
950
951     Cluster-Verify hooks just rone in the post phase and their failure makes
952     the output be logged in the verify output and the verification to fail.
953
954     """
955     all_nodes = self.cfg.GetNodeList()
956     env = {
957       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
958       }
959     for node in self.cfg.GetAllNodesInfo().values():
960       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
961
962     return env, [], all_nodes
963
964   def Exec(self, feedback_fn):
965     """Verify integrity of cluster, performing various test on nodes.
966
967     """
968     bad = False
969     feedback_fn("* Verifying global settings")
970     for msg in self.cfg.VerifyConfig():
971       feedback_fn("  - ERROR: %s" % msg)
972
973     vg_name = self.cfg.GetVGName()
974     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
975     nodelist = utils.NiceSort(self.cfg.GetNodeList())
976     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
977     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
978     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
979                         for iname in instancelist)
980     i_non_redundant = [] # Non redundant instances
981     i_non_a_balanced = [] # Non auto-balanced instances
982     n_offline = [] # List of offline nodes
983     n_drained = [] # List of nodes being drained
984     node_volume = {}
985     node_instance = {}
986     node_info = {}
987     instance_cfg = {}
988
989     # FIXME: verify OS list
990     # do local checksums
991     master_files = [constants.CLUSTER_CONF_FILE]
992
993     file_names = ssconf.SimpleStore().GetFileList()
994     file_names.append(constants.SSL_CERT_FILE)
995     file_names.append(constants.RAPI_CERT_FILE)
996     file_names.extend(master_files)
997
998     local_checksums = utils.FingerprintFiles(file_names)
999
1000     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1001     node_verify_param = {
1002       constants.NV_FILELIST: file_names,
1003       constants.NV_NODELIST: [node.name for node in nodeinfo
1004                               if not node.offline],
1005       constants.NV_HYPERVISOR: hypervisors,
1006       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1007                                   node.secondary_ip) for node in nodeinfo
1008                                  if not node.offline],
1009       constants.NV_INSTANCELIST: hypervisors,
1010       constants.NV_VERSION: None,
1011       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1012       }
1013     if vg_name is not None:
1014       node_verify_param[constants.NV_VGLIST] = None
1015       node_verify_param[constants.NV_LVLIST] = vg_name
1016       node_verify_param[constants.NV_DRBDLIST] = None
1017     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1018                                            self.cfg.GetClusterName())
1019
1020     cluster = self.cfg.GetClusterInfo()
1021     master_node = self.cfg.GetMasterNode()
1022     all_drbd_map = self.cfg.ComputeDRBDMap()
1023
1024     for node_i in nodeinfo:
1025       node = node_i.name
1026       nresult = all_nvinfo[node].data
1027
1028       if node_i.offline:
1029         feedback_fn("* Skipping offline node %s" % (node,))
1030         n_offline.append(node)
1031         continue
1032
1033       if node == master_node:
1034         ntype = "master"
1035       elif node_i.master_candidate:
1036         ntype = "master candidate"
1037       elif node_i.drained:
1038         ntype = "drained"
1039         n_drained.append(node)
1040       else:
1041         ntype = "regular"
1042       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1043
1044       if all_nvinfo[node].failed or not isinstance(nresult, dict):
1045         feedback_fn("  - ERROR: connection to %s failed" % (node,))
1046         bad = True
1047         continue
1048
1049       node_drbd = {}
1050       for minor, instance in all_drbd_map[node].items():
1051         if instance not in instanceinfo:
1052           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1053                       instance)
1054           # ghost instance should not be running, but otherwise we
1055           # don't give double warnings (both ghost instance and
1056           # unallocated minor in use)
1057           node_drbd[minor] = (instance, False)
1058         else:
1059           instance = instanceinfo[instance]
1060           node_drbd[minor] = (instance.name, instance.admin_up)
1061       result = self._VerifyNode(node_i, file_names, local_checksums,
1062                                 nresult, feedback_fn, master_files,
1063                                 node_drbd, vg_name)
1064       bad = bad or result
1065
1066       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1067       if vg_name is None:
1068         node_volume[node] = {}
1069       elif isinstance(lvdata, basestring):
1070         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1071                     (node, utils.SafeEncode(lvdata)))
1072         bad = True
1073         node_volume[node] = {}
1074       elif not isinstance(lvdata, dict):
1075         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1076         bad = True
1077         continue
1078       else:
1079         node_volume[node] = lvdata
1080
1081       # node_instance
1082       idata = nresult.get(constants.NV_INSTANCELIST, None)
1083       if not isinstance(idata, list):
1084         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1085                     (node,))
1086         bad = True
1087         continue
1088
1089       node_instance[node] = idata
1090
1091       # node_info
1092       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1093       if not isinstance(nodeinfo, dict):
1094         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1095         bad = True
1096         continue
1097
1098       try:
1099         node_info[node] = {
1100           "mfree": int(nodeinfo['memory_free']),
1101           "pinst": [],
1102           "sinst": [],
1103           # dictionary holding all instances this node is secondary for,
1104           # grouped by their primary node. Each key is a cluster node, and each
1105           # value is a list of instances which have the key as primary and the
1106           # current node as secondary.  this is handy to calculate N+1 memory
1107           # availability if you can only failover from a primary to its
1108           # secondary.
1109           "sinst-by-pnode": {},
1110         }
1111         # FIXME: devise a free space model for file based instances as well
1112         if vg_name is not None:
1113           if (constants.NV_VGLIST not in nresult or
1114               vg_name not in nresult[constants.NV_VGLIST]):
1115             feedback_fn("  - ERROR: node %s didn't return data for the"
1116                         " volume group '%s' - it is either missing or broken" %
1117                         (node, vg_name))
1118             bad = True
1119             continue
1120           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1121       except (ValueError, KeyError):
1122         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1123                     " from node %s" % (node,))
1124         bad = True
1125         continue
1126
1127     node_vol_should = {}
1128
1129     for instance in instancelist:
1130       feedback_fn("* Verifying instance %s" % instance)
1131       inst_config = instanceinfo[instance]
1132       result =  self._VerifyInstance(instance, inst_config, node_volume,
1133                                      node_instance, feedback_fn, n_offline)
1134       bad = bad or result
1135       inst_nodes_offline = []
1136
1137       inst_config.MapLVsByNode(node_vol_should)
1138
1139       instance_cfg[instance] = inst_config
1140
1141       pnode = inst_config.primary_node
1142       if pnode in node_info:
1143         node_info[pnode]['pinst'].append(instance)
1144       elif pnode not in n_offline:
1145         feedback_fn("  - ERROR: instance %s, connection to primary node"
1146                     " %s failed" % (instance, pnode))
1147         bad = True
1148
1149       if pnode in n_offline:
1150         inst_nodes_offline.append(pnode)
1151
1152       # If the instance is non-redundant we cannot survive losing its primary
1153       # node, so we are not N+1 compliant. On the other hand we have no disk
1154       # templates with more than one secondary so that situation is not well
1155       # supported either.
1156       # FIXME: does not support file-backed instances
1157       if len(inst_config.secondary_nodes) == 0:
1158         i_non_redundant.append(instance)
1159       elif len(inst_config.secondary_nodes) > 1:
1160         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1161                     % instance)
1162
1163       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1164         i_non_a_balanced.append(instance)
1165
1166       for snode in inst_config.secondary_nodes:
1167         if snode in node_info:
1168           node_info[snode]['sinst'].append(instance)
1169           if pnode not in node_info[snode]['sinst-by-pnode']:
1170             node_info[snode]['sinst-by-pnode'][pnode] = []
1171           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1172         elif snode not in n_offline:
1173           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1174                       " %s failed" % (instance, snode))
1175           bad = True
1176         if snode in n_offline:
1177           inst_nodes_offline.append(snode)
1178
1179       if inst_nodes_offline:
1180         # warn that the instance lives on offline nodes, and set bad=True
1181         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1182                     ", ".join(inst_nodes_offline))
1183         bad = True
1184
1185     feedback_fn("* Verifying orphan volumes")
1186     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1187                                        feedback_fn)
1188     bad = bad or result
1189
1190     feedback_fn("* Verifying remaining instances")
1191     result = self._VerifyOrphanInstances(instancelist, node_instance,
1192                                          feedback_fn)
1193     bad = bad or result
1194
1195     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1196       feedback_fn("* Verifying N+1 Memory redundancy")
1197       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1198       bad = bad or result
1199
1200     feedback_fn("* Other Notes")
1201     if i_non_redundant:
1202       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1203                   % len(i_non_redundant))
1204
1205     if i_non_a_balanced:
1206       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1207                   % len(i_non_a_balanced))
1208
1209     if n_offline:
1210       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1211
1212     if n_drained:
1213       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1214
1215     return not bad
1216
1217   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1218     """Analize the post-hooks' result
1219
1220     This method analyses the hook result, handles it, and sends some
1221     nicely-formatted feedback back to the user.
1222
1223     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1224         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1225     @param hooks_results: the results of the multi-node hooks rpc call
1226     @param feedback_fn: function used send feedback back to the caller
1227     @param lu_result: previous Exec result
1228     @return: the new Exec result, based on the previous result
1229         and hook results
1230
1231     """
1232     # We only really run POST phase hooks, and are only interested in
1233     # their results
1234     if phase == constants.HOOKS_PHASE_POST:
1235       # Used to change hooks' output to proper indentation
1236       indent_re = re.compile('^', re.M)
1237       feedback_fn("* Hooks Results")
1238       if not hooks_results:
1239         feedback_fn("  - ERROR: general communication failure")
1240         lu_result = 1
1241       else:
1242         for node_name in hooks_results:
1243           show_node_header = True
1244           res = hooks_results[node_name]
1245           if res.failed or res.data is False or not isinstance(res.data, list):
1246             if res.offline:
1247               # no need to warn or set fail return value
1248               continue
1249             feedback_fn("    Communication failure in hooks execution")
1250             lu_result = 1
1251             continue
1252           for script, hkr, output in res.data:
1253             if hkr == constants.HKR_FAIL:
1254               # The node header is only shown once, if there are
1255               # failing hooks on that node
1256               if show_node_header:
1257                 feedback_fn("  Node %s:" % node_name)
1258                 show_node_header = False
1259               feedback_fn("    ERROR: Script %s failed, output:" % script)
1260               output = indent_re.sub('      ', output)
1261               feedback_fn("%s" % output)
1262               lu_result = 1
1263
1264       return lu_result
1265
1266
1267 class LUVerifyDisks(NoHooksLU):
1268   """Verifies the cluster disks status.
1269
1270   """
1271   _OP_REQP = []
1272   REQ_BGL = False
1273
1274   def ExpandNames(self):
1275     self.needed_locks = {
1276       locking.LEVEL_NODE: locking.ALL_SET,
1277       locking.LEVEL_INSTANCE: locking.ALL_SET,
1278     }
1279     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1280
1281   def CheckPrereq(self):
1282     """Check prerequisites.
1283
1284     This has no prerequisites.
1285
1286     """
1287     pass
1288
1289   def Exec(self, feedback_fn):
1290     """Verify integrity of cluster disks.
1291
1292     """
1293     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1294
1295     vg_name = self.cfg.GetVGName()
1296     nodes = utils.NiceSort(self.cfg.GetNodeList())
1297     instances = [self.cfg.GetInstanceInfo(name)
1298                  for name in self.cfg.GetInstanceList()]
1299
1300     nv_dict = {}
1301     for inst in instances:
1302       inst_lvs = {}
1303       if (not inst.admin_up or
1304           inst.disk_template not in constants.DTS_NET_MIRROR):
1305         continue
1306       inst.MapLVsByNode(inst_lvs)
1307       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1308       for node, vol_list in inst_lvs.iteritems():
1309         for vol in vol_list:
1310           nv_dict[(node, vol)] = inst
1311
1312     if not nv_dict:
1313       return result
1314
1315     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1316
1317     to_act = set()
1318     for node in nodes:
1319       # node_volume
1320       lvs = node_lvs[node]
1321       if lvs.failed:
1322         if not lvs.offline:
1323           self.LogWarning("Connection to node %s failed: %s" %
1324                           (node, lvs.data))
1325         continue
1326       lvs = lvs.data
1327       if isinstance(lvs, basestring):
1328         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1329         res_nlvm[node] = lvs
1330         continue
1331       elif not isinstance(lvs, dict):
1332         logging.warning("Connection to node %s failed or invalid data"
1333                         " returned", node)
1334         res_nodes.append(node)
1335         continue
1336
1337       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1338         inst = nv_dict.pop((node, lv_name), None)
1339         if (not lv_online and inst is not None
1340             and inst.name not in res_instances):
1341           res_instances.append(inst.name)
1342
1343     # any leftover items in nv_dict are missing LVs, let's arrange the
1344     # data better
1345     for key, inst in nv_dict.iteritems():
1346       if inst.name not in res_missing:
1347         res_missing[inst.name] = []
1348       res_missing[inst.name].append(key)
1349
1350     return result
1351
1352
1353 class LURenameCluster(LogicalUnit):
1354   """Rename the cluster.
1355
1356   """
1357   HPATH = "cluster-rename"
1358   HTYPE = constants.HTYPE_CLUSTER
1359   _OP_REQP = ["name"]
1360
1361   def BuildHooksEnv(self):
1362     """Build hooks env.
1363
1364     """
1365     env = {
1366       "OP_TARGET": self.cfg.GetClusterName(),
1367       "NEW_NAME": self.op.name,
1368       }
1369     mn = self.cfg.GetMasterNode()
1370     return env, [mn], [mn]
1371
1372   def CheckPrereq(self):
1373     """Verify that the passed name is a valid one.
1374
1375     """
1376     hostname = utils.HostInfo(self.op.name)
1377
1378     new_name = hostname.name
1379     self.ip = new_ip = hostname.ip
1380     old_name = self.cfg.GetClusterName()
1381     old_ip = self.cfg.GetMasterIP()
1382     if new_name == old_name and new_ip == old_ip:
1383       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1384                                  " cluster has changed")
1385     if new_ip != old_ip:
1386       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1387         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1388                                    " reachable on the network. Aborting." %
1389                                    new_ip)
1390
1391     self.op.name = new_name
1392
1393   def Exec(self, feedback_fn):
1394     """Rename the cluster.
1395
1396     """
1397     clustername = self.op.name
1398     ip = self.ip
1399
1400     # shutdown the master IP
1401     master = self.cfg.GetMasterNode()
1402     result = self.rpc.call_node_stop_master(master, False)
1403     if result.failed or not result.data:
1404       raise errors.OpExecError("Could not disable the master role")
1405
1406     try:
1407       cluster = self.cfg.GetClusterInfo()
1408       cluster.cluster_name = clustername
1409       cluster.master_ip = ip
1410       self.cfg.Update(cluster)
1411
1412       # update the known hosts file
1413       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1414       node_list = self.cfg.GetNodeList()
1415       try:
1416         node_list.remove(master)
1417       except ValueError:
1418         pass
1419       result = self.rpc.call_upload_file(node_list,
1420                                          constants.SSH_KNOWN_HOSTS_FILE)
1421       for to_node, to_result in result.iteritems():
1422          msg = to_result.RemoteFailMsg()
1423          if msg:
1424            msg = ("Copy of file %s to node %s failed: %s" %
1425                    (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1426            self.proc.LogWarning(msg)
1427
1428     finally:
1429       result = self.rpc.call_node_start_master(master, False)
1430       if result.failed or not result.data:
1431         self.LogWarning("Could not re-enable the master role on"
1432                         " the master, please restart manually.")
1433
1434
1435 def _RecursiveCheckIfLVMBased(disk):
1436   """Check if the given disk or its children are lvm-based.
1437
1438   @type disk: L{objects.Disk}
1439   @param disk: the disk to check
1440   @rtype: booleean
1441   @return: boolean indicating whether a LD_LV dev_type was found or not
1442
1443   """
1444   if disk.children:
1445     for chdisk in disk.children:
1446       if _RecursiveCheckIfLVMBased(chdisk):
1447         return True
1448   return disk.dev_type == constants.LD_LV
1449
1450
1451 class LUSetClusterParams(LogicalUnit):
1452   """Change the parameters of the cluster.
1453
1454   """
1455   HPATH = "cluster-modify"
1456   HTYPE = constants.HTYPE_CLUSTER
1457   _OP_REQP = []
1458   REQ_BGL = False
1459
1460   def CheckArguments(self):
1461     """Check parameters
1462
1463     """
1464     if not hasattr(self.op, "candidate_pool_size"):
1465       self.op.candidate_pool_size = None
1466     if self.op.candidate_pool_size is not None:
1467       try:
1468         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1469       except (ValueError, TypeError), err:
1470         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1471                                    str(err))
1472       if self.op.candidate_pool_size < 1:
1473         raise errors.OpPrereqError("At least one master candidate needed")
1474
1475   def ExpandNames(self):
1476     # FIXME: in the future maybe other cluster params won't require checking on
1477     # all nodes to be modified.
1478     self.needed_locks = {
1479       locking.LEVEL_NODE: locking.ALL_SET,
1480     }
1481     self.share_locks[locking.LEVEL_NODE] = 1
1482
1483   def BuildHooksEnv(self):
1484     """Build hooks env.
1485
1486     """
1487     env = {
1488       "OP_TARGET": self.cfg.GetClusterName(),
1489       "NEW_VG_NAME": self.op.vg_name,
1490       }
1491     mn = self.cfg.GetMasterNode()
1492     return env, [mn], [mn]
1493
1494   def CheckPrereq(self):
1495     """Check prerequisites.
1496
1497     This checks whether the given params don't conflict and
1498     if the given volume group is valid.
1499
1500     """
1501     if self.op.vg_name is not None and not self.op.vg_name:
1502       instances = self.cfg.GetAllInstancesInfo().values()
1503       for inst in instances:
1504         for disk in inst.disks:
1505           if _RecursiveCheckIfLVMBased(disk):
1506             raise errors.OpPrereqError("Cannot disable lvm storage while"
1507                                        " lvm-based instances exist")
1508
1509     node_list = self.acquired_locks[locking.LEVEL_NODE]
1510
1511     # if vg_name not None, checks given volume group on all nodes
1512     if self.op.vg_name:
1513       vglist = self.rpc.call_vg_list(node_list)
1514       for node in node_list:
1515         if vglist[node].failed:
1516           # ignoring down node
1517           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1518           continue
1519         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1520                                               self.op.vg_name,
1521                                               constants.MIN_VG_SIZE)
1522         if vgstatus:
1523           raise errors.OpPrereqError("Error on node '%s': %s" %
1524                                      (node, vgstatus))
1525
1526     self.cluster = cluster = self.cfg.GetClusterInfo()
1527     # validate params changes
1528     if self.op.beparams:
1529       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1530       self.new_beparams = objects.FillDict(
1531         cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1532
1533     if self.op.nicparams:
1534       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1535       self.new_nicparams = objects.FillDict(
1536         cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1537       objects.NIC.CheckParameterSyntax(self.new_nicparams)
1538
1539     # hypervisor list/parameters
1540     self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1541     if self.op.hvparams:
1542       if not isinstance(self.op.hvparams, dict):
1543         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1544       for hv_name, hv_dict in self.op.hvparams.items():
1545         if hv_name not in self.new_hvparams:
1546           self.new_hvparams[hv_name] = hv_dict
1547         else:
1548           self.new_hvparams[hv_name].update(hv_dict)
1549
1550     if self.op.enabled_hypervisors is not None:
1551       self.hv_list = self.op.enabled_hypervisors
1552     else:
1553       self.hv_list = cluster.enabled_hypervisors
1554
1555     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1556       # either the enabled list has changed, or the parameters have, validate
1557       for hv_name, hv_params in self.new_hvparams.items():
1558         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1559             (self.op.enabled_hypervisors and
1560              hv_name in self.op.enabled_hypervisors)):
1561           # either this is a new hypervisor, or its parameters have changed
1562           hv_class = hypervisor.GetHypervisor(hv_name)
1563           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1564           hv_class.CheckParameterSyntax(hv_params)
1565           _CheckHVParams(self, node_list, hv_name, hv_params)
1566
1567   def Exec(self, feedback_fn):
1568     """Change the parameters of the cluster.
1569
1570     """
1571     if self.op.vg_name is not None:
1572       new_volume = self.op.vg_name
1573       if not new_volume:
1574         new_volume = None
1575       if new_volume != self.cfg.GetVGName():
1576         self.cfg.SetVGName(new_volume)
1577       else:
1578         feedback_fn("Cluster LVM configuration already in desired"
1579                     " state, not changing")
1580     if self.op.hvparams:
1581       self.cluster.hvparams = self.new_hvparams
1582     if self.op.enabled_hypervisors is not None:
1583       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1584     if self.op.beparams:
1585       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1586     if self.op.nicparams:
1587       self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1588
1589     if self.op.candidate_pool_size is not None:
1590       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1591
1592     self.cfg.Update(self.cluster)
1593
1594     # we want to update nodes after the cluster so that if any errors
1595     # happen, we have recorded and saved the cluster info
1596     if self.op.candidate_pool_size is not None:
1597       _AdjustCandidatePool(self)
1598
1599
1600 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1601   """Distribute additional files which are part of the cluster configuration.
1602
1603   ConfigWriter takes care of distributing the config and ssconf files, but
1604   there are more files which should be distributed to all nodes. This function
1605   makes sure those are copied.
1606
1607   @param lu: calling logical unit
1608   @param additional_nodes: list of nodes not in the config to distribute to
1609
1610   """
1611   # 1. Gather target nodes
1612   myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1613   dist_nodes = lu.cfg.GetNodeList()
1614   if additional_nodes is not None:
1615     dist_nodes.extend(additional_nodes)
1616   if myself.name in dist_nodes:
1617     dist_nodes.remove(myself.name)
1618   # 2. Gather files to distribute
1619   dist_files = set([constants.ETC_HOSTS,
1620                     constants.SSH_KNOWN_HOSTS_FILE,
1621                     constants.RAPI_CERT_FILE,
1622                     constants.RAPI_USERS_FILE,
1623                    ])
1624
1625   enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1626   for hv_name in enabled_hypervisors:
1627     hv_class = hypervisor.GetHypervisor(hv_name)
1628     dist_files.update(hv_class.GetAncillaryFiles())
1629
1630   # 3. Perform the files upload
1631   for fname in dist_files:
1632     if os.path.exists(fname):
1633       result = lu.rpc.call_upload_file(dist_nodes, fname)
1634       for to_node, to_result in result.items():
1635          msg = to_result.RemoteFailMsg()
1636          if msg:
1637            msg = ("Copy of file %s to node %s failed: %s" %
1638                    (fname, to_node, msg))
1639            lu.proc.LogWarning(msg)
1640
1641
1642 class LURedistributeConfig(NoHooksLU):
1643   """Force the redistribution of cluster configuration.
1644
1645   This is a very simple LU.
1646
1647   """
1648   _OP_REQP = []
1649   REQ_BGL = False
1650
1651   def ExpandNames(self):
1652     self.needed_locks = {
1653       locking.LEVEL_NODE: locking.ALL_SET,
1654     }
1655     self.share_locks[locking.LEVEL_NODE] = 1
1656
1657   def CheckPrereq(self):
1658     """Check prerequisites.
1659
1660     """
1661
1662   def Exec(self, feedback_fn):
1663     """Redistribute the configuration.
1664
1665     """
1666     self.cfg.Update(self.cfg.GetClusterInfo())
1667     _RedistributeAncillaryFiles(self)
1668
1669
1670 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1671   """Sleep and poll for an instance's disk to sync.
1672
1673   """
1674   if not instance.disks:
1675     return True
1676
1677   if not oneshot:
1678     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1679
1680   node = instance.primary_node
1681
1682   for dev in instance.disks:
1683     lu.cfg.SetDiskID(dev, node)
1684
1685   retries = 0
1686   while True:
1687     max_time = 0
1688     done = True
1689     cumul_degraded = False
1690     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1691     if rstats.failed or not rstats.data:
1692       lu.LogWarning("Can't get any data from node %s", node)
1693       retries += 1
1694       if retries >= 10:
1695         raise errors.RemoteError("Can't contact node %s for mirror data,"
1696                                  " aborting." % node)
1697       time.sleep(6)
1698       continue
1699     rstats = rstats.data
1700     retries = 0
1701     for i, mstat in enumerate(rstats):
1702       if mstat is None:
1703         lu.LogWarning("Can't compute data for node %s/%s",
1704                            node, instance.disks[i].iv_name)
1705         continue
1706       # we ignore the ldisk parameter
1707       perc_done, est_time, is_degraded, _ = mstat
1708       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1709       if perc_done is not None:
1710         done = False
1711         if est_time is not None:
1712           rem_time = "%d estimated seconds remaining" % est_time
1713           max_time = est_time
1714         else:
1715           rem_time = "no time estimate"
1716         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1717                         (instance.disks[i].iv_name, perc_done, rem_time))
1718     if done or oneshot:
1719       break
1720
1721     time.sleep(min(60, max_time))
1722
1723   if done:
1724     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1725   return not cumul_degraded
1726
1727
1728 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1729   """Check that mirrors are not degraded.
1730
1731   The ldisk parameter, if True, will change the test from the
1732   is_degraded attribute (which represents overall non-ok status for
1733   the device(s)) to the ldisk (representing the local storage status).
1734
1735   """
1736   lu.cfg.SetDiskID(dev, node)
1737   if ldisk:
1738     idx = 6
1739   else:
1740     idx = 5
1741
1742   result = True
1743   if on_primary or dev.AssembleOnSecondary():
1744     rstats = lu.rpc.call_blockdev_find(node, dev)
1745     msg = rstats.RemoteFailMsg()
1746     if msg:
1747       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1748       result = False
1749     elif not rstats.payload:
1750       lu.LogWarning("Can't find disk on node %s", node)
1751       result = False
1752     else:
1753       result = result and (not rstats.payload[idx])
1754   if dev.children:
1755     for child in dev.children:
1756       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1757
1758   return result
1759
1760
1761 class LUDiagnoseOS(NoHooksLU):
1762   """Logical unit for OS diagnose/query.
1763
1764   """
1765   _OP_REQP = ["output_fields", "names"]
1766   REQ_BGL = False
1767   _FIELDS_STATIC = utils.FieldSet()
1768   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1769
1770   def ExpandNames(self):
1771     if self.op.names:
1772       raise errors.OpPrereqError("Selective OS query not supported")
1773
1774     _CheckOutputFields(static=self._FIELDS_STATIC,
1775                        dynamic=self._FIELDS_DYNAMIC,
1776                        selected=self.op.output_fields)
1777
1778     # Lock all nodes, in shared mode
1779     # Temporary removal of locks, should be reverted later
1780     # TODO: reintroduce locks when they are lighter-weight
1781     self.needed_locks = {}
1782     #self.share_locks[locking.LEVEL_NODE] = 1
1783     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1784
1785   def CheckPrereq(self):
1786     """Check prerequisites.
1787
1788     """
1789
1790   @staticmethod
1791   def _DiagnoseByOS(node_list, rlist):
1792     """Remaps a per-node return list into an a per-os per-node dictionary
1793
1794     @param node_list: a list with the names of all nodes
1795     @param rlist: a map with node names as keys and OS objects as values
1796
1797     @rtype: dict
1798     @return: a dictionary with osnames as keys and as value another map, with
1799         nodes as keys and list of OS objects as values, eg::
1800
1801           {"debian-etch": {"node1": [<object>,...],
1802                            "node2": [<object>,]}
1803           }
1804
1805     """
1806     all_os = {}
1807     # we build here the list of nodes that didn't fail the RPC (at RPC
1808     # level), so that nodes with a non-responding node daemon don't
1809     # make all OSes invalid
1810     good_nodes = [node_name for node_name in rlist
1811                   if not rlist[node_name].failed]
1812     for node_name, nr in rlist.iteritems():
1813       if nr.failed or not nr.data:
1814         continue
1815       for os_obj in nr.data:
1816         if os_obj.name not in all_os:
1817           # build a list of nodes for this os containing empty lists
1818           # for each node in node_list
1819           all_os[os_obj.name] = {}
1820           for nname in good_nodes:
1821             all_os[os_obj.name][nname] = []
1822         all_os[os_obj.name][node_name].append(os_obj)
1823     return all_os
1824
1825   def Exec(self, feedback_fn):
1826     """Compute the list of OSes.
1827
1828     """
1829     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1830     node_data = self.rpc.call_os_diagnose(valid_nodes)
1831     if node_data == False:
1832       raise errors.OpExecError("Can't gather the list of OSes")
1833     pol = self._DiagnoseByOS(valid_nodes, node_data)
1834     output = []
1835     for os_name, os_data in pol.iteritems():
1836       row = []
1837       for field in self.op.output_fields:
1838         if field == "name":
1839           val = os_name
1840         elif field == "valid":
1841           val = utils.all([osl and osl[0] for osl in os_data.values()])
1842         elif field == "node_status":
1843           val = {}
1844           for node_name, nos_list in os_data.iteritems():
1845             val[node_name] = [(v.status, v.path) for v in nos_list]
1846         else:
1847           raise errors.ParameterError(field)
1848         row.append(val)
1849       output.append(row)
1850
1851     return output
1852
1853
1854 class LURemoveNode(LogicalUnit):
1855   """Logical unit for removing a node.
1856
1857   """
1858   HPATH = "node-remove"
1859   HTYPE = constants.HTYPE_NODE
1860   _OP_REQP = ["node_name"]
1861
1862   def BuildHooksEnv(self):
1863     """Build hooks env.
1864
1865     This doesn't run on the target node in the pre phase as a failed
1866     node would then be impossible to remove.
1867
1868     """
1869     env = {
1870       "OP_TARGET": self.op.node_name,
1871       "NODE_NAME": self.op.node_name,
1872       }
1873     all_nodes = self.cfg.GetNodeList()
1874     all_nodes.remove(self.op.node_name)
1875     return env, all_nodes, all_nodes
1876
1877   def CheckPrereq(self):
1878     """Check prerequisites.
1879
1880     This checks:
1881      - the node exists in the configuration
1882      - it does not have primary or secondary instances
1883      - it's not the master
1884
1885     Any errors are signalled by raising errors.OpPrereqError.
1886
1887     """
1888     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1889     if node is None:
1890       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1891
1892     instance_list = self.cfg.GetInstanceList()
1893
1894     masternode = self.cfg.GetMasterNode()
1895     if node.name == masternode:
1896       raise errors.OpPrereqError("Node is the master node,"
1897                                  " you need to failover first.")
1898
1899     for instance_name in instance_list:
1900       instance = self.cfg.GetInstanceInfo(instance_name)
1901       if node.name in instance.all_nodes:
1902         raise errors.OpPrereqError("Instance %s is still running on the node,"
1903                                    " please remove first." % instance_name)
1904     self.op.node_name = node.name
1905     self.node = node
1906
1907   def Exec(self, feedback_fn):
1908     """Removes the node from the cluster.
1909
1910     """
1911     node = self.node
1912     logging.info("Stopping the node daemon and removing configs from node %s",
1913                  node.name)
1914
1915     self.context.RemoveNode(node.name)
1916
1917     self.rpc.call_node_leave_cluster(node.name)
1918
1919     # Promote nodes to master candidate as needed
1920     _AdjustCandidatePool(self)
1921
1922
1923 class LUQueryNodes(NoHooksLU):
1924   """Logical unit for querying nodes.
1925
1926   """
1927   _OP_REQP = ["output_fields", "names", "use_locking"]
1928   REQ_BGL = False
1929   _FIELDS_DYNAMIC = utils.FieldSet(
1930     "dtotal", "dfree",
1931     "mtotal", "mnode", "mfree",
1932     "bootid",
1933     "ctotal", "cnodes", "csockets",
1934     )
1935
1936   _FIELDS_STATIC = utils.FieldSet(
1937     "name", "pinst_cnt", "sinst_cnt",
1938     "pinst_list", "sinst_list",
1939     "pip", "sip", "tags",
1940     "serial_no",
1941     "master_candidate",
1942     "master",
1943     "offline",
1944     "drained",
1945     )
1946
1947   def ExpandNames(self):
1948     _CheckOutputFields(static=self._FIELDS_STATIC,
1949                        dynamic=self._FIELDS_DYNAMIC,
1950                        selected=self.op.output_fields)
1951
1952     self.needed_locks = {}
1953     self.share_locks[locking.LEVEL_NODE] = 1
1954
1955     if self.op.names:
1956       self.wanted = _GetWantedNodes(self, self.op.names)
1957     else:
1958       self.wanted = locking.ALL_SET
1959
1960     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1961     self.do_locking = self.do_node_query and self.op.use_locking
1962     if self.do_locking:
1963       # if we don't request only static fields, we need to lock the nodes
1964       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1965
1966
1967   def CheckPrereq(self):
1968     """Check prerequisites.
1969
1970     """
1971     # The validation of the node list is done in the _GetWantedNodes,
1972     # if non empty, and if empty, there's no validation to do
1973     pass
1974
1975   def Exec(self, feedback_fn):
1976     """Computes the list of nodes and their attributes.
1977
1978     """
1979     all_info = self.cfg.GetAllNodesInfo()
1980     if self.do_locking:
1981       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1982     elif self.wanted != locking.ALL_SET:
1983       nodenames = self.wanted
1984       missing = set(nodenames).difference(all_info.keys())
1985       if missing:
1986         raise errors.OpExecError(
1987           "Some nodes were removed before retrieving their data: %s" % missing)
1988     else:
1989       nodenames = all_info.keys()
1990
1991     nodenames = utils.NiceSort(nodenames)
1992     nodelist = [all_info[name] for name in nodenames]
1993
1994     # begin data gathering
1995
1996     if self.do_node_query:
1997       live_data = {}
1998       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1999                                           self.cfg.GetHypervisorType())
2000       for name in nodenames:
2001         nodeinfo = node_data[name]
2002         if not nodeinfo.failed and nodeinfo.data:
2003           nodeinfo = nodeinfo.data
2004           fn = utils.TryConvert
2005           live_data[name] = {
2006             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2007             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2008             "mfree": fn(int, nodeinfo.get('memory_free', None)),
2009             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2010             "dfree": fn(int, nodeinfo.get('vg_free', None)),
2011             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2012             "bootid": nodeinfo.get('bootid', None),
2013             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2014             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2015             }
2016         else:
2017           live_data[name] = {}
2018     else:
2019       live_data = dict.fromkeys(nodenames, {})
2020
2021     node_to_primary = dict([(name, set()) for name in nodenames])
2022     node_to_secondary = dict([(name, set()) for name in nodenames])
2023
2024     inst_fields = frozenset(("pinst_cnt", "pinst_list",
2025                              "sinst_cnt", "sinst_list"))
2026     if inst_fields & frozenset(self.op.output_fields):
2027       instancelist = self.cfg.GetInstanceList()
2028
2029       for instance_name in instancelist:
2030         inst = self.cfg.GetInstanceInfo(instance_name)
2031         if inst.primary_node in node_to_primary:
2032           node_to_primary[inst.primary_node].add(inst.name)
2033         for secnode in inst.secondary_nodes:
2034           if secnode in node_to_secondary:
2035             node_to_secondary[secnode].add(inst.name)
2036
2037     master_node = self.cfg.GetMasterNode()
2038
2039     # end data gathering
2040
2041     output = []
2042     for node in nodelist:
2043       node_output = []
2044       for field in self.op.output_fields:
2045         if field == "name":
2046           val = node.name
2047         elif field == "pinst_list":
2048           val = list(node_to_primary[node.name])
2049         elif field == "sinst_list":
2050           val = list(node_to_secondary[node.name])
2051         elif field == "pinst_cnt":
2052           val = len(node_to_primary[node.name])
2053         elif field == "sinst_cnt":
2054           val = len(node_to_secondary[node.name])
2055         elif field == "pip":
2056           val = node.primary_ip
2057         elif field == "sip":
2058           val = node.secondary_ip
2059         elif field == "tags":
2060           val = list(node.GetTags())
2061         elif field == "serial_no":
2062           val = node.serial_no
2063         elif field == "master_candidate":
2064           val = node.master_candidate
2065         elif field == "master":
2066           val = node.name == master_node
2067         elif field == "offline":
2068           val = node.offline
2069         elif field == "drained":
2070           val = node.drained
2071         elif self._FIELDS_DYNAMIC.Matches(field):
2072           val = live_data[node.name].get(field, None)
2073         else:
2074           raise errors.ParameterError(field)
2075         node_output.append(val)
2076       output.append(node_output)
2077
2078     return output
2079
2080
2081 class LUQueryNodeVolumes(NoHooksLU):
2082   """Logical unit for getting volumes on node(s).
2083
2084   """
2085   _OP_REQP = ["nodes", "output_fields"]
2086   REQ_BGL = False
2087   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2088   _FIELDS_STATIC = utils.FieldSet("node")
2089
2090   def ExpandNames(self):
2091     _CheckOutputFields(static=self._FIELDS_STATIC,
2092                        dynamic=self._FIELDS_DYNAMIC,
2093                        selected=self.op.output_fields)
2094
2095     self.needed_locks = {}
2096     self.share_locks[locking.LEVEL_NODE] = 1
2097     if not self.op.nodes:
2098       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2099     else:
2100       self.needed_locks[locking.LEVEL_NODE] = \
2101         _GetWantedNodes(self, self.op.nodes)
2102
2103   def CheckPrereq(self):
2104     """Check prerequisites.
2105
2106     This checks that the fields required are valid output fields.
2107
2108     """
2109     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2110
2111   def Exec(self, feedback_fn):
2112     """Computes the list of nodes and their attributes.
2113
2114     """
2115     nodenames = self.nodes
2116     volumes = self.rpc.call_node_volumes(nodenames)
2117
2118     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2119              in self.cfg.GetInstanceList()]
2120
2121     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2122
2123     output = []
2124     for node in nodenames:
2125       if node not in volumes or volumes[node].failed or not volumes[node].data:
2126         continue
2127
2128       node_vols = volumes[node].data[:]
2129       node_vols.sort(key=lambda vol: vol['dev'])
2130
2131       for vol in node_vols:
2132         node_output = []
2133         for field in self.op.output_fields:
2134           if field == "node":
2135             val = node
2136           elif field == "phys":
2137             val = vol['dev']
2138           elif field == "vg":
2139             val = vol['vg']
2140           elif field == "name":
2141             val = vol['name']
2142           elif field == "size":
2143             val = int(float(vol['size']))
2144           elif field == "instance":
2145             for inst in ilist:
2146               if node not in lv_by_node[inst]:
2147                 continue
2148               if vol['name'] in lv_by_node[inst][node]:
2149                 val = inst.name
2150                 break
2151             else:
2152               val = '-'
2153           else:
2154             raise errors.ParameterError(field)
2155           node_output.append(str(val))
2156
2157         output.append(node_output)
2158
2159     return output
2160
2161
2162 class LUAddNode(LogicalUnit):
2163   """Logical unit for adding node to the cluster.
2164
2165   """
2166   HPATH = "node-add"
2167   HTYPE = constants.HTYPE_NODE
2168   _OP_REQP = ["node_name"]
2169
2170   def BuildHooksEnv(self):
2171     """Build hooks env.
2172
2173     This will run on all nodes before, and on all nodes + the new node after.
2174
2175     """
2176     env = {
2177       "OP_TARGET": self.op.node_name,
2178       "NODE_NAME": self.op.node_name,
2179       "NODE_PIP": self.op.primary_ip,
2180       "NODE_SIP": self.op.secondary_ip,
2181       }
2182     nodes_0 = self.cfg.GetNodeList()
2183     nodes_1 = nodes_0 + [self.op.node_name, ]
2184     return env, nodes_0, nodes_1
2185
2186   def CheckPrereq(self):
2187     """Check prerequisites.
2188
2189     This checks:
2190      - the new node is not already in the config
2191      - it is resolvable
2192      - its parameters (single/dual homed) matches the cluster
2193
2194     Any errors are signalled by raising errors.OpPrereqError.
2195
2196     """
2197     node_name = self.op.node_name
2198     cfg = self.cfg
2199
2200     dns_data = utils.HostInfo(node_name)
2201
2202     node = dns_data.name
2203     primary_ip = self.op.primary_ip = dns_data.ip
2204     secondary_ip = getattr(self.op, "secondary_ip", None)
2205     if secondary_ip is None:
2206       secondary_ip = primary_ip
2207     if not utils.IsValidIP(secondary_ip):
2208       raise errors.OpPrereqError("Invalid secondary IP given")
2209     self.op.secondary_ip = secondary_ip
2210
2211     node_list = cfg.GetNodeList()
2212     if not self.op.readd and node in node_list:
2213       raise errors.OpPrereqError("Node %s is already in the configuration" %
2214                                  node)
2215     elif self.op.readd and node not in node_list:
2216       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2217
2218     for existing_node_name in node_list:
2219       existing_node = cfg.GetNodeInfo(existing_node_name)
2220
2221       if self.op.readd and node == existing_node_name:
2222         if (existing_node.primary_ip != primary_ip or
2223             existing_node.secondary_ip != secondary_ip):
2224           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2225                                      " address configuration as before")
2226         continue
2227
2228       if (existing_node.primary_ip == primary_ip or
2229           existing_node.secondary_ip == primary_ip or
2230           existing_node.primary_ip == secondary_ip or
2231           existing_node.secondary_ip == secondary_ip):
2232         raise errors.OpPrereqError("New node ip address(es) conflict with"
2233                                    " existing node %s" % existing_node.name)
2234
2235     # check that the type of the node (single versus dual homed) is the
2236     # same as for the master
2237     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2238     master_singlehomed = myself.secondary_ip == myself.primary_ip
2239     newbie_singlehomed = secondary_ip == primary_ip
2240     if master_singlehomed != newbie_singlehomed:
2241       if master_singlehomed:
2242         raise errors.OpPrereqError("The master has no private ip but the"
2243                                    " new node has one")
2244       else:
2245         raise errors.OpPrereqError("The master has a private ip but the"
2246                                    " new node doesn't have one")
2247
2248     # checks reachablity
2249     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2250       raise errors.OpPrereqError("Node not reachable by ping")
2251
2252     if not newbie_singlehomed:
2253       # check reachability from my secondary ip to newbie's secondary ip
2254       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2255                            source=myself.secondary_ip):
2256         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2257                                    " based ping to noded port")
2258
2259     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2260     mc_now, _ = self.cfg.GetMasterCandidateStats()
2261     master_candidate = mc_now < cp_size
2262
2263     self.new_node = objects.Node(name=node,
2264                                  primary_ip=primary_ip,
2265                                  secondary_ip=secondary_ip,
2266                                  master_candidate=master_candidate,
2267                                  offline=False, drained=False)
2268
2269   def Exec(self, feedback_fn):
2270     """Adds the new node to the cluster.
2271
2272     """
2273     new_node = self.new_node
2274     node = new_node.name
2275
2276     # check connectivity
2277     result = self.rpc.call_version([node])[node]
2278     result.Raise()
2279     if result.data:
2280       if constants.PROTOCOL_VERSION == result.data:
2281         logging.info("Communication to node %s fine, sw version %s match",
2282                      node, result.data)
2283       else:
2284         raise errors.OpExecError("Version mismatch master version %s,"
2285                                  " node version %s" %
2286                                  (constants.PROTOCOL_VERSION, result.data))
2287     else:
2288       raise errors.OpExecError("Cannot get version from the new node")
2289
2290     # setup ssh on node
2291     logging.info("Copy ssh key to node %s", node)
2292     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2293     keyarray = []
2294     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2295                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2296                 priv_key, pub_key]
2297
2298     for i in keyfiles:
2299       f = open(i, 'r')
2300       try:
2301         keyarray.append(f.read())
2302       finally:
2303         f.close()
2304
2305     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2306                                     keyarray[2],
2307                                     keyarray[3], keyarray[4], keyarray[5])
2308
2309     msg = result.RemoteFailMsg()
2310     if msg:
2311       raise errors.OpExecError("Cannot transfer ssh keys to the"
2312                                " new node: %s" % msg)
2313
2314     # Add node to our /etc/hosts, and add key to known_hosts
2315     if self.cfg.GetClusterInfo().modify_etc_hosts:
2316       utils.AddHostToEtcHosts(new_node.name)
2317
2318     if new_node.secondary_ip != new_node.primary_ip:
2319       result = self.rpc.call_node_has_ip_address(new_node.name,
2320                                                  new_node.secondary_ip)
2321       if result.failed or not result.data:
2322         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2323                                  " you gave (%s). Please fix and re-run this"
2324                                  " command." % new_node.secondary_ip)
2325
2326     node_verify_list = [self.cfg.GetMasterNode()]
2327     node_verify_param = {
2328       'nodelist': [node],
2329       # TODO: do a node-net-test as well?
2330     }
2331
2332     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2333                                        self.cfg.GetClusterName())
2334     for verifier in node_verify_list:
2335       if result[verifier].failed or not result[verifier].data:
2336         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2337                                  " for remote verification" % verifier)
2338       if result[verifier].data['nodelist']:
2339         for failed in result[verifier].data['nodelist']:
2340           feedback_fn("ssh/hostname verification failed %s -> %s" %
2341                       (verifier, result[verifier].data['nodelist'][failed]))
2342         raise errors.OpExecError("ssh/hostname verification failed.")
2343
2344     if self.op.readd:
2345       _RedistributeAncillaryFiles(self)
2346       self.context.ReaddNode(new_node)
2347     else:
2348       _RedistributeAncillaryFiles(self, additional_nodes=[node])
2349       self.context.AddNode(new_node)
2350
2351
2352 class LUSetNodeParams(LogicalUnit):
2353   """Modifies the parameters of a node.
2354
2355   """
2356   HPATH = "node-modify"
2357   HTYPE = constants.HTYPE_NODE
2358   _OP_REQP = ["node_name"]
2359   REQ_BGL = False
2360
2361   def CheckArguments(self):
2362     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2363     if node_name is None:
2364       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2365     self.op.node_name = node_name
2366     _CheckBooleanOpField(self.op, 'master_candidate')
2367     _CheckBooleanOpField(self.op, 'offline')
2368     _CheckBooleanOpField(self.op, 'drained')
2369     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2370     if all_mods.count(None) == 3:
2371       raise errors.OpPrereqError("Please pass at least one modification")
2372     if all_mods.count(True) > 1:
2373       raise errors.OpPrereqError("Can't set the node into more than one"
2374                                  " state at the same time")
2375
2376   def ExpandNames(self):
2377     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2378
2379   def BuildHooksEnv(self):
2380     """Build hooks env.
2381
2382     This runs on the master node.
2383
2384     """
2385     env = {
2386       "OP_TARGET": self.op.node_name,
2387       "MASTER_CANDIDATE": str(self.op.master_candidate),
2388       "OFFLINE": str(self.op.offline),
2389       "DRAINED": str(self.op.drained),
2390       }
2391     nl = [self.cfg.GetMasterNode(),
2392           self.op.node_name]
2393     return env, nl, nl
2394
2395   def CheckPrereq(self):
2396     """Check prerequisites.
2397
2398     This only checks the instance list against the existing names.
2399
2400     """
2401     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2402
2403     if ((self.op.master_candidate == False or self.op.offline == True or
2404          self.op.drained == True) and node.master_candidate):
2405       # we will demote the node from master_candidate
2406       if self.op.node_name == self.cfg.GetMasterNode():
2407         raise errors.OpPrereqError("The master node has to be a"
2408                                    " master candidate, online and not drained")
2409       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2410       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2411       if num_candidates <= cp_size:
2412         msg = ("Not enough master candidates (desired"
2413                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2414         if self.op.force:
2415           self.LogWarning(msg)
2416         else:
2417           raise errors.OpPrereqError(msg)
2418
2419     if (self.op.master_candidate == True and
2420         ((node.offline and not self.op.offline == False) or
2421          (node.drained and not self.op.drained == False))):
2422       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2423                                  " to master_candidate" % node.name)
2424
2425     return
2426
2427   def Exec(self, feedback_fn):
2428     """Modifies a node.
2429
2430     """
2431     node = self.node
2432
2433     result = []
2434     changed_mc = False
2435
2436     if self.op.offline is not None:
2437       node.offline = self.op.offline
2438       result.append(("offline", str(self.op.offline)))
2439       if self.op.offline == True:
2440         if node.master_candidate:
2441           node.master_candidate = False
2442           changed_mc = True
2443           result.append(("master_candidate", "auto-demotion due to offline"))
2444         if node.drained:
2445           node.drained = False
2446           result.append(("drained", "clear drained status due to offline"))
2447
2448     if self.op.master_candidate is not None:
2449       node.master_candidate = self.op.master_candidate
2450       changed_mc = True
2451       result.append(("master_candidate", str(self.op.master_candidate)))
2452       if self.op.master_candidate == False:
2453         rrc = self.rpc.call_node_demote_from_mc(node.name)
2454         msg = rrc.RemoteFailMsg()
2455         if msg:
2456           self.LogWarning("Node failed to demote itself: %s" % msg)
2457
2458     if self.op.drained is not None:
2459       node.drained = self.op.drained
2460       result.append(("drained", str(self.op.drained)))
2461       if self.op.drained == True:
2462         if node.master_candidate:
2463           node.master_candidate = False
2464           changed_mc = True
2465           result.append(("master_candidate", "auto-demotion due to drain"))
2466         if node.offline:
2467           node.offline = False
2468           result.append(("offline", "clear offline status due to drain"))
2469
2470     # this will trigger configuration file update, if needed
2471     self.cfg.Update(node)
2472     # this will trigger job queue propagation or cleanup
2473     if changed_mc:
2474       self.context.ReaddNode(node)
2475
2476     return result
2477
2478
2479 class LUPowercycleNode(NoHooksLU):
2480   """Powercycles a node.
2481
2482   """
2483   _OP_REQP = ["node_name", "force"]
2484   REQ_BGL = False
2485
2486   def CheckArguments(self):
2487     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2488     if node_name is None:
2489       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2490     self.op.node_name = node_name
2491     if node_name == self.cfg.GetMasterNode() and not self.op.force:
2492       raise errors.OpPrereqError("The node is the master and the force"
2493                                  " parameter was not set")
2494
2495   def ExpandNames(self):
2496     """Locking for PowercycleNode.
2497
2498     This is a last-resource option and shouldn't block on other
2499     jobs. Therefore, we grab no locks.
2500
2501     """
2502     self.needed_locks = {}
2503
2504   def CheckPrereq(self):
2505     """Check prerequisites.
2506
2507     This LU has no prereqs.
2508
2509     """
2510     pass
2511
2512   def Exec(self, feedback_fn):
2513     """Reboots a node.
2514
2515     """
2516     result = self.rpc.call_node_powercycle(self.op.node_name,
2517                                            self.cfg.GetHypervisorType())
2518     msg = result.RemoteFailMsg()
2519     if msg:
2520       raise errors.OpExecError("Failed to schedule the reboot: %s" % msg)
2521     return result.payload
2522
2523
2524 class LUQueryClusterInfo(NoHooksLU):
2525   """Query cluster configuration.
2526
2527   """
2528   _OP_REQP = []
2529   REQ_BGL = False
2530
2531   def ExpandNames(self):
2532     self.needed_locks = {}
2533
2534   def CheckPrereq(self):
2535     """No prerequsites needed for this LU.
2536
2537     """
2538     pass
2539
2540   def Exec(self, feedback_fn):
2541     """Return cluster config.
2542
2543     """
2544     cluster = self.cfg.GetClusterInfo()
2545     result = {
2546       "software_version": constants.RELEASE_VERSION,
2547       "protocol_version": constants.PROTOCOL_VERSION,
2548       "config_version": constants.CONFIG_VERSION,
2549       "os_api_version": constants.OS_API_VERSION,
2550       "export_version": constants.EXPORT_VERSION,
2551       "architecture": (platform.architecture()[0], platform.machine()),
2552       "name": cluster.cluster_name,
2553       "master": cluster.master_node,
2554       "default_hypervisor": cluster.default_hypervisor,
2555       "enabled_hypervisors": cluster.enabled_hypervisors,
2556       "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2557                         for hypervisor in cluster.enabled_hypervisors]),
2558       "beparams": cluster.beparams,
2559       "nicparams": cluster.nicparams,
2560       "candidate_pool_size": cluster.candidate_pool_size,
2561       "master_netdev": cluster.master_netdev,
2562       "volume_group_name": cluster.volume_group_name,
2563       "file_storage_dir": cluster.file_storage_dir,
2564       }
2565
2566     return result
2567
2568
2569 class LUQueryConfigValues(NoHooksLU):
2570   """Return configuration values.
2571
2572   """
2573   _OP_REQP = []
2574   REQ_BGL = False
2575   _FIELDS_DYNAMIC = utils.FieldSet()
2576   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2577
2578   def ExpandNames(self):
2579     self.needed_locks = {}
2580
2581     _CheckOutputFields(static=self._FIELDS_STATIC,
2582                        dynamic=self._FIELDS_DYNAMIC,
2583                        selected=self.op.output_fields)
2584
2585   def CheckPrereq(self):
2586     """No prerequisites.
2587
2588     """
2589     pass
2590
2591   def Exec(self, feedback_fn):
2592     """Dump a representation of the cluster config to the standard output.
2593
2594     """
2595     values = []
2596     for field in self.op.output_fields:
2597       if field == "cluster_name":
2598         entry = self.cfg.GetClusterName()
2599       elif field == "master_node":
2600         entry = self.cfg.GetMasterNode()
2601       elif field == "drain_flag":
2602         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2603       else:
2604         raise errors.ParameterError(field)
2605       values.append(entry)
2606     return values
2607
2608
2609 class LUActivateInstanceDisks(NoHooksLU):
2610   """Bring up an instance's disks.
2611
2612   """
2613   _OP_REQP = ["instance_name"]
2614   REQ_BGL = False
2615
2616   def ExpandNames(self):
2617     self._ExpandAndLockInstance()
2618     self.needed_locks[locking.LEVEL_NODE] = []
2619     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2620
2621   def DeclareLocks(self, level):
2622     if level == locking.LEVEL_NODE:
2623       self._LockInstancesNodes()
2624
2625   def CheckPrereq(self):
2626     """Check prerequisites.
2627
2628     This checks that the instance is in the cluster.
2629
2630     """
2631     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2632     assert self.instance is not None, \
2633       "Cannot retrieve locked instance %s" % self.op.instance_name
2634     _CheckNodeOnline(self, self.instance.primary_node)
2635
2636   def Exec(self, feedback_fn):
2637     """Activate the disks.
2638
2639     """
2640     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2641     if not disks_ok:
2642       raise errors.OpExecError("Cannot activate block devices")
2643
2644     return disks_info
2645
2646
2647 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2648   """Prepare the block devices for an instance.
2649
2650   This sets up the block devices on all nodes.
2651
2652   @type lu: L{LogicalUnit}
2653   @param lu: the logical unit on whose behalf we execute
2654   @type instance: L{objects.Instance}
2655   @param instance: the instance for whose disks we assemble
2656   @type ignore_secondaries: boolean
2657   @param ignore_secondaries: if true, errors on secondary nodes
2658       won't result in an error return from the function
2659   @return: False if the operation failed, otherwise a list of
2660       (host, instance_visible_name, node_visible_name)
2661       with the mapping from node devices to instance devices
2662
2663   """
2664   device_info = []
2665   disks_ok = True
2666   iname = instance.name
2667   # With the two passes mechanism we try to reduce the window of
2668   # opportunity for the race condition of switching DRBD to primary
2669   # before handshaking occured, but we do not eliminate it
2670
2671   # The proper fix would be to wait (with some limits) until the
2672   # connection has been made and drbd transitions from WFConnection
2673   # into any other network-connected state (Connected, SyncTarget,
2674   # SyncSource, etc.)
2675
2676   # 1st pass, assemble on all nodes in secondary mode
2677   for inst_disk in instance.disks:
2678     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2679       lu.cfg.SetDiskID(node_disk, node)
2680       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2681       msg = result.RemoteFailMsg()
2682       if msg:
2683         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2684                            " (is_primary=False, pass=1): %s",
2685                            inst_disk.iv_name, node, msg)
2686         if not ignore_secondaries:
2687           disks_ok = False
2688
2689   # FIXME: race condition on drbd migration to primary
2690
2691   # 2nd pass, do only the primary node
2692   for inst_disk in instance.disks:
2693     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2694       if node != instance.primary_node:
2695         continue
2696       lu.cfg.SetDiskID(node_disk, node)
2697       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2698       msg = result.RemoteFailMsg()
2699       if msg:
2700         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2701                            " (is_primary=True, pass=2): %s",
2702                            inst_disk.iv_name, node, msg)
2703         disks_ok = False
2704     device_info.append((instance.primary_node, inst_disk.iv_name,
2705                         result.payload))
2706
2707   # leave the disks configured for the primary node
2708   # this is a workaround that would be fixed better by
2709   # improving the logical/physical id handling
2710   for disk in instance.disks:
2711     lu.cfg.SetDiskID(disk, instance.primary_node)
2712
2713   return disks_ok, device_info
2714
2715
2716 def _StartInstanceDisks(lu, instance, force):
2717   """Start the disks of an instance.
2718
2719   """
2720   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2721                                            ignore_secondaries=force)
2722   if not disks_ok:
2723     _ShutdownInstanceDisks(lu, instance)
2724     if force is not None and not force:
2725       lu.proc.LogWarning("", hint="If the message above refers to a"
2726                          " secondary node,"
2727                          " you can retry the operation using '--force'.")
2728     raise errors.OpExecError("Disk consistency error")
2729
2730
2731 class LUDeactivateInstanceDisks(NoHooksLU):
2732   """Shutdown an instance's disks.
2733
2734   """
2735   _OP_REQP = ["instance_name"]
2736   REQ_BGL = False
2737
2738   def ExpandNames(self):
2739     self._ExpandAndLockInstance()
2740     self.needed_locks[locking.LEVEL_NODE] = []
2741     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2742
2743   def DeclareLocks(self, level):
2744     if level == locking.LEVEL_NODE:
2745       self._LockInstancesNodes()
2746
2747   def CheckPrereq(self):
2748     """Check prerequisites.
2749
2750     This checks that the instance is in the cluster.
2751
2752     """
2753     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2754     assert self.instance is not None, \
2755       "Cannot retrieve locked instance %s" % self.op.instance_name
2756
2757   def Exec(self, feedback_fn):
2758     """Deactivate the disks
2759
2760     """
2761     instance = self.instance
2762     _SafeShutdownInstanceDisks(self, instance)
2763
2764
2765 def _SafeShutdownInstanceDisks(lu, instance):
2766   """Shutdown block devices of an instance.
2767
2768   This function checks if an instance is running, before calling
2769   _ShutdownInstanceDisks.
2770
2771   """
2772   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2773                                       [instance.hypervisor])
2774   ins_l = ins_l[instance.primary_node]
2775   if ins_l.failed or not isinstance(ins_l.data, list):
2776     raise errors.OpExecError("Can't contact node '%s'" %
2777                              instance.primary_node)
2778
2779   if instance.name in ins_l.data:
2780     raise errors.OpExecError("Instance is running, can't shutdown"
2781                              " block devices.")
2782
2783   _ShutdownInstanceDisks(lu, instance)
2784
2785
2786 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2787   """Shutdown block devices of an instance.
2788
2789   This does the shutdown on all nodes of the instance.
2790
2791   If the ignore_primary is false, errors on the primary node are
2792   ignored.
2793
2794   """
2795   all_result = True
2796   for disk in instance.disks:
2797     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2798       lu.cfg.SetDiskID(top_disk, node)
2799       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2800       msg = result.RemoteFailMsg()
2801       if msg:
2802         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2803                       disk.iv_name, node, msg)
2804         if not ignore_primary or node != instance.primary_node:
2805           all_result = False
2806   return all_result
2807
2808
2809 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2810   """Checks if a node has enough free memory.
2811
2812   This function check if a given node has the needed amount of free
2813   memory. In case the node has less memory or we cannot get the
2814   information from the node, this function raise an OpPrereqError
2815   exception.
2816
2817   @type lu: C{LogicalUnit}
2818   @param lu: a logical unit from which we get configuration data
2819   @type node: C{str}
2820   @param node: the node to check
2821   @type reason: C{str}
2822   @param reason: string to use in the error message
2823   @type requested: C{int}
2824   @param requested: the amount of memory in MiB to check for
2825   @type hypervisor_name: C{str}
2826   @param hypervisor_name: the hypervisor to ask for memory stats
2827   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2828       we cannot check the node
2829
2830   """
2831   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2832   nodeinfo[node].Raise()
2833   free_mem = nodeinfo[node].data.get('memory_free')
2834   if not isinstance(free_mem, int):
2835     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2836                              " was '%s'" % (node, free_mem))
2837   if requested > free_mem:
2838     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2839                              " needed %s MiB, available %s MiB" %
2840                              (node, reason, requested, free_mem))
2841
2842
2843 class LUStartupInstance(LogicalUnit):
2844   """Starts an instance.
2845
2846   """
2847   HPATH = "instance-start"
2848   HTYPE = constants.HTYPE_INSTANCE
2849   _OP_REQP = ["instance_name", "force"]
2850   REQ_BGL = False
2851
2852   def ExpandNames(self):
2853     self._ExpandAndLockInstance()
2854
2855   def BuildHooksEnv(self):
2856     """Build hooks env.
2857
2858     This runs on master, primary and secondary nodes of the instance.
2859
2860     """
2861     env = {
2862       "FORCE": self.op.force,
2863       }
2864     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2865     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2866     return env, nl, nl
2867
2868   def CheckPrereq(self):
2869     """Check prerequisites.
2870
2871     This checks that the instance is in the cluster.
2872
2873     """
2874     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2875     assert self.instance is not None, \
2876       "Cannot retrieve locked instance %s" % self.op.instance_name
2877
2878     # extra beparams
2879     self.beparams = getattr(self.op, "beparams", {})
2880     if self.beparams:
2881       if not isinstance(self.beparams, dict):
2882         raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2883                                    " dict" % (type(self.beparams), ))
2884       # fill the beparams dict
2885       utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2886       self.op.beparams = self.beparams
2887
2888     # extra hvparams
2889     self.hvparams = getattr(self.op, "hvparams", {})
2890     if self.hvparams:
2891       if not isinstance(self.hvparams, dict):
2892         raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2893                                    " dict" % (type(self.hvparams), ))
2894
2895       # check hypervisor parameter syntax (locally)
2896       cluster = self.cfg.GetClusterInfo()
2897       utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2898       filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
2899                                     instance.hvparams)
2900       filled_hvp.update(self.hvparams)
2901       hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2902       hv_type.CheckParameterSyntax(filled_hvp)
2903       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2904       self.op.hvparams = self.hvparams
2905
2906     _CheckNodeOnline(self, instance.primary_node)
2907
2908     bep = self.cfg.GetClusterInfo().FillBE(instance)
2909     # check bridges existance
2910     _CheckInstanceBridgesExist(self, instance)
2911
2912     remote_info = self.rpc.call_instance_info(instance.primary_node,
2913                                               instance.name,
2914                                               instance.hypervisor)
2915     remote_info.Raise()
2916     if not remote_info.data:
2917       _CheckNodeFreeMemory(self, instance.primary_node,
2918                            "starting instance %s" % instance.name,
2919                            bep[constants.BE_MEMORY], instance.hypervisor)
2920
2921   def Exec(self, feedback_fn):
2922     """Start the instance.
2923
2924     """
2925     instance = self.instance
2926     force = self.op.force
2927
2928     self.cfg.MarkInstanceUp(instance.name)
2929
2930     node_current = instance.primary_node
2931
2932     _StartInstanceDisks(self, instance, force)
2933
2934     result = self.rpc.call_instance_start(node_current, instance,
2935                                           self.hvparams, self.beparams)
2936     msg = result.RemoteFailMsg()
2937     if msg:
2938       _ShutdownInstanceDisks(self, instance)
2939       raise errors.OpExecError("Could not start instance: %s" % msg)
2940
2941
2942 class LURebootInstance(LogicalUnit):
2943   """Reboot an instance.
2944
2945   """
2946   HPATH = "instance-reboot"
2947   HTYPE = constants.HTYPE_INSTANCE
2948   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2949   REQ_BGL = False
2950
2951   def ExpandNames(self):
2952     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2953                                    constants.INSTANCE_REBOOT_HARD,
2954                                    constants.INSTANCE_REBOOT_FULL]:
2955       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2956                                   (constants.INSTANCE_REBOOT_SOFT,
2957                                    constants.INSTANCE_REBOOT_HARD,
2958                                    constants.INSTANCE_REBOOT_FULL))
2959     self._ExpandAndLockInstance()
2960
2961   def BuildHooksEnv(self):
2962     """Build hooks env.
2963
2964     This runs on master, primary and secondary nodes of the instance.
2965
2966     """
2967     env = {
2968       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2969       "REBOOT_TYPE": self.op.reboot_type,
2970       }
2971     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2972     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2973     return env, nl, nl
2974
2975   def CheckPrereq(self):
2976     """Check prerequisites.
2977
2978     This checks that the instance is in the cluster.
2979
2980     """
2981     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2982     assert self.instance is not None, \
2983       "Cannot retrieve locked instance %s" % self.op.instance_name
2984
2985     _CheckNodeOnline(self, instance.primary_node)
2986
2987     # check bridges existance
2988     _CheckInstanceBridgesExist(self, instance)
2989
2990   def Exec(self, feedback_fn):
2991     """Reboot the instance.
2992
2993     """
2994     instance = self.instance
2995     ignore_secondaries = self.op.ignore_secondaries
2996     reboot_type = self.op.reboot_type
2997
2998     node_current = instance.primary_node
2999
3000     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3001                        constants.INSTANCE_REBOOT_HARD]:
3002       for disk in instance.disks:
3003         self.cfg.SetDiskID(disk, node_current)
3004       result = self.rpc.call_instance_reboot(node_current, instance,
3005                                              reboot_type)
3006       msg = result.RemoteFailMsg()
3007       if msg:
3008         raise errors.OpExecError("Could not reboot instance: %s" % msg)
3009     else:
3010       result = self.rpc.call_instance_shutdown(node_current, instance)
3011       msg = result.RemoteFailMsg()
3012       if msg:
3013         raise errors.OpExecError("Could not shutdown instance for"
3014                                  " full reboot: %s" % msg)
3015       _ShutdownInstanceDisks(self, instance)
3016       _StartInstanceDisks(self, instance, ignore_secondaries)
3017       result = self.rpc.call_instance_start(node_current, instance, None, None)
3018       msg = result.RemoteFailMsg()
3019       if msg:
3020         _ShutdownInstanceDisks(self, instance)
3021         raise errors.OpExecError("Could not start instance for"
3022                                  " full reboot: %s" % msg)
3023
3024     self.cfg.MarkInstanceUp(instance.name)
3025
3026
3027 class LUShutdownInstance(LogicalUnit):
3028   """Shutdown an instance.
3029
3030   """
3031   HPATH = "instance-stop"
3032   HTYPE = constants.HTYPE_INSTANCE
3033   _OP_REQP = ["instance_name"]
3034   REQ_BGL = False
3035
3036   def ExpandNames(self):
3037     self._ExpandAndLockInstance()
3038
3039   def BuildHooksEnv(self):
3040     """Build hooks env.
3041
3042     This runs on master, primary and secondary nodes of the instance.
3043
3044     """
3045     env = _BuildInstanceHookEnvByObject(self, self.instance)
3046     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3047     return env, nl, nl
3048
3049   def CheckPrereq(self):
3050     """Check prerequisites.
3051
3052     This checks that the instance is in the cluster.
3053
3054     """
3055     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3056     assert self.instance is not None, \
3057       "Cannot retrieve locked instance %s" % self.op.instance_name
3058     _CheckNodeOnline(self, self.instance.primary_node)
3059
3060   def Exec(self, feedback_fn):
3061     """Shutdown the instance.
3062
3063     """
3064     instance = self.instance
3065     node_current = instance.primary_node
3066     self.cfg.MarkInstanceDown(instance.name)
3067     result = self.rpc.call_instance_shutdown(node_current, instance)
3068     msg = result.RemoteFailMsg()
3069     if msg:
3070       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3071
3072     _ShutdownInstanceDisks(self, instance)
3073
3074
3075 class LUReinstallInstance(LogicalUnit):
3076   """Reinstall an instance.
3077
3078   """
3079   HPATH = "instance-reinstall"
3080   HTYPE = constants.HTYPE_INSTANCE
3081   _OP_REQP = ["instance_name"]
3082   REQ_BGL = False
3083
3084   def ExpandNames(self):
3085     self._ExpandAndLockInstance()
3086
3087   def BuildHooksEnv(self):
3088     """Build hooks env.
3089
3090     This runs on master, primary and secondary nodes of the instance.
3091
3092     """
3093     env = _BuildInstanceHookEnvByObject(self, self.instance)
3094     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3095     return env, nl, nl
3096
3097   def CheckPrereq(self):
3098     """Check prerequisites.
3099
3100     This checks that the instance is in the cluster and is not running.
3101
3102     """
3103     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3104     assert instance is not None, \
3105       "Cannot retrieve locked instance %s" % self.op.instance_name
3106     _CheckNodeOnline(self, instance.primary_node)
3107
3108     if instance.disk_template == constants.DT_DISKLESS:
3109       raise errors.OpPrereqError("Instance '%s' has no disks" %
3110                                  self.op.instance_name)
3111     if instance.admin_up:
3112       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3113                                  self.op.instance_name)
3114     remote_info = self.rpc.call_instance_info(instance.primary_node,
3115                                               instance.name,
3116                                               instance.hypervisor)
3117     remote_info.Raise()
3118     if remote_info.data:
3119       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3120                                  (self.op.instance_name,
3121                                   instance.primary_node))
3122
3123     self.op.os_type = getattr(self.op, "os_type", None)
3124     if self.op.os_type is not None:
3125       # OS verification
3126       pnode = self.cfg.GetNodeInfo(
3127         self.cfg.ExpandNodeName(instance.primary_node))
3128       if pnode is None:
3129         raise errors.OpPrereqError("Primary node '%s' is unknown" %
3130                                    self.op.pnode)
3131       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3132       result.Raise()
3133       if not isinstance(result.data, objects.OS):
3134         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3135                                    " primary node"  % self.op.os_type)
3136
3137     self.instance = instance
3138
3139   def Exec(self, feedback_fn):
3140     """Reinstall the instance.
3141
3142     """
3143     inst = self.instance
3144
3145     if self.op.os_type is not None:
3146       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3147       inst.os = self.op.os_type
3148       self.cfg.Update(inst)
3149
3150     _StartInstanceDisks(self, inst, None)
3151     try:
3152       feedback_fn("Running the instance OS create scripts...")
3153       result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3154       msg = result.RemoteFailMsg()
3155       if msg:
3156         raise errors.OpExecError("Could not install OS for instance %s"
3157                                  " on node %s: %s" %
3158                                  (inst.name, inst.primary_node, msg))
3159     finally:
3160       _ShutdownInstanceDisks(self, inst)
3161
3162
3163 class LURenameInstance(LogicalUnit):
3164   """Rename an instance.
3165
3166   """
3167   HPATH = "instance-rename"
3168   HTYPE = constants.HTYPE_INSTANCE
3169   _OP_REQP = ["instance_name", "new_name"]
3170
3171   def BuildHooksEnv(self):
3172     """Build hooks env.
3173
3174     This runs on master, primary and secondary nodes of the instance.
3175
3176     """
3177     env = _BuildInstanceHookEnvByObject(self, self.instance)
3178     env["INSTANCE_NEW_NAME"] = self.op.new_name
3179     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3180     return env, nl, nl
3181
3182   def CheckPrereq(self):
3183     """Check prerequisites.
3184
3185     This checks that the instance is in the cluster and is not running.
3186
3187     """
3188     instance = self.cfg.GetInstanceInfo(
3189       self.cfg.ExpandInstanceName(self.op.instance_name))
3190     if instance is None:
3191       raise errors.OpPrereqError("Instance '%s' not known" %
3192                                  self.op.instance_name)
3193     _CheckNodeOnline(self, instance.primary_node)
3194
3195     if instance.admin_up:
3196       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3197                                  self.op.instance_name)
3198     remote_info = self.rpc.call_instance_info(instance.primary_node,
3199                                               instance.name,
3200                                               instance.hypervisor)
3201     remote_info.Raise()
3202     if remote_info.data:
3203       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3204                                  (self.op.instance_name,
3205                                   instance.primary_node))
3206     self.instance = instance
3207
3208     # new name verification
3209     name_info = utils.HostInfo(self.op.new_name)
3210
3211     self.op.new_name = new_name = name_info.name
3212     instance_list = self.cfg.GetInstanceList()
3213     if new_name in instance_list:
3214       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3215                                  new_name)
3216
3217     if not getattr(self.op, "ignore_ip", False):
3218       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3219         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3220                                    (name_info.ip, new_name))
3221
3222
3223   def Exec(self, feedback_fn):
3224     """Reinstall the instance.
3225
3226     """
3227     inst = self.instance
3228     old_name = inst.name
3229
3230     if inst.disk_template == constants.DT_FILE:
3231       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3232
3233     self.cfg.RenameInstance(inst.name, self.op.new_name)
3234     # Change the instance lock. This is definitely safe while we hold the BGL
3235     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3236     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3237
3238     # re-read the instance from the configuration after rename
3239     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3240
3241     if inst.disk_template == constants.DT_FILE:
3242       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3243       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3244                                                      old_file_storage_dir,
3245                                                      new_file_storage_dir)
3246       result.Raise()
3247       if not result.data:
3248         raise errors.OpExecError("Could not connect to node '%s' to rename"
3249                                  " directory '%s' to '%s' (but the instance"
3250                                  " has been renamed in Ganeti)" % (
3251                                  inst.primary_node, old_file_storage_dir,
3252                                  new_file_storage_dir))
3253
3254       if not result.data[0]:
3255         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3256                                  " (but the instance has been renamed in"
3257                                  " Ganeti)" % (old_file_storage_dir,
3258                                                new_file_storage_dir))
3259
3260     _StartInstanceDisks(self, inst, None)
3261     try:
3262       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3263                                                  old_name)
3264       msg = result.RemoteFailMsg()
3265       if msg:
3266         msg = ("Could not run OS rename script for instance %s on node %s"
3267                " (but the instance has been renamed in Ganeti): %s" %
3268                (inst.name, inst.primary_node, msg))
3269         self.proc.LogWarning(msg)
3270     finally:
3271       _ShutdownInstanceDisks(self, inst)
3272
3273
3274 class LURemoveInstance(LogicalUnit):
3275   """Remove an instance.
3276
3277   """
3278   HPATH = "instance-remove"
3279   HTYPE = constants.HTYPE_INSTANCE
3280   _OP_REQP = ["instance_name", "ignore_failures"]
3281   REQ_BGL = False
3282
3283   def ExpandNames(self):
3284     self._ExpandAndLockInstance()
3285     self.needed_locks[locking.LEVEL_NODE] = []
3286     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3287
3288   def DeclareLocks(self, level):
3289     if level == locking.LEVEL_NODE:
3290       self._LockInstancesNodes()
3291
3292   def BuildHooksEnv(self):
3293     """Build hooks env.
3294
3295     This runs on master, primary and secondary nodes of the instance.
3296
3297     """
3298     env = _BuildInstanceHookEnvByObject(self, self.instance)
3299     nl = [self.cfg.GetMasterNode()]
3300     return env, nl, nl
3301
3302   def CheckPrereq(self):
3303     """Check prerequisites.
3304
3305     This checks that the instance is in the cluster.
3306
3307     """
3308     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3309     assert self.instance is not None, \
3310       "Cannot retrieve locked instance %s" % self.op.instance_name
3311
3312   def Exec(self, feedback_fn):
3313     """Remove the instance.
3314
3315     """
3316     instance = self.instance
3317     logging.info("Shutting down instance %s on node %s",
3318                  instance.name, instance.primary_node)
3319
3320     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3321     msg = result.RemoteFailMsg()
3322     if msg:
3323       if self.op.ignore_failures:
3324         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3325       else:
3326         raise errors.OpExecError("Could not shutdown instance %s on"
3327                                  " node %s: %s" %
3328                                  (instance.name, instance.primary_node, msg))
3329
3330     logging.info("Removing block devices for instance %s", instance.name)
3331
3332     if not _RemoveDisks(self, instance):
3333       if self.op.ignore_failures:
3334         feedback_fn("Warning: can't remove instance's disks")
3335       else:
3336         raise errors.OpExecError("Can't remove instance's disks")
3337
3338     logging.info("Removing instance %s out of cluster config", instance.name)
3339
3340     self.cfg.RemoveInstance(instance.name)
3341     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3342
3343
3344 class LUQueryInstances(NoHooksLU):
3345   """Logical unit for querying instances.
3346
3347   """
3348   _OP_REQP = ["output_fields", "names", "use_locking"]
3349   REQ_BGL = False
3350   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3351                                     "admin_state",
3352                                     "disk_template", "ip", "mac", "bridge",
3353                                     "sda_size", "sdb_size", "vcpus", "tags",
3354                                     "network_port", "beparams",
3355                                     r"(disk)\.(size)/([0-9]+)",
3356                                     r"(disk)\.(sizes)", "disk_usage",
3357                                     r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3358                                     r"(nic)\.(macs|ips|bridges)",
3359                                     r"(disk|nic)\.(count)",
3360                                     "serial_no", "hypervisor", "hvparams",] +
3361                                   ["hv/%s" % name
3362                                    for name in constants.HVS_PARAMETERS] +
3363                                   ["be/%s" % name
3364                                    for name in constants.BES_PARAMETERS])
3365   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3366
3367
3368   def ExpandNames(self):
3369     _CheckOutputFields(static=self._FIELDS_STATIC,
3370                        dynamic=self._FIELDS_DYNAMIC,
3371                        selected=self.op.output_fields)
3372
3373     self.needed_locks = {}
3374     self.share_locks[locking.LEVEL_INSTANCE] = 1
3375     self.share_locks[locking.LEVEL_NODE] = 1
3376
3377     if self.op.names:
3378       self.wanted = _GetWantedInstances(self, self.op.names)
3379     else:
3380       self.wanted = locking.ALL_SET
3381
3382     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3383     self.do_locking = self.do_node_query and self.op.use_locking
3384     if self.do_locking:
3385       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3386       self.needed_locks[locking.LEVEL_NODE] = []
3387       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3388
3389   def DeclareLocks(self, level):
3390     if level == locking.LEVEL_NODE and self.do_locking:
3391       self._LockInstancesNodes()
3392
3393   def CheckPrereq(self):
3394     """Check prerequisites.
3395
3396     """
3397     pass
3398
3399   def Exec(self, feedback_fn):
3400     """Computes the list of nodes and their attributes.
3401
3402     """
3403     all_info = self.cfg.GetAllInstancesInfo()
3404     if self.wanted == locking.ALL_SET:
3405       # caller didn't specify instance names, so ordering is not important
3406       if self.do_locking:
3407         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3408       else:
3409         instance_names = all_info.keys()
3410       instance_names = utils.NiceSort(instance_names)
3411     else:
3412       # caller did specify names, so we must keep the ordering
3413       if self.do_locking:
3414         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3415       else:
3416         tgt_set = all_info.keys()
3417       missing = set(self.wanted).difference(tgt_set)
3418       if missing:
3419         raise errors.OpExecError("Some instances were removed before"
3420                                  " retrieving their data: %s" % missing)
3421       instance_names = self.wanted
3422
3423     instance_list = [all_info[iname] for iname in instance_names]
3424
3425     # begin data gathering
3426
3427     nodes = frozenset([inst.primary_node for inst in instance_list])
3428     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3429
3430     bad_nodes = []
3431     off_nodes = []
3432     if self.do_node_query:
3433       live_data = {}
3434       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3435       for name in nodes:
3436         result = node_data[name]
3437         if result.offline:
3438           # offline nodes will be in both lists
3439           off_nodes.append(name)
3440         if result.failed:
3441           bad_nodes.append(name)
3442         else:
3443           if result.data:
3444             live_data.update(result.data)
3445             # else no instance is alive
3446     else:
3447       live_data = dict([(name, {}) for name in instance_names])
3448
3449     # end data gathering
3450
3451     HVPREFIX = "hv/"
3452     BEPREFIX = "be/"
3453     output = []
3454     for instance in instance_list:
3455       iout = []
3456       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3457       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3458       for field in self.op.output_fields:
3459         st_match = self._FIELDS_STATIC.Matches(field)
3460         if field == "name":
3461           val = instance.name
3462         elif field == "os":
3463           val = instance.os
3464         elif field == "pnode":
3465           val = instance.primary_node
3466         elif field == "snodes":
3467           val = list(instance.secondary_nodes)
3468         elif field == "admin_state":
3469           val = instance.admin_up
3470         elif field == "oper_state":
3471           if instance.primary_node in bad_nodes:
3472             val = None
3473           else:
3474             val = bool(live_data.get(instance.name))
3475         elif field == "status":
3476           if instance.primary_node in off_nodes:
3477             val = "ERROR_nodeoffline"
3478           elif instance.primary_node in bad_nodes:
3479             val = "ERROR_nodedown"
3480           else:
3481             running = bool(live_data.get(instance.name))
3482             if running:
3483               if instance.admin_up:
3484                 val = "running"
3485               else:
3486                 val = "ERROR_up"
3487             else:
3488               if instance.admin_up:
3489                 val = "ERROR_down"
3490               else:
3491                 val = "ADMIN_down"
3492         elif field == "oper_ram":
3493           if instance.primary_node in bad_nodes:
3494             val = None
3495           elif instance.name in live_data:
3496             val = live_data[instance.name].get("memory", "?")
3497           else:
3498             val = "-"
3499         elif field == "disk_template":
3500           val = instance.disk_template
3501         elif field == "ip":
3502           val = instance.nics[0].ip
3503         elif field == "bridge":
3504           val = instance.nics[0].bridge
3505         elif field == "mac":
3506           val = instance.nics[0].mac
3507         elif field == "sda_size" or field == "sdb_size":
3508           idx = ord(field[2]) - ord('a')
3509           try:
3510             val = instance.FindDisk(idx).size
3511           except errors.OpPrereqError:
3512             val = None
3513         elif field == "disk_usage": # total disk usage per node
3514           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3515           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3516         elif field == "tags":
3517           val = list(instance.GetTags())
3518         elif field == "serial_no":
3519           val = instance.serial_no
3520         elif field == "network_port":
3521           val = instance.network_port
3522         elif field == "hypervisor":
3523           val = instance.hypervisor
3524         elif field == "hvparams":
3525           val = i_hv
3526         elif (field.startswith(HVPREFIX) and
3527               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3528           val = i_hv.get(field[len(HVPREFIX):], None)
3529         elif field == "beparams":
3530           val = i_be
3531         elif (field.startswith(BEPREFIX) and
3532               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3533           val = i_be.get(field[len(BEPREFIX):], None)
3534         elif st_match and st_match.groups():
3535           # matches a variable list
3536           st_groups = st_match.groups()
3537           if st_groups and st_groups[0] == "disk":
3538             if st_groups[1] == "count":
3539               val = len(instance.disks)
3540             elif st_groups[1] == "sizes":
3541               val = [disk.size for disk in instance.disks]
3542             elif st_groups[1] == "size":
3543               try:
3544                 val = instance.FindDisk(st_groups[2]).size
3545               except errors.OpPrereqError:
3546                 val = None
3547             else:
3548               assert False, "Unhandled disk parameter"
3549           elif st_groups[0] == "nic":
3550             if st_groups[1] == "count":
3551               val = len(instance.nics)
3552             elif st_groups[1] == "macs":
3553               val = [nic.mac for nic in instance.nics]
3554             elif st_groups[1] == "ips":
3555               val = [nic.ip for nic in instance.nics]
3556             elif st_groups[1] == "bridges":
3557               val = [nic.bridge for nic in instance.nics]
3558             else:
3559               # index-based item
3560               nic_idx = int(st_groups[2])
3561               if nic_idx >= len(instance.nics):
3562                 val = None
3563               else:
3564                 if st_groups[1] == "mac":
3565                   val = instance.nics[nic_idx].mac
3566                 elif st_groups[1] == "ip":
3567                   val = instance.nics[nic_idx].ip
3568                 elif st_groups[1] == "bridge":
3569                   val = instance.nics[nic_idx].bridge
3570                 else:
3571                   assert False, "Unhandled NIC parameter"
3572           else:
3573             assert False, "Unhandled variable parameter"
3574         else:
3575           raise errors.ParameterError(field)
3576         iout.append(val)
3577       output.append(iout)
3578
3579     return output
3580
3581
3582 class LUFailoverInstance(LogicalUnit):
3583   """Failover an instance.
3584
3585   """
3586   HPATH = "instance-failover"
3587   HTYPE = constants.HTYPE_INSTANCE
3588   _OP_REQP = ["instance_name", "ignore_consistency"]
3589   REQ_BGL = False
3590
3591   def ExpandNames(self):
3592     self._ExpandAndLockInstance()
3593     self.needed_locks[locking.LEVEL_NODE] = []
3594     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3595
3596   def DeclareLocks(self, level):
3597     if level == locking.LEVEL_NODE:
3598       self._LockInstancesNodes()
3599
3600   def BuildHooksEnv(self):
3601     """Build hooks env.
3602
3603     This runs on master, primary and secondary nodes of the instance.
3604
3605     """
3606     env = {
3607       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3608       }
3609     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3610     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3611     return env, nl, nl
3612
3613   def CheckPrereq(self):
3614     """Check prerequisites.
3615
3616     This checks that the instance is in the cluster.
3617
3618     """
3619     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3620     assert self.instance is not None, \
3621       "Cannot retrieve locked instance %s" % self.op.instance_name
3622
3623     bep = self.cfg.GetClusterInfo().FillBE(instance)
3624     if instance.disk_template not in constants.DTS_NET_MIRROR:
3625       raise errors.OpPrereqError("Instance's disk layout is not"
3626                                  " network mirrored, cannot failover.")
3627
3628     secondary_nodes = instance.secondary_nodes
3629     if not secondary_nodes:
3630       raise errors.ProgrammerError("no secondary node but using "
3631                                    "a mirrored disk template")
3632
3633     target_node = secondary_nodes[0]
3634     _CheckNodeOnline(self, target_node)
3635     _CheckNodeNotDrained(self, target_node)
3636     # check memory requirements on the secondary node
3637     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3638                          instance.name, bep[constants.BE_MEMORY],
3639                          instance.hypervisor)
3640     # check bridge existance
3641     _CheckInstanceBridgesExist(self, instance, node=target_node)
3642
3643   def Exec(self, feedback_fn):
3644     """Failover an instance.
3645
3646     The failover is done by shutting it down on its present node and
3647     starting it on the secondary.
3648
3649     """
3650     instance = self.instance
3651
3652     source_node = instance.primary_node
3653     target_node = instance.secondary_nodes[0]
3654
3655     feedback_fn("* checking disk consistency between source and target")
3656     for dev in instance.disks:
3657       # for drbd, these are drbd over lvm
3658       if not _CheckDiskConsistency(self, dev, target_node, False):
3659         if instance.admin_up and not self.op.ignore_consistency:
3660           raise errors.OpExecError("Disk %s is degraded on target node,"
3661                                    " aborting failover." % dev.iv_name)
3662
3663     feedback_fn("* shutting down instance on source node")
3664     logging.info("Shutting down instance %s on node %s",
3665                  instance.name, source_node)
3666
3667     result = self.rpc.call_instance_shutdown(source_node, instance)
3668     msg = result.RemoteFailMsg()
3669     if msg:
3670       if self.op.ignore_consistency:
3671         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3672                              " Proceeding anyway. Please make sure node"
3673                              " %s is down. Error details: %s",
3674                              instance.name, source_node, source_node, msg)
3675       else:
3676         raise errors.OpExecError("Could not shutdown instance %s on"
3677                                  " node %s: %s" %
3678                                  (instance.name, source_node, msg))
3679
3680     feedback_fn("* deactivating the instance's disks on source node")
3681     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3682       raise errors.OpExecError("Can't shut down the instance's disks.")
3683
3684     instance.primary_node = target_node
3685     # distribute new instance config to the other nodes
3686     self.cfg.Update(instance)
3687
3688     # Only start the instance if it's marked as up
3689     if instance.admin_up:
3690       feedback_fn("* activating the instance's disks on target node")
3691       logging.info("Starting instance %s on node %s",
3692                    instance.name, target_node)
3693
3694       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3695                                                ignore_secondaries=True)
3696       if not disks_ok:
3697         _ShutdownInstanceDisks(self, instance)
3698         raise errors.OpExecError("Can't activate the instance's disks")
3699
3700       feedback_fn("* starting the instance on the target node")
3701       result = self.rpc.call_instance_start(target_node, instance, None, None)
3702       msg = result.RemoteFailMsg()
3703       if msg:
3704         _ShutdownInstanceDisks(self, instance)
3705         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3706                                  (instance.name, target_node, msg))
3707
3708
3709 class LUMigrateInstance(LogicalUnit):
3710   """Migrate an instance.
3711
3712   This is migration without shutting down, compared to the failover,
3713   which is done with shutdown.
3714
3715   """
3716   HPATH = "instance-migrate"
3717   HTYPE = constants.HTYPE_INSTANCE
3718   _OP_REQP = ["instance_name", "live", "cleanup"]
3719
3720   REQ_BGL = False
3721
3722   def ExpandNames(self):
3723     self._ExpandAndLockInstance()
3724     self.needed_locks[locking.LEVEL_NODE] = []
3725     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3726
3727   def DeclareLocks(self, level):
3728     if level == locking.LEVEL_NODE:
3729       self._LockInstancesNodes()
3730
3731   def BuildHooksEnv(self):
3732     """Build hooks env.
3733
3734     This runs on master, primary and secondary nodes of the instance.
3735
3736     """
3737     env = _BuildInstanceHookEnvByObject(self, self.instance)
3738     env["MIGRATE_LIVE"] = self.op.live
3739     env["MIGRATE_CLEANUP"] = self.op.cleanup
3740     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3741     return env, nl, nl
3742
3743   def CheckPrereq(self):
3744     """Check prerequisites.
3745
3746     This checks that the instance is in the cluster.
3747
3748     """
3749     instance = self.cfg.GetInstanceInfo(
3750       self.cfg.ExpandInstanceName(self.op.instance_name))
3751     if instance is None:
3752       raise errors.OpPrereqError("Instance '%s' not known" %
3753                                  self.op.instance_name)
3754
3755     if instance.disk_template != constants.DT_DRBD8:
3756       raise errors.OpPrereqError("Instance's disk layout is not"
3757                                  " drbd8, cannot migrate.")
3758
3759     secondary_nodes = instance.secondary_nodes
3760     if not secondary_nodes:
3761       raise errors.ConfigurationError("No secondary node but using"
3762                                       " drbd8 disk template")
3763
3764     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3765
3766     target_node = secondary_nodes[0]
3767     # check memory requirements on the secondary node
3768     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3769                          instance.name, i_be[constants.BE_MEMORY],
3770                          instance.hypervisor)
3771
3772     # check bridge existance
3773     _CheckInstanceBridgesExist(self, instance, node=target_node)
3774
3775     if not self.op.cleanup:
3776       _CheckNodeNotDrained(self, target_node)
3777       result = self.rpc.call_instance_migratable(instance.primary_node,
3778                                                  instance)
3779       msg = result.RemoteFailMsg()
3780       if msg:
3781         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3782                                    msg)
3783
3784     self.instance = instance
3785
3786   def _WaitUntilSync(self):
3787     """Poll with custom rpc for disk sync.
3788
3789     This uses our own step-based rpc call.
3790
3791     """
3792     self.feedback_fn("* wait until resync is done")
3793     all_done = False
3794     while not all_done:
3795       all_done = True
3796       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3797                                             self.nodes_ip,
3798                                             self.instance.disks)
3799       min_percent = 100
3800       for node, nres in result.items():
3801         msg = nres.RemoteFailMsg()
3802         if msg:
3803           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3804                                    (node, msg))
3805         node_done, node_percent = nres.payload
3806         all_done = all_done and node_done
3807         if node_percent is not None:
3808           min_percent = min(min_percent, node_percent)
3809       if not all_done:
3810         if min_percent < 100:
3811           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3812         time.sleep(2)
3813
3814   def _EnsureSecondary(self, node):
3815     """Demote a node to secondary.
3816
3817     """
3818     self.feedback_fn("* switching node %s to secondary mode" % node)
3819
3820     for dev in self.instance.disks:
3821       self.cfg.SetDiskID(dev, node)
3822
3823     result = self.rpc.call_blockdev_close(node, self.instance.name,
3824                                           self.instance.disks)
3825     msg = result.RemoteFailMsg()
3826     if msg:
3827       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3828                                " error %s" % (node, msg))
3829
3830   def _GoStandalone(self):
3831     """Disconnect from the network.
3832
3833     """
3834     self.feedback_fn("* changing into standalone mode")
3835     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3836                                                self.instance.disks)
3837     for node, nres in result.items():
3838       msg = nres.RemoteFailMsg()
3839       if msg:
3840         raise errors.OpExecError("Cannot disconnect disks node %s,"
3841                                  " error %s" % (node, msg))
3842
3843   def _GoReconnect(self, multimaster):
3844     """Reconnect to the network.
3845
3846     """
3847     if multimaster:
3848       msg = "dual-master"
3849     else:
3850       msg = "single-master"
3851     self.feedback_fn("* changing disks into %s mode" % msg)
3852     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3853                                            self.instance.disks,
3854                                            self.instance.name, multimaster)
3855     for node, nres in result.items():
3856       msg = nres.RemoteFailMsg()
3857       if msg:
3858         raise errors.OpExecError("Cannot change disks config on node %s,"
3859                                  " error: %s" % (node, msg))
3860
3861   def _ExecCleanup(self):
3862     """Try to cleanup after a failed migration.
3863
3864     The cleanup is done by:
3865       - check that the instance is running only on one node
3866         (and update the config if needed)
3867       - change disks on its secondary node to secondary
3868       - wait until disks are fully synchronized
3869       - disconnect from the network
3870       - change disks into single-master mode
3871       - wait again until disks are fully synchronized
3872
3873     """
3874     instance = self.instance
3875     target_node = self.target_node
3876     source_node = self.source_node
3877
3878     # check running on only one node
3879     self.feedback_fn("* checking where the instance actually runs"
3880                      " (if this hangs, the hypervisor might be in"
3881                      " a bad state)")
3882     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3883     for node, result in ins_l.items():
3884       result.Raise()
3885       if not isinstance(result.data, list):
3886         raise errors.OpExecError("Can't contact node '%s'" % node)
3887
3888     runningon_source = instance.name in ins_l[source_node].data
3889     runningon_target = instance.name in ins_l[target_node].data
3890
3891     if runningon_source and runningon_target:
3892       raise errors.OpExecError("Instance seems to be running on two nodes,"
3893                                " or the hypervisor is confused. You will have"
3894                                " to ensure manually that it runs only on one"
3895                                " and restart this operation.")
3896
3897     if not (runningon_source or runningon_target):
3898       raise errors.OpExecError("Instance does not seem to be running at all."
3899                                " In this case, it's safer to repair by"
3900                                " running 'gnt-instance stop' to ensure disk"
3901                                " shutdown, and then restarting it.")
3902
3903     if runningon_target:
3904       # the migration has actually succeeded, we need to update the config
3905       self.feedback_fn("* instance running on secondary node (%s),"
3906                        " updating config" % target_node)
3907       instance.primary_node = target_node
3908       self.cfg.Update(instance)
3909       demoted_node = source_node
3910     else:
3911       self.feedback_fn("* instance confirmed to be running on its"
3912                        " primary node (%s)" % source_node)
3913       demoted_node = target_node
3914
3915     self._EnsureSecondary(demoted_node)
3916     try:
3917       self._WaitUntilSync()
3918     except errors.OpExecError:
3919       # we ignore here errors, since if the device is standalone, it
3920       # won't be able to sync
3921       pass
3922     self._GoStandalone()
3923     self._GoReconnect(False)
3924     self._WaitUntilSync()
3925
3926     self.feedback_fn("* done")
3927
3928   def _RevertDiskStatus(self):
3929     """Try to revert the disk status after a failed migration.
3930
3931     """
3932     target_node = self.target_node
3933     try:
3934       self._EnsureSecondary(target_node)
3935       self._GoStandalone()
3936       self._GoReconnect(False)
3937       self._WaitUntilSync()
3938     except errors.OpExecError, err:
3939       self.LogWarning("Migration failed and I can't reconnect the"
3940                       " drives: error '%s'\n"
3941                       "Please look and recover the instance status" %
3942                       str(err))
3943
3944   def _AbortMigration(self):
3945     """Call the hypervisor code to abort a started migration.
3946
3947     """
3948     instance = self.instance
3949     target_node = self.target_node
3950     migration_info = self.migration_info
3951
3952     abort_result = self.rpc.call_finalize_migration(target_node,
3953                                                     instance,
3954                                                     migration_info,
3955                                                     False)
3956     abort_msg = abort_result.RemoteFailMsg()
3957     if abort_msg:
3958       logging.error("Aborting migration failed on target node %s: %s" %
3959                     (target_node, abort_msg))
3960       # Don't raise an exception here, as we stil have to try to revert the
3961       # disk status, even if this step failed.
3962
3963   def _ExecMigration(self):
3964     """Migrate an instance.
3965
3966     The migrate is done by:
3967       - change the disks into dual-master mode
3968       - wait until disks are fully synchronized again
3969       - migrate the instance
3970       - change disks on the new secondary node (the old primary) to secondary
3971       - wait until disks are fully synchronized
3972       - change disks into single-master mode
3973
3974     """
3975     instance = self.instance
3976     target_node = self.target_node
3977     source_node = self.source_node
3978
3979     self.feedback_fn("* checking disk consistency between source and target")
3980     for dev in instance.disks:
3981       if not _CheckDiskConsistency(self, dev, target_node, False):
3982         raise errors.OpExecError("Disk %s is degraded or not fully"
3983                                  " synchronized on target node,"
3984                                  " aborting migrate." % dev.iv_name)
3985
3986     # First get the migration information from the remote node
3987     result = self.rpc.call_migration_info(source_node, instance)
3988     msg = result.RemoteFailMsg()
3989     if msg:
3990       log_err = ("Failed fetching source migration information from %s: %s" %
3991                  (source_node, msg))
3992       logging.error(log_err)
3993       raise errors.OpExecError(log_err)
3994
3995     self.migration_info = migration_info = result.payload
3996
3997     # Then switch the disks to master/master mode
3998     self._EnsureSecondary(target_node)
3999     self._GoStandalone()
4000     self._GoReconnect(True)
4001     self._WaitUntilSync()
4002
4003     self.feedback_fn("* preparing %s to accept the instance" % target_node)
4004     result = self.rpc.call_accept_instance(target_node,
4005                                            instance,
4006                                            migration_info,
4007                                            self.nodes_ip[target_node])
4008
4009     msg = result.RemoteFailMsg()
4010     if msg:
4011       logging.error("Instance pre-migration failed, trying to revert"
4012                     " disk status: %s", msg)
4013       self._AbortMigration()
4014       self._RevertDiskStatus()
4015       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4016                                (instance.name, msg))
4017
4018     self.feedback_fn("* migrating instance to %s" % target_node)
4019     time.sleep(10)
4020     result = self.rpc.call_instance_migrate(source_node, instance,
4021                                             self.nodes_ip[target_node],
4022                                             self.op.live)
4023     msg = result.RemoteFailMsg()
4024     if msg:
4025       logging.error("Instance migration failed, trying to revert"
4026                     " disk status: %s", msg)
4027       self._AbortMigration()
4028       self._RevertDiskStatus()
4029       raise errors.OpExecError("Could not migrate instance %s: %s" %
4030                                (instance.name, msg))
4031     time.sleep(10)
4032
4033     instance.primary_node = target_node
4034     # distribute new instance config to the other nodes
4035     self.cfg.Update(instance)
4036
4037     result = self.rpc.call_finalize_migration(target_node,
4038                                               instance,
4039                                               migration_info,
4040                                               True)
4041     msg = result.RemoteFailMsg()
4042     if msg:
4043       logging.error("Instance migration succeeded, but finalization failed:"
4044                     " %s" % msg)
4045       raise errors.OpExecError("Could not finalize instance migration: %s" %
4046                                msg)
4047
4048     self._EnsureSecondary(source_node)
4049     self._WaitUntilSync()
4050     self._GoStandalone()
4051     self._GoReconnect(False)
4052     self._WaitUntilSync()
4053
4054     self.feedback_fn("* done")
4055
4056   def Exec(self, feedback_fn):
4057     """Perform the migration.
4058
4059     """
4060     self.feedback_fn = feedback_fn
4061
4062     self.source_node = self.instance.primary_node
4063     self.target_node = self.instance.secondary_nodes[0]
4064     self.all_nodes = [self.source_node, self.target_node]
4065     self.nodes_ip = {
4066       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4067       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4068       }
4069     if self.op.cleanup:
4070       return self._ExecCleanup()
4071     else:
4072       return self._ExecMigration()
4073
4074
4075 def _CreateBlockDev(lu, node, instance, device, force_create,
4076                     info, force_open):
4077   """Create a tree of block devices on a given node.
4078
4079   If this device type has to be created on secondaries, create it and
4080   all its children.
4081
4082   If not, just recurse to children keeping the same 'force' value.
4083
4084   @param lu: the lu on whose behalf we execute
4085   @param node: the node on which to create the device
4086   @type instance: L{objects.Instance}
4087   @param instance: the instance which owns the device
4088   @type device: L{objects.Disk}
4089   @param device: the device to create
4090   @type force_create: boolean
4091   @param force_create: whether to force creation of this device; this
4092       will be change to True whenever we find a device which has
4093       CreateOnSecondary() attribute
4094   @param info: the extra 'metadata' we should attach to the device
4095       (this will be represented as a LVM tag)
4096   @type force_open: boolean
4097   @param force_open: this parameter will be passes to the
4098       L{backend.BlockdevCreate} function where it specifies
4099       whether we run on primary or not, and it affects both
4100       the child assembly and the device own Open() execution
4101
4102   """
4103   if device.CreateOnSecondary():
4104     force_create = True
4105
4106   if device.children:
4107     for child in device.children:
4108       _CreateBlockDev(lu, node, instance, child, force_create,
4109                       info, force_open)
4110
4111   if not force_create:
4112     return
4113
4114   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4115
4116
4117 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4118   """Create a single block device on a given node.
4119
4120   This will not recurse over children of the device, so they must be
4121   created in advance.
4122
4123   @param lu: the lu on whose behalf we execute
4124   @param node: the node on which to create the device
4125   @type instance: L{objects.Instance}
4126   @param instance: the instance which owns the device
4127   @type device: L{objects.Disk}
4128   @param device: the device to create
4129   @param info: the extra 'metadata' we should attach to the device
4130       (this will be represented as a LVM tag)
4131   @type force_open: boolean
4132   @param force_open: this parameter will be passes to the
4133       L{backend.BlockdevCreate} function where it specifies
4134       whether we run on primary or not, and it affects both
4135       the child assembly and the device own Open() execution
4136
4137   """
4138   lu.cfg.SetDiskID(device, node)
4139   result = lu.rpc.call_blockdev_create(node, device, device.size,
4140                                        instance.name, force_open, info)
4141   msg = result.RemoteFailMsg()
4142   if msg:
4143     raise errors.OpExecError("Can't create block device %s on"
4144                              " node %s for instance %s: %s" %
4145                              (device, node, instance.name, msg))
4146   if device.physical_id is None:
4147     device.physical_id = result.payload
4148
4149
4150 def _GenerateUniqueNames(lu, exts):
4151   """Generate a suitable LV name.
4152
4153   This will generate a logical volume name for the given instance.
4154
4155   """
4156   results = []
4157   for val in exts:
4158     new_id = lu.cfg.GenerateUniqueID()
4159     results.append("%s%s" % (new_id, val))
4160   return results
4161
4162
4163 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4164                          p_minor, s_minor):
4165   """Generate a drbd8 device complete with its children.
4166
4167   """
4168   port = lu.cfg.AllocatePort()
4169   vgname = lu.cfg.GetVGName()
4170   shared_secret = lu.cfg.GenerateDRBDSecret()
4171   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4172                           logical_id=(vgname, names[0]))
4173   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4174                           logical_id=(vgname, names[1]))
4175   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4176                           logical_id=(primary, secondary, port,
4177                                       p_minor, s_minor,
4178                                       shared_secret),
4179                           children=[dev_data, dev_meta],
4180                           iv_name=iv_name)
4181   return drbd_dev
4182
4183
4184 def _GenerateDiskTemplate(lu, template_name,
4185                           instance_name, primary_node,
4186                           secondary_nodes, disk_info,
4187                           file_storage_dir, file_driver,
4188                           base_index):
4189   """Generate the entire disk layout for a given template type.
4190
4191   """
4192   #TODO: compute space requirements
4193
4194   vgname = lu.cfg.GetVGName()
4195   disk_count = len(disk_info)
4196   disks = []
4197   if template_name == constants.DT_DISKLESS:
4198     pass
4199   elif template_name == constants.DT_PLAIN:
4200     if len(secondary_nodes) != 0:
4201       raise errors.ProgrammerError("Wrong template configuration")
4202
4203     names = _GenerateUniqueNames(lu, [".disk%d" % i
4204                                       for i in range(disk_count)])
4205     for idx, disk in enumerate(disk_info):
4206       disk_index = idx + base_index
4207       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4208                               logical_id=(vgname, names[idx]),
4209                               iv_name="disk/%d" % disk_index,
4210                               mode=disk["mode"])
4211       disks.append(disk_dev)
4212   elif template_name == constants.DT_DRBD8:
4213     if len(secondary_nodes) != 1:
4214       raise errors.ProgrammerError("Wrong template configuration")
4215     remote_node = secondary_nodes[0]
4216     minors = lu.cfg.AllocateDRBDMinor(
4217       [primary_node, remote_node] * len(disk_info), instance_name)
4218
4219     names = []
4220     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4221                                                for i in range(disk_count)]):
4222       names.append(lv_prefix + "_data")
4223       names.append(lv_prefix + "_meta")
4224     for idx, disk in enumerate(disk_info):
4225       disk_index = idx + base_index
4226       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4227                                       disk["size"], names[idx*2:idx*2+2],
4228                                       "disk/%d" % disk_index,
4229                                       minors[idx*2], minors[idx*2+1])
4230       disk_dev.mode = disk["mode"]
4231       disks.append(disk_dev)
4232   elif template_name == constants.DT_FILE:
4233     if len(secondary_nodes) != 0:
4234       raise errors.ProgrammerError("Wrong template configuration")
4235
4236     for idx, disk in enumerate(disk_info):
4237       disk_index = idx + base_index
4238       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4239                               iv_name="disk/%d" % disk_index,
4240                               logical_id=(file_driver,
4241                                           "%s/disk%d" % (file_storage_dir,
4242                                                          disk_index)),
4243                               mode=disk["mode"])
4244       disks.append(disk_dev)
4245   else:
4246     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4247   return disks
4248
4249
4250 def _GetInstanceInfoText(instance):
4251   """Compute that text that should be added to the disk's metadata.
4252
4253   """
4254   return "originstname+%s" % instance.name
4255
4256
4257 def _CreateDisks(lu, instance):
4258   """Create all disks for an instance.
4259
4260   This abstracts away some work from AddInstance.
4261
4262   @type lu: L{LogicalUnit}
4263   @param lu: the logical unit on whose behalf we execute
4264   @type instance: L{objects.Instance}
4265   @param instance: the instance whose disks we should create
4266   @rtype: boolean
4267   @return: the success of the creation
4268
4269   """
4270   info = _GetInstanceInfoText(instance)
4271   pnode = instance.primary_node
4272
4273   if instance.disk_template == constants.DT_FILE:
4274     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4275     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4276
4277     if result.failed or not result.data:
4278       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4279
4280     if not result.data[0]:
4281       raise errors.OpExecError("Failed to create directory '%s'" %
4282                                file_storage_dir)
4283
4284   # Note: this needs to be kept in sync with adding of disks in
4285   # LUSetInstanceParams
4286   for device in instance.disks:
4287     logging.info("Creating volume %s for instance %s",
4288                  device.iv_name, instance.name)
4289     #HARDCODE
4290     for node in instance.all_nodes:
4291       f_create = node == pnode
4292       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4293
4294
4295 def _RemoveDisks(lu, instance):
4296   """Remove all disks for an instance.
4297
4298   This abstracts away some work from `AddInstance()` and
4299   `RemoveInstance()`. Note that in case some of the devices couldn't
4300   be removed, the removal will continue with the other ones (compare
4301   with `_CreateDisks()`).
4302
4303   @type lu: L{LogicalUnit}
4304   @param lu: the logical unit on whose behalf we execute
4305   @type instance: L{objects.Instance}
4306   @param instance: the instance whose disks we should remove
4307   @rtype: boolean
4308   @return: the success of the removal
4309
4310   """
4311   logging.info("Removing block devices for instance %s", instance.name)
4312
4313   all_result = True
4314   for device in instance.disks:
4315     for node, disk in device.ComputeNodeTree(instance.primary_node):
4316       lu.cfg.SetDiskID(disk, node)
4317       msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4318       if msg:
4319         lu.LogWarning("Could not remove block device %s on node %s,"
4320                       " continuing anyway: %s", device.iv_name, node, msg)
4321         all_result = False
4322
4323   if instance.disk_template == constants.DT_FILE:
4324     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4325     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4326                                                  file_storage_dir)
4327     if result.failed or not result.data:
4328       logging.error("Could not remove directory '%s'", file_storage_dir)
4329       all_result = False
4330
4331   return all_result
4332
4333
4334 def _ComputeDiskSize(disk_template, disks):
4335   """Compute disk size requirements in the volume group
4336
4337   """
4338   # Required free disk space as a function of disk and swap space
4339   req_size_dict = {
4340     constants.DT_DISKLESS: None,
4341     constants.DT_PLAIN: sum(d["size"] for d in disks),
4342     # 128 MB are added for drbd metadata for each disk
4343     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4344     constants.DT_FILE: None,
4345   }
4346
4347   if disk_template not in req_size_dict:
4348     raise errors.ProgrammerError("Disk template '%s' size requirement"
4349                                  " is unknown" %  disk_template)
4350
4351   return req_size_dict[disk_template]
4352
4353
4354 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4355   """Hypervisor parameter validation.
4356
4357   This function abstract the hypervisor parameter validation to be
4358   used in both instance create and instance modify.
4359
4360   @type lu: L{LogicalUnit}
4361   @param lu: the logical unit for which we check
4362   @type nodenames: list
4363   @param nodenames: the list of nodes on which we should check
4364   @type hvname: string
4365   @param hvname: the name of the hypervisor we should use
4366   @type hvparams: dict
4367   @param hvparams: the parameters which we need to check
4368   @raise errors.OpPrereqError: if the parameters are not valid
4369
4370   """
4371   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4372                                                   hvname,
4373                                                   hvparams)
4374   for node in nodenames:
4375     info = hvinfo[node]
4376     if info.offline:
4377       continue
4378     msg = info.RemoteFailMsg()
4379     if msg:
4380       raise errors.OpPrereqError("Hypervisor parameter validation"
4381                                  " failed on node %s: %s" % (node, msg))
4382
4383
4384 class LUCreateInstance(LogicalUnit):
4385   """Create an instance.
4386
4387   """
4388   HPATH = "instance-add"
4389   HTYPE = constants.HTYPE_INSTANCE
4390   _OP_REQP = ["instance_name", "disks", "disk_template",
4391               "mode", "start",
4392               "wait_for_sync", "ip_check", "nics",
4393               "hvparams", "beparams"]
4394   REQ_BGL = False
4395
4396   def _ExpandNode(self, node):
4397     """Expands and checks one node name.
4398
4399     """
4400     node_full = self.cfg.ExpandNodeName(node)
4401     if node_full is None:
4402       raise errors.OpPrereqError("Unknown node %s" % node)
4403     return node_full
4404
4405   def ExpandNames(self):
4406     """ExpandNames for CreateInstance.
4407
4408     Figure out the right locks for instance creation.
4409
4410     """
4411     self.needed_locks = {}
4412
4413     # set optional parameters to none if they don't exist
4414     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4415       if not hasattr(self.op, attr):
4416         setattr(self.op, attr, None)
4417
4418     # cheap checks, mostly valid constants given
4419
4420     # verify creation mode
4421     if self.op.mode not in (constants.INSTANCE_CREATE,
4422                             constants.INSTANCE_IMPORT):
4423       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4424                                  self.op.mode)
4425
4426     # disk template and mirror node verification
4427     if self.op.disk_template not in constants.DISK_TEMPLATES:
4428       raise errors.OpPrereqError("Invalid disk template name")
4429
4430     if self.op.hypervisor is None:
4431       self.op.hypervisor = self.cfg.GetHypervisorType()
4432
4433     cluster = self.cfg.GetClusterInfo()
4434     enabled_hvs = cluster.enabled_hypervisors
4435     if self.op.hypervisor not in enabled_hvs:
4436       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4437                                  " cluster (%s)" % (self.op.hypervisor,
4438                                   ",".join(enabled_hvs)))
4439
4440     # check hypervisor parameter syntax (locally)
4441     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4442     filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
4443                                   self.op.hvparams)
4444     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4445     hv_type.CheckParameterSyntax(filled_hvp)
4446
4447     # fill and remember the beparams dict
4448     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4449     self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
4450                                     self.op.beparams)
4451
4452     #### instance parameters check
4453
4454     # instance name verification
4455     hostname1 = utils.HostInfo(self.op.instance_name)
4456     self.op.instance_name = instance_name = hostname1.name
4457
4458     # this is just a preventive check, but someone might still add this
4459     # instance in the meantime, and creation will fail at lock-add time
4460     if instance_name in self.cfg.GetInstanceList():
4461       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4462                                  instance_name)
4463
4464     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4465
4466     # NIC buildup
4467     self.nics = []
4468     for idx, nic in enumerate(self.op.nics):
4469       nic_mode_req = nic.get("mode", None)
4470       nic_mode = nic_mode_req
4471       if nic_mode is None:
4472         nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
4473
4474       # in routed mode, for the first nic, the default ip is 'auto'
4475       if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
4476         default_ip_mode = constants.VALUE_AUTO
4477       else:
4478         default_ip_mode = constants.VALUE_NONE
4479
4480       # ip validity checks
4481       ip = nic.get("ip", default_ip_mode)
4482       if ip is None or ip.lower() == constants.VALUE_NONE:
4483         nic_ip = None
4484       elif ip.lower() == constants.VALUE_AUTO:
4485         nic_ip = hostname1.ip
4486       else:
4487         if not utils.IsValidIP(ip):
4488           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4489                                      " like a valid IP" % ip)
4490         nic_ip = ip
4491
4492       # TODO: check the ip for uniqueness !!
4493       if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
4494         raise errors.OpPrereqError("Routed nic mode requires an ip address")
4495
4496       # MAC address verification
4497       mac = nic.get("mac", constants.VALUE_AUTO)
4498       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4499         if not utils.IsValidMac(mac.lower()):
4500           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4501                                      mac)
4502       # bridge verification
4503       bridge = nic.get("bridge", None)
4504       link = nic.get("link", None)
4505       if bridge and link:
4506         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
4507       elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
4508         raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
4509       elif bridge:
4510         link = bridge
4511
4512       nicparams = {}
4513       if nic_mode_req:
4514         nicparams[constants.NIC_MODE] = nic_mode_req
4515       if link:
4516         nicparams[constants.NIC_LINK] = link
4517
4518       check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4519                                       nicparams)
4520       objects.NIC.CheckParameterSyntax(check_params)
4521       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
4522
4523     # disk checks/pre-build
4524     self.disks = []
4525     for disk in self.op.disks:
4526       mode = disk.get("mode", constants.DISK_RDWR)
4527       if mode not in constants.DISK_ACCESS_SET:
4528         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4529                                    mode)
4530       size = disk.get("size", None)
4531       if size is None:
4532         raise errors.OpPrereqError("Missing disk size")
4533       try:
4534         size = int(size)
4535       except ValueError:
4536         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4537       self.disks.append({"size": size, "mode": mode})
4538
4539     # used in CheckPrereq for ip ping check
4540     self.check_ip = hostname1.ip
4541
4542     # file storage checks
4543     if (self.op.file_driver and
4544         not self.op.file_driver in constants.FILE_DRIVER):
4545       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4546                                  self.op.file_driver)
4547
4548     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4549       raise errors.OpPrereqError("File storage directory path not absolute")
4550
4551     ### Node/iallocator related checks
4552     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4553       raise errors.OpPrereqError("One and only one of iallocator and primary"
4554                                  " node must be given")
4555
4556     if self.op.iallocator:
4557       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4558     else:
4559       self.op.pnode = self._ExpandNode(self.op.pnode)
4560       nodelist = [self.op.pnode]
4561       if self.op.snode is not None:
4562         self.op.snode = self._ExpandNode(self.op.snode)
4563         nodelist.append(self.op.snode)
4564       self.needed_locks[locking.LEVEL_NODE] = nodelist
4565
4566     # in case of import lock the source node too
4567     if self.op.mode == constants.INSTANCE_IMPORT:
4568       src_node = getattr(self.op, "src_node", None)
4569       src_path = getattr(self.op, "src_path", None)
4570
4571       if src_path is None:
4572         self.op.src_path = src_path = self.op.instance_name
4573
4574       if src_node is None:
4575         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4576         self.op.src_node = None
4577         if os.path.isabs(src_path):
4578           raise errors.OpPrereqError("Importing an instance from an absolute"
4579                                      " path requires a source node option.")
4580       else:
4581         self.op.src_node = src_node = self._ExpandNode(src_node)
4582         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4583           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4584         if not os.path.isabs(src_path):
4585           self.op.src_path = src_path = \
4586             os.path.join(constants.EXPORT_DIR, src_path)
4587
4588     else: # INSTANCE_CREATE
4589       if getattr(self.op, "os_type", None) is None:
4590         raise errors.OpPrereqError("No guest OS specified")
4591
4592   def _RunAllocator(self):
4593     """Run the allocator based on input opcode.
4594
4595     """
4596     nics = [n.ToDict() for n in self.nics]
4597     ial = IAllocator(self,
4598                      mode=constants.IALLOCATOR_MODE_ALLOC,
4599                      name=self.op.instance_name,
4600                      disk_template=self.op.disk_template,
4601                      tags=[],
4602                      os=self.op.os_type,
4603                      vcpus=self.be_full[constants.BE_VCPUS],
4604                      mem_size=self.be_full[constants.BE_MEMORY],
4605                      disks=self.disks,
4606                      nics=nics,
4607                      hypervisor=self.op.hypervisor,
4608                      )
4609
4610     ial.Run(self.op.iallocator)
4611
4612     if not ial.success:
4613       raise errors.OpPrereqError("Can't compute nodes using"
4614                                  " iallocator '%s': %s" % (self.op.iallocator,
4615                                                            ial.info))
4616     if len(ial.nodes) != ial.required_nodes:
4617       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4618                                  " of nodes (%s), required %s" %
4619                                  (self.op.iallocator, len(ial.nodes),
4620                                   ial.required_nodes))
4621     self.op.pnode = ial.nodes[0]
4622     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4623                  self.op.instance_name, self.op.iallocator,
4624                  ", ".join(ial.nodes))
4625     if ial.required_nodes == 2:
4626       self.op.snode = ial.nodes[1]
4627
4628   def BuildHooksEnv(self):
4629     """Build hooks env.
4630
4631     This runs on master, primary and secondary nodes of the instance.
4632
4633     """
4634     env = {
4635       "ADD_MODE": self.op.mode,
4636       }
4637     if self.op.mode == constants.INSTANCE_IMPORT:
4638       env["SRC_NODE"] = self.op.src_node
4639       env["SRC_PATH"] = self.op.src_path
4640       env["SRC_IMAGES"] = self.src_images
4641
4642     env.update(_BuildInstanceHookEnv(
4643       name=self.op.instance_name,
4644       primary_node=self.op.pnode,
4645       secondary_nodes=self.secondaries,
4646       status=self.op.start,
4647       os_type=self.op.os_type,
4648       memory=self.be_full[constants.BE_MEMORY],
4649       vcpus=self.be_full[constants.BE_VCPUS],
4650       nics=_PreBuildNICHooksList(self, self.nics),
4651       disk_template=self.op.disk_template,
4652       disks=[(d["size"], d["mode"]) for d in self.disks],
4653     ))
4654
4655     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4656           self.secondaries)
4657     return env, nl, nl
4658
4659
4660   def CheckPrereq(self):
4661     """Check prerequisites.
4662
4663     """
4664     if (not self.cfg.GetVGName() and
4665         self.op.disk_template not in constants.DTS_NOT_LVM):
4666       raise errors.OpPrereqError("Cluster does not support lvm-based"
4667                                  " instances")
4668
4669     if self.op.mode == constants.INSTANCE_IMPORT:
4670       src_node = self.op.src_node
4671       src_path = self.op.src_path
4672
4673       if src_node is None:
4674         exp_list = self.rpc.call_export_list(
4675           self.acquired_locks[locking.LEVEL_NODE])
4676         found = False
4677         for node in exp_list:
4678           if not exp_list[node].failed and src_path in exp_list[node].data:
4679             found = True
4680             self.op.src_node = src_node = node
4681             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4682                                                        src_path)
4683             break
4684         if not found:
4685           raise errors.OpPrereqError("No export found for relative path %s" %
4686                                       src_path)
4687
4688       _CheckNodeOnline(self, src_node)
4689       result = self.rpc.call_export_info(src_node, src_path)
4690       result.Raise()
4691       if not result.data:
4692         raise errors.OpPrereqError("No export found in dir %s" % src_path)
4693
4694       export_info = result.data
4695       if not export_info.has_section(constants.INISECT_EXP):
4696         raise errors.ProgrammerError("Corrupted export config")
4697
4698       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4699       if (int(ei_version) != constants.EXPORT_VERSION):
4700         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4701                                    (ei_version, constants.EXPORT_VERSION))
4702
4703       # Check that the new instance doesn't have less disks than the export
4704       instance_disks = len(self.disks)
4705       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4706       if instance_disks < export_disks:
4707         raise errors.OpPrereqError("Not enough disks to import."
4708                                    " (instance: %d, export: %d)" %
4709                                    (instance_disks, export_disks))
4710
4711       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4712       disk_images = []
4713       for idx in range(export_disks):
4714         option = 'disk%d_dump' % idx
4715         if export_info.has_option(constants.INISECT_INS, option):
4716           # FIXME: are the old os-es, disk sizes, etc. useful?
4717           export_name = export_info.get(constants.INISECT_INS, option)
4718           image = os.path.join(src_path, export_name)
4719           disk_images.append(image)
4720         else:
4721           disk_images.append(False)
4722
4723       self.src_images = disk_images
4724
4725       old_name = export_info.get(constants.INISECT_INS, 'name')
4726       # FIXME: int() here could throw a ValueError on broken exports
4727       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4728       if self.op.instance_name == old_name:
4729         for idx, nic in enumerate(self.nics):
4730           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4731             nic_mac_ini = 'nic%d_mac' % idx
4732             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4733
4734     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4735     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4736     if self.op.start and not self.op.ip_check:
4737       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4738                                  " adding an instance in start mode")
4739
4740     if self.op.ip_check:
4741       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4742         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4743                                    (self.check_ip, self.op.instance_name))
4744
4745     #### mac address generation
4746     # By generating here the mac address both the allocator and the hooks get
4747     # the real final mac address rather than the 'auto' or 'generate' value.
4748     # There is a race condition between the generation and the instance object
4749     # creation, which means that we know the mac is valid now, but we're not
4750     # sure it will be when we actually add the instance. If things go bad
4751     # adding the instance will abort because of a duplicate mac, and the
4752     # creation job will fail.
4753     for nic in self.nics:
4754       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4755         nic.mac = self.cfg.GenerateMAC()
4756
4757     #### allocator run
4758
4759     if self.op.iallocator is not None:
4760       self._RunAllocator()
4761
4762     #### node related checks
4763
4764     # check primary node
4765     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4766     assert self.pnode is not None, \
4767       "Cannot retrieve locked node %s" % self.op.pnode
4768     if pnode.offline:
4769       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4770                                  pnode.name)
4771     if pnode.drained:
4772       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4773                                  pnode.name)
4774
4775     self.secondaries = []
4776
4777     # mirror node verification
4778     if self.op.disk_template in constants.DTS_NET_MIRROR:
4779       if self.op.snode is None:
4780         raise errors.OpPrereqError("The networked disk templates need"
4781                                    " a mirror node")
4782       if self.op.snode == pnode.name:
4783         raise errors.OpPrereqError("The secondary node cannot be"
4784                                    " the primary node.")
4785       _CheckNodeOnline(self, self.op.snode)
4786       _CheckNodeNotDrained(self, self.op.snode)
4787       self.secondaries.append(self.op.snode)
4788
4789     nodenames = [pnode.name] + self.secondaries
4790
4791     req_size = _ComputeDiskSize(self.op.disk_template,
4792                                 self.disks)
4793
4794     # Check lv size requirements
4795     if req_size is not None:
4796       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4797                                          self.op.hypervisor)
4798       for node in nodenames:
4799         info = nodeinfo[node]
4800         info.Raise()
4801         info = info.data
4802         if not info:
4803           raise errors.OpPrereqError("Cannot get current information"
4804                                      " from node '%s'" % node)
4805         vg_free = info.get('vg_free', None)
4806         if not isinstance(vg_free, int):
4807           raise errors.OpPrereqError("Can't compute free disk space on"
4808                                      " node %s" % node)
4809         if req_size > info['vg_free']:
4810           raise errors.OpPrereqError("Not enough disk space on target node %s."
4811                                      " %d MB available, %d MB required" %
4812                                      (node, info['vg_free'], req_size))
4813
4814     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4815
4816     # os verification
4817     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4818     result.Raise()
4819     if not isinstance(result.data, objects.OS):
4820       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4821                                  " primary node"  % self.op.os_type)
4822
4823     _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
4824
4825     # memory check on primary node
4826     if self.op.start:
4827       _CheckNodeFreeMemory(self, self.pnode.name,
4828                            "creating instance %s" % self.op.instance_name,
4829                            self.be_full[constants.BE_MEMORY],
4830                            self.op.hypervisor)
4831
4832   def Exec(self, feedback_fn):
4833     """Create and add the instance to the cluster.
4834
4835     """
4836     instance = self.op.instance_name
4837     pnode_name = self.pnode.name
4838
4839     ht_kind = self.op.hypervisor
4840     if ht_kind in constants.HTS_REQ_PORT:
4841       network_port = self.cfg.AllocatePort()
4842     else:
4843       network_port = None
4844
4845     ##if self.op.vnc_bind_address is None:
4846     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4847
4848     # this is needed because os.path.join does not accept None arguments
4849     if self.op.file_storage_dir is None:
4850       string_file_storage_dir = ""
4851     else:
4852       string_file_storage_dir = self.op.file_storage_dir
4853
4854     # build the full file storage dir path
4855     file_storage_dir = os.path.normpath(os.path.join(
4856                                         self.cfg.GetFileStorageDir(),
4857                                         string_file_storage_dir, instance))
4858
4859
4860     disks = _GenerateDiskTemplate(self,
4861                                   self.op.disk_template,
4862                                   instance, pnode_name,
4863                                   self.secondaries,
4864                                   self.disks,
4865                                   file_storage_dir,
4866                                   self.op.file_driver,
4867                                   0)
4868
4869     iobj = objects.Instance(name=instance, os=self.op.os_type,
4870                             primary_node=pnode_name,
4871                             nics=self.nics, disks=disks,
4872                             disk_template=self.op.disk_template,
4873                             admin_up=False,
4874                             network_port=network_port,
4875                             beparams=self.op.beparams,
4876                             hvparams=self.op.hvparams,
4877                             hypervisor=self.op.hypervisor,
4878                             )
4879
4880     feedback_fn("* creating instance disks...")
4881     try:
4882       _CreateDisks(self, iobj)
4883     except errors.OpExecError:
4884       self.LogWarning("Device creation failed, reverting...")
4885       try:
4886         _RemoveDisks(self, iobj)
4887       finally:
4888         self.cfg.ReleaseDRBDMinors(instance)
4889         raise
4890
4891     feedback_fn("adding instance %s to cluster config" % instance)
4892
4893     self.cfg.AddInstance(iobj)
4894     # Declare that we don't want to remove the instance lock anymore, as we've
4895     # added the instance to the config
4896     del self.remove_locks[locking.LEVEL_INSTANCE]
4897     # Unlock all the nodes
4898     if self.op.mode == constants.INSTANCE_IMPORT:
4899       nodes_keep = [self.op.src_node]
4900       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4901                        if node != self.op.src_node]
4902       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4903       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4904     else:
4905       self.context.glm.release(locking.LEVEL_NODE)
4906       del self.acquired_locks[locking.LEVEL_NODE]
4907
4908     if self.op.wait_for_sync:
4909       disk_abort = not _WaitForSync(self, iobj)
4910     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4911       # make sure the disks are not degraded (still sync-ing is ok)
4912       time.sleep(15)
4913       feedback_fn("* checking mirrors status")
4914       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4915     else:
4916       disk_abort = False
4917
4918     if disk_abort:
4919       _RemoveDisks(self, iobj)
4920       self.cfg.RemoveInstance(iobj.name)
4921       # Make sure the instance lock gets removed
4922       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4923       raise errors.OpExecError("There are some degraded disks for"
4924                                " this instance")
4925
4926     feedback_fn("creating os for instance %s on node %s" %
4927                 (instance, pnode_name))
4928
4929     if iobj.disk_template != constants.DT_DISKLESS:
4930       if self.op.mode == constants.INSTANCE_CREATE:
4931         feedback_fn("* running the instance OS create scripts...")
4932         result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
4933         msg = result.RemoteFailMsg()
4934         if msg:
4935           raise errors.OpExecError("Could not add os for instance %s"
4936                                    " on node %s: %s" %
4937                                    (instance, pnode_name, msg))
4938
4939       elif self.op.mode == constants.INSTANCE_IMPORT:
4940         feedback_fn("* running the instance OS import scripts...")
4941         src_node = self.op.src_node
4942         src_images = self.src_images
4943         cluster_name = self.cfg.GetClusterName()
4944         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4945                                                          src_node, src_images,
4946                                                          cluster_name)
4947         import_result.Raise()
4948         for idx, result in enumerate(import_result.data):
4949           if not result:
4950             self.LogWarning("Could not import the image %s for instance"
4951                             " %s, disk %d, on node %s" %
4952                             (src_images[idx], instance, idx, pnode_name))
4953       else:
4954         # also checked in the prereq part
4955         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4956                                      % self.op.mode)
4957
4958     if self.op.start:
4959       iobj.admin_up = True
4960       self.cfg.Update(iobj)
4961       logging.info("Starting instance %s on node %s", instance, pnode_name)
4962       feedback_fn("* starting instance...")
4963       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4964       msg = result.RemoteFailMsg()
4965       if msg:
4966         raise errors.OpExecError("Could not start instance: %s" % msg)
4967
4968
4969 class LUConnectConsole(NoHooksLU):
4970   """Connect to an instance's console.
4971
4972   This is somewhat special in that it returns the command line that
4973   you need to run on the master node in order to connect to the
4974   console.
4975
4976   """
4977   _OP_REQP = ["instance_name"]
4978   REQ_BGL = False
4979
4980   def ExpandNames(self):
4981     self._ExpandAndLockInstance()
4982
4983   def CheckPrereq(self):
4984     """Check prerequisites.
4985
4986     This checks that the instance is in the cluster.
4987
4988     """
4989     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4990     assert self.instance is not None, \
4991       "Cannot retrieve locked instance %s" % self.op.instance_name
4992     _CheckNodeOnline(self, self.instance.primary_node)
4993
4994   def Exec(self, feedback_fn):
4995     """Connect to the console of an instance
4996
4997     """
4998     instance = self.instance
4999     node = instance.primary_node
5000
5001     node_insts = self.rpc.call_instance_list([node],
5002                                              [instance.hypervisor])[node]
5003     node_insts.Raise()
5004
5005     if instance.name not in node_insts.data:
5006       raise errors.OpExecError("Instance %s is not running." % instance.name)
5007
5008     logging.debug("Connecting to console of %s on %s", instance.name, node)
5009
5010     hyper = hypervisor.GetHypervisor(instance.hypervisor)
5011     cluster = self.cfg.GetClusterInfo()
5012     # beparams and hvparams are passed separately, to avoid editing the
5013     # instance and then saving the defaults in the instance itself.
5014     hvparams = cluster.FillHV(instance)
5015     beparams = cluster.FillBE(instance)
5016     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5017
5018     # build ssh cmdline
5019     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
5020
5021
5022 class LUReplaceDisks(LogicalUnit):
5023   """Replace the disks of an instance.
5024
5025   """
5026   HPATH = "mirrors-replace"
5027   HTYPE = constants.HTYPE_INSTANCE
5028   _OP_REQP = ["instance_name", "mode", "disks"]
5029   REQ_BGL = False
5030
5031   def CheckArguments(self):
5032     if not hasattr(self.op, "remote_node"):
5033       self.op.remote_node = None
5034     if not hasattr(self.op, "iallocator"):
5035       self.op.iallocator = None
5036
5037     # check for valid parameter combination
5038     cnt = [self.op.remote_node, self.op.iallocator].count(None)
5039     if self.op.mode == constants.REPLACE_DISK_CHG:
5040       if cnt == 2:
5041         raise errors.OpPrereqError("When changing the secondary either an"
5042                                    " iallocator script must be used or the"
5043                                    " new node given")
5044       elif cnt == 0:
5045         raise errors.OpPrereqError("Give either the iallocator or the new"
5046                                    " secondary, not both")
5047     else: # not replacing the secondary
5048       if cnt != 2:
5049         raise errors.OpPrereqError("The iallocator and new node options can"
5050                                    " be used only when changing the"
5051                                    " secondary node")
5052
5053   def ExpandNames(self):
5054     self._ExpandAndLockInstance()
5055
5056     if self.op.iallocator is not None:
5057       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5058     elif self.op.remote_node is not None:
5059       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5060       if remote_node is None:
5061         raise errors.OpPrereqError("Node '%s' not known" %
5062                                    self.op.remote_node)
5063       self.op.remote_node = remote_node
5064       # Warning: do not remove the locking of the new secondary here
5065       # unless DRBD8.AddChildren is changed to work in parallel;
5066       # currently it doesn't since parallel invocations of
5067       # FindUnusedMinor will conflict
5068       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5069       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5070     else:
5071       self.needed_locks[locking.LEVEL_NODE] = []
5072       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5073
5074   def DeclareLocks(self, level):
5075     # If we're not already locking all nodes in the set we have to declare the
5076     # instance's primary/secondary nodes.
5077     if (level == locking.LEVEL_NODE and
5078         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5079       self._LockInstancesNodes()
5080
5081   def _RunAllocator(self):
5082     """Compute a new secondary node using an IAllocator.
5083
5084     """
5085     ial = IAllocator(self,
5086                      mode=constants.IALLOCATOR_MODE_RELOC,
5087                      name=self.op.instance_name,
5088                      relocate_from=[self.sec_node])
5089
5090     ial.Run(self.op.iallocator)
5091
5092     if not ial.success:
5093       raise errors.OpPrereqError("Can't compute nodes using"
5094                                  " iallocator '%s': %s" % (self.op.iallocator,
5095                                                            ial.info))
5096     if len(ial.nodes) != ial.required_nodes:
5097       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5098                                  " of nodes (%s), required %s" %
5099                                  (len(ial.nodes), ial.required_nodes))
5100     self.op.remote_node = ial.nodes[0]
5101     self.LogInfo("Selected new secondary for the instance: %s",
5102                  self.op.remote_node)
5103
5104   def BuildHooksEnv(self):
5105     """Build hooks env.
5106
5107     This runs on the master, the primary and all the secondaries.
5108
5109     """
5110     env = {
5111       "MODE": self.op.mode,
5112       "NEW_SECONDARY": self.op.remote_node,
5113       "OLD_SECONDARY": self.instance.secondary_nodes[0],
5114       }
5115     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5116     nl = [
5117       self.cfg.GetMasterNode(),
5118       self.instance.primary_node,
5119       ]
5120     if self.op.remote_node is not None:
5121       nl.append(self.op.remote_node)
5122     return env, nl, nl
5123
5124   def CheckPrereq(self):
5125     """Check prerequisites.
5126
5127     This checks that the instance is in the cluster.
5128
5129     """
5130     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5131     assert instance is not None, \
5132       "Cannot retrieve locked instance %s" % self.op.instance_name
5133     self.instance = instance
5134
5135     if instance.disk_template != constants.DT_DRBD8:
5136       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5137                                  " instances")
5138
5139     if len(instance.secondary_nodes) != 1:
5140       raise errors.OpPrereqError("The instance has a strange layout,"
5141                                  " expected one secondary but found %d" %
5142                                  len(instance.secondary_nodes))
5143
5144     self.sec_node = instance.secondary_nodes[0]
5145
5146     if self.op.iallocator is not None:
5147       self._RunAllocator()
5148
5149     remote_node = self.op.remote_node
5150     if remote_node is not None:
5151       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5152       assert self.remote_node_info is not None, \
5153         "Cannot retrieve locked node %s" % remote_node
5154     else:
5155       self.remote_node_info = None
5156     if remote_node == instance.primary_node:
5157       raise errors.OpPrereqError("The specified node is the primary node of"
5158                                  " the instance.")
5159     elif remote_node == self.sec_node:
5160       raise errors.OpPrereqError("The specified node is already the"
5161                                  " secondary node of the instance.")
5162
5163     if self.op.mode == constants.REPLACE_DISK_PRI:
5164       n1 = self.tgt_node = instance.primary_node
5165       n2 = self.oth_node = self.sec_node
5166     elif self.op.mode == constants.REPLACE_DISK_SEC:
5167       n1 = self.tgt_node = self.sec_node
5168       n2 = self.oth_node = instance.primary_node
5169     elif self.op.mode == constants.REPLACE_DISK_CHG:
5170       n1 = self.new_node = remote_node
5171       n2 = self.oth_node = instance.primary_node
5172       self.tgt_node = self.sec_node
5173       _CheckNodeNotDrained(self, remote_node)
5174     else:
5175       raise errors.ProgrammerError("Unhandled disk replace mode")
5176
5177     _CheckNodeOnline(self, n1)
5178     _CheckNodeOnline(self, n2)
5179
5180     if not self.op.disks:
5181       self.op.disks = range(len(instance.disks))
5182
5183     for disk_idx in self.op.disks:
5184       instance.FindDisk(disk_idx)
5185
5186   def _ExecD8DiskOnly(self, feedback_fn):
5187     """Replace a disk on the primary or secondary for dbrd8.
5188
5189     The algorithm for replace is quite complicated:
5190
5191       1. for each disk to be replaced:
5192
5193         1. create new LVs on the target node with unique names
5194         1. detach old LVs from the drbd device
5195         1. rename old LVs to name_replaced.<time_t>
5196         1. rename new LVs to old LVs
5197         1. attach the new LVs (with the old names now) to the drbd device
5198
5199       1. wait for sync across all devices
5200
5201       1. for each modified disk:
5202
5203         1. remove old LVs (which have the name name_replaces.<time_t>)
5204
5205     Failures are not very well handled.
5206
5207     """
5208     steps_total = 6
5209     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5210     instance = self.instance
5211     iv_names = {}
5212     vgname = self.cfg.GetVGName()
5213     # start of work
5214     cfg = self.cfg
5215     tgt_node = self.tgt_node
5216     oth_node = self.oth_node
5217
5218     # Step: check device activation
5219     self.proc.LogStep(1, steps_total, "check device existence")
5220     info("checking volume groups")
5221     my_vg = cfg.GetVGName()
5222     results = self.rpc.call_vg_list([oth_node, tgt_node])
5223     if not results:
5224       raise errors.OpExecError("Can't list volume groups on the nodes")
5225     for node in oth_node, tgt_node:
5226       res = results[node]
5227       if res.failed or not res.data or my_vg not in res.data:
5228         raise errors.OpExecError("Volume group '%s' not found on %s" %
5229                                  (my_vg, node))
5230     for idx, dev in enumerate(instance.disks):
5231       if idx not in self.op.disks:
5232         continue
5233       for node in tgt_node, oth_node:
5234         info("checking disk/%d on %s" % (idx, node))
5235         cfg.SetDiskID(dev, node)
5236         result = self.rpc.call_blockdev_find(node, dev)
5237         msg = result.RemoteFailMsg()
5238         if not msg and not result.payload:
5239           msg = "disk not found"
5240         if msg:
5241           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5242                                    (idx, node, msg))
5243
5244     # Step: check other node consistency
5245     self.proc.LogStep(2, steps_total, "check peer consistency")
5246     for idx, dev in enumerate(instance.disks):
5247       if idx not in self.op.disks:
5248         continue
5249       info("checking disk/%d consistency on %s" % (idx, oth_node))
5250       if not _CheckDiskConsistency(self, dev, oth_node,
5251                                    oth_node==instance.primary_node):
5252         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5253                                  " to replace disks on this node (%s)" %
5254                                  (oth_node, tgt_node))
5255
5256     # Step: create new storage
5257     self.proc.LogStep(3, steps_total, "allocate new storage")
5258     for idx, dev in enumerate(instance.disks):
5259       if idx not in self.op.disks:
5260         continue
5261       size = dev.size
5262       cfg.SetDiskID(dev, tgt_node)
5263       lv_names = [".disk%d_%s" % (idx, suf)
5264                   for suf in ["data", "meta"]]
5265       names = _GenerateUniqueNames(self, lv_names)
5266       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5267                              logical_id=(vgname, names[0]))
5268       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5269                              logical_id=(vgname, names[1]))
5270       new_lvs = [lv_data, lv_meta]
5271       old_lvs = dev.children
5272       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5273       info("creating new local storage on %s for %s" %
5274            (tgt_node, dev.iv_name))
5275       # we pass force_create=True to force the LVM creation
5276       for new_lv in new_lvs:
5277         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5278                         _GetInstanceInfoText(instance), False)
5279
5280     # Step: for each lv, detach+rename*2+attach
5281     self.proc.LogStep(4, steps_total, "change drbd configuration")
5282     for dev, old_lvs, new_lvs in iv_names.itervalues():
5283       info("detaching %s drbd from local storage" % dev.iv_name)
5284       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5285       msg = result.RemoteFailMsg()
5286       if msg:
5287         raise errors.OpExecError("Can't detach drbd from local storage on node"
5288                                  " %s for device %s: %s" %
5289                                  (tgt_node, dev.iv_name, msg))
5290       #dev.children = []
5291       #cfg.Update(instance)
5292
5293       # ok, we created the new LVs, so now we know we have the needed
5294       # storage; as such, we proceed on the target node to rename
5295       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5296       # using the assumption that logical_id == physical_id (which in
5297       # turn is the unique_id on that node)
5298
5299       # FIXME(iustin): use a better name for the replaced LVs
5300       temp_suffix = int(time.time())
5301       ren_fn = lambda d, suff: (d.physical_id[0],
5302                                 d.physical_id[1] + "_replaced-%s" % suff)
5303       # build the rename list based on what LVs exist on the node
5304       rlist = []
5305       for to_ren in old_lvs:
5306         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5307         if not result.RemoteFailMsg() and result.payload:
5308           # device exists
5309           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5310
5311       info("renaming the old LVs on the target node")
5312       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5313       msg = result.RemoteFailMsg()
5314       if msg:
5315         raise errors.OpExecError("Can't rename old LVs on node %s: %s" %
5316                                  (tgt_node, msg))
5317       # now we rename the new LVs to the old LVs
5318       info("renaming the new LVs on the target node")
5319       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5320       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5321       msg = result.RemoteFailMsg()
5322       if msg:
5323         raise errors.OpExecError("Can't rename new LVs on node %s: %s" %
5324                                  (tgt_node, msg))
5325
5326       for old, new in zip(old_lvs, new_lvs):
5327         new.logical_id = old.logical_id
5328         cfg.SetDiskID(new, tgt_node)
5329
5330       for disk in old_lvs:
5331         disk.logical_id = ren_fn(disk, temp_suffix)
5332         cfg.SetDiskID(disk, tgt_node)
5333
5334       # now that the new lvs have the old name, we can add them to the device
5335       info("adding new mirror component on %s" % tgt_node)
5336       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5337       msg = result.RemoteFailMsg()
5338       if msg:
5339         for new_lv in new_lvs:
5340           msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5341           if msg:
5342             warning("Can't rollback device %s: %s", dev, msg,
5343                     hint="cleanup manually the unused logical volumes")
5344         raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
5345
5346       dev.children = new_lvs
5347       cfg.Update(instance)
5348
5349     # Step: wait for sync
5350
5351     # this can fail as the old devices are degraded and _WaitForSync
5352     # does a combined result over all disks, so we don't check its
5353     # return value
5354     self.proc.LogStep(5, steps_total, "sync devices")
5355     _WaitForSync(self, instance, unlock=True)
5356
5357     # so check manually all the devices
5358     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5359       cfg.SetDiskID(dev, instance.primary_node)
5360       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5361       msg = result.RemoteFailMsg()
5362       if not msg and not result.payload:
5363         msg = "disk not found"
5364       if msg:
5365         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5366                                  (name, msg))
5367       if result.payload[5]:
5368         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5369
5370     # Step: remove old storage
5371     self.proc.LogStep(6, steps_total, "removing old storage")
5372     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5373       info("remove logical volumes for %s" % name)
5374       for lv in old_lvs:
5375         cfg.SetDiskID(lv, tgt_node)
5376         msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5377         if msg:
5378           warning("Can't remove old LV: %s" % msg,
5379                   hint="manually remove unused LVs")
5380           continue
5381
5382   def _ExecD8Secondary(self, feedback_fn):
5383     """Replace the secondary node for drbd8.
5384
5385     The algorithm for replace is quite complicated:
5386       - for all disks of the instance:
5387         - create new LVs on the new node with same names
5388         - shutdown the drbd device on the old secondary
5389         - disconnect the drbd network on the primary
5390         - create the drbd device on the new secondary
5391         - network attach the drbd on the primary, using an artifice:
5392           the drbd code for Attach() will connect to the network if it
5393           finds a device which is connected to the good local disks but
5394           not network enabled
5395       - wait for sync across all devices
5396       - remove all disks from the old secondary
5397
5398     Failures are not very well handled.
5399
5400     """
5401     steps_total = 6
5402     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5403     instance = self.instance
5404     iv_names = {}
5405     # start of work
5406     cfg = self.cfg
5407     old_node = self.tgt_node
5408     new_node = self.new_node
5409     pri_node = instance.primary_node
5410     nodes_ip = {
5411       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5412       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5413       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5414       }
5415
5416     # Step: check device activation
5417     self.proc.LogStep(1, steps_total, "check device existence")
5418     info("checking volume groups")
5419     my_vg = cfg.GetVGName()
5420     results = self.rpc.call_vg_list([pri_node, new_node])
5421     for node in pri_node, new_node:
5422       res = results[node]
5423       if res.failed or not res.data or my_vg not in res.data:
5424         raise errors.OpExecError("Volume group '%s' not found on %s" %
5425                                  (my_vg, node))
5426     for idx, dev in enumerate(instance.disks):
5427       if idx not in self.op.disks:
5428         continue
5429       info("checking disk/%d on %s" % (idx, pri_node))
5430       cfg.SetDiskID(dev, pri_node)
5431       result = self.rpc.call_blockdev_find(pri_node, dev)
5432       msg = result.RemoteFailMsg()
5433       if not msg and not result.payload:
5434         msg = "disk not found"
5435       if msg:
5436         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5437                                  (idx, pri_node, msg))
5438
5439     # Step: check other node consistency
5440     self.proc.LogStep(2, steps_total, "check peer consistency")
5441     for idx, dev in enumerate(instance.disks):
5442       if idx not in self.op.disks:
5443         continue
5444       info("checking disk/%d consistency on %s" % (idx, pri_node))
5445       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5446         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5447                                  " unsafe to replace the secondary" %
5448                                  pri_node)
5449
5450     # Step: create new storage
5451     self.proc.LogStep(3, steps_total, "allocate new storage")
5452     for idx, dev in enumerate(instance.disks):
5453       info("adding new local storage on %s for disk/%d" %
5454            (new_node, idx))
5455       # we pass force_create=True to force LVM creation
5456       for new_lv in dev.children:
5457         _CreateBlockDev(self, new_node, instance, new_lv, True,
5458                         _GetInstanceInfoText(instance), False)
5459
5460     # Step 4: dbrd minors and drbd setups changes
5461     # after this, we must manually remove the drbd minors on both the
5462     # error and the success paths
5463     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5464                                    instance.name)
5465     logging.debug("Allocated minors %s" % (minors,))
5466     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5467     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5468       size = dev.size
5469       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5470       # create new devices on new_node; note that we create two IDs:
5471       # one without port, so the drbd will be activated without
5472       # networking information on the new node at this stage, and one
5473       # with network, for the latter activation in step 4
5474       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5475       if pri_node == o_node1:
5476         p_minor = o_minor1
5477       else:
5478         p_minor = o_minor2
5479
5480       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5481       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5482
5483       iv_names[idx] = (dev, dev.children, new_net_id)
5484       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5485                     new_net_id)
5486       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5487                               logical_id=new_alone_id,
5488                               children=dev.children)
5489       try:
5490         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5491                               _GetInstanceInfoText(instance), False)
5492       except errors.GenericError:
5493         self.cfg.ReleaseDRBDMinors(instance.name)
5494         raise
5495
5496     for idx, dev in enumerate(instance.disks):
5497       # we have new devices, shutdown the drbd on the old secondary
5498       info("shutting down drbd for disk/%d on old node" % idx)
5499       cfg.SetDiskID(dev, old_node)
5500       msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5501       if msg:
5502         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5503                 (idx, msg),
5504                 hint="Please cleanup this device manually as soon as possible")
5505
5506     info("detaching primary drbds from the network (=> standalone)")
5507     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5508                                                instance.disks)[pri_node]
5509
5510     msg = result.RemoteFailMsg()
5511     if msg:
5512       # detaches didn't succeed (unlikely)
5513       self.cfg.ReleaseDRBDMinors(instance.name)
5514       raise errors.OpExecError("Can't detach the disks from the network on"
5515                                " old node: %s" % (msg,))
5516
5517     # if we managed to detach at least one, we update all the disks of
5518     # the instance to point to the new secondary
5519     info("updating instance configuration")
5520     for dev, _, new_logical_id in iv_names.itervalues():
5521       dev.logical_id = new_logical_id
5522       cfg.SetDiskID(dev, pri_node)
5523     cfg.Update(instance)
5524
5525     # and now perform the drbd attach
5526     info("attaching primary drbds to new secondary (standalone => connected)")
5527     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5528                                            instance.disks, instance.name,
5529                                            False)
5530     for to_node, to_result in result.items():
5531       msg = to_result.RemoteFailMsg()
5532       if msg:
5533         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5534                 hint="please do a gnt-instance info to see the"
5535                 " status of disks")
5536
5537     # this can fail as the old devices are degraded and _WaitForSync
5538     # does a combined result over all disks, so we don't check its
5539     # return value
5540     self.proc.LogStep(5, steps_total, "sync devices")
5541     _WaitForSync(self, instance, unlock=True)
5542
5543     # so check manually all the devices
5544     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5545       cfg.SetDiskID(dev, pri_node)
5546       result = self.rpc.call_blockdev_find(pri_node, dev)
5547       msg = result.RemoteFailMsg()
5548       if not msg and not result.payload:
5549         msg = "disk not found"
5550       if msg:
5551         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5552                                  (idx, msg))
5553       if result.payload[5]:
5554         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5555
5556     self.proc.LogStep(6, steps_total, "removing old storage")
5557     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5558       info("remove logical volumes for disk/%d" % idx)
5559       for lv in old_lvs:
5560         cfg.SetDiskID(lv, old_node)
5561         msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5562         if msg:
5563           warning("Can't remove LV on old secondary: %s", msg,
5564                   hint="Cleanup stale volumes by hand")
5565
5566   def Exec(self, feedback_fn):
5567     """Execute disk replacement.
5568
5569     This dispatches the disk replacement to the appropriate handler.
5570
5571     """
5572     instance = self.instance
5573
5574     # Activate the instance disks if we're replacing them on a down instance
5575     if not instance.admin_up:
5576       _StartInstanceDisks(self, instance, True)
5577
5578     if self.op.mode == constants.REPLACE_DISK_CHG:
5579       fn = self._ExecD8Secondary
5580     else:
5581       fn = self._ExecD8DiskOnly
5582
5583     ret = fn(feedback_fn)
5584
5585     # Deactivate the instance disks if we're replacing them on a down instance
5586     if not instance.admin_up:
5587       _SafeShutdownInstanceDisks(self, instance)
5588
5589     return ret
5590
5591
5592 class LUGrowDisk(LogicalUnit):
5593   """Grow a disk of an instance.
5594
5595   """
5596   HPATH = "disk-grow"
5597   HTYPE = constants.HTYPE_INSTANCE
5598   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5599   REQ_BGL = False
5600
5601   def ExpandNames(self):
5602     self._ExpandAndLockInstance()
5603     self.needed_locks[locking.LEVEL_NODE] = []
5604     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5605
5606   def DeclareLocks(self, level):
5607     if level == locking.LEVEL_NODE:
5608       self._LockInstancesNodes()
5609
5610   def BuildHooksEnv(self):
5611     """Build hooks env.
5612
5613     This runs on the master, the primary and all the secondaries.
5614
5615     """
5616     env = {
5617       "DISK": self.op.disk,
5618       "AMOUNT": self.op.amount,
5619       }
5620     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5621     nl = [
5622       self.cfg.GetMasterNode(),
5623       self.instance.primary_node,
5624       ]
5625     return env, nl, nl
5626
5627   def CheckPrereq(self):
5628     """Check prerequisites.
5629
5630     This checks that the instance is in the cluster.
5631
5632     """
5633     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5634     assert instance is not None, \
5635       "Cannot retrieve locked instance %s" % self.op.instance_name
5636     nodenames = list(instance.all_nodes)
5637     for node in nodenames:
5638       _CheckNodeOnline(self, node)
5639
5640
5641     self.instance = instance
5642
5643     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5644       raise errors.OpPrereqError("Instance's disk layout does not support"
5645                                  " growing.")
5646
5647     self.disk = instance.FindDisk(self.op.disk)
5648
5649     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5650                                        instance.hypervisor)
5651     for node in nodenames:
5652       info = nodeinfo[node]
5653       if info.failed or not info.data:
5654         raise errors.OpPrereqError("Cannot get current information"
5655                                    " from node '%s'" % node)
5656       vg_free = info.data.get('vg_free', None)
5657       if not isinstance(vg_free, int):
5658         raise errors.OpPrereqError("Can't compute free disk space on"
5659                                    " node %s" % node)
5660       if self.op.amount > vg_free:
5661         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5662                                    " %d MiB available, %d MiB required" %
5663                                    (node, vg_free, self.op.amount))
5664
5665   def Exec(self, feedback_fn):
5666     """Execute disk grow.
5667
5668     """
5669     instance = self.instance
5670     disk = self.disk
5671     for node in instance.all_nodes:
5672       self.cfg.SetDiskID(disk, node)
5673       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5674       msg = result.RemoteFailMsg()
5675       if msg:
5676         raise errors.OpExecError("Grow request failed to node %s: %s" %
5677                                  (node, msg))
5678     disk.RecordGrow(self.op.amount)
5679     self.cfg.Update(instance)
5680     if self.op.wait_for_sync:
5681       disk_abort = not _WaitForSync(self, instance)
5682       if disk_abort:
5683         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5684                              " status.\nPlease check the instance.")
5685
5686
5687 class LUQueryInstanceData(NoHooksLU):
5688   """Query runtime instance data.
5689
5690   """
5691   _OP_REQP = ["instances", "static"]
5692   REQ_BGL = False
5693
5694   def ExpandNames(self):
5695     self.needed_locks = {}
5696     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5697
5698     if not isinstance(self.op.instances, list):
5699       raise errors.OpPrereqError("Invalid argument type 'instances'")
5700
5701     if self.op.instances:
5702       self.wanted_names = []
5703       for name in self.op.instances:
5704         full_name = self.cfg.ExpandInstanceName(name)
5705         if full_name is None:
5706           raise errors.OpPrereqError("Instance '%s' not known" % name)
5707         self.wanted_names.append(full_name)
5708       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5709     else:
5710       self.wanted_names = None
5711       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5712
5713     self.needed_locks[locking.LEVEL_NODE] = []
5714     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5715
5716   def DeclareLocks(self, level):
5717     if level == locking.LEVEL_NODE:
5718       self._LockInstancesNodes()
5719
5720   def CheckPrereq(self):
5721     """Check prerequisites.
5722
5723     This only checks the optional instance list against the existing names.
5724
5725     """
5726     if self.wanted_names is None:
5727       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5728
5729     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5730                              in self.wanted_names]
5731     return
5732
5733   def _ComputeDiskStatus(self, instance, snode, dev):
5734     """Compute block device status.
5735
5736     """
5737     static = self.op.static
5738     if not static:
5739       self.cfg.SetDiskID(dev, instance.primary_node)
5740       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5741       if dev_pstatus.offline:
5742         dev_pstatus = None
5743       else:
5744         msg = dev_pstatus.RemoteFailMsg()
5745         if msg:
5746           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5747                                    (instance.name, msg))
5748         dev_pstatus = dev_pstatus.payload
5749     else:
5750       dev_pstatus = None
5751
5752     if dev.dev_type in constants.LDS_DRBD:
5753       # we change the snode then (otherwise we use the one passed in)
5754       if dev.logical_id[0] == instance.primary_node:
5755         snode = dev.logical_id[1]
5756       else:
5757         snode = dev.logical_id[0]
5758
5759     if snode and not static:
5760       self.cfg.SetDiskID(dev, snode)
5761       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5762       if dev_sstatus.offline:
5763         dev_sstatus = None
5764       else:
5765         msg = dev_sstatus.RemoteFailMsg()
5766         if msg:
5767           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5768                                    (instance.name, msg))
5769         dev_sstatus = dev_sstatus.payload
5770     else:
5771       dev_sstatus = None
5772
5773     if dev.children:
5774       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5775                       for child in dev.children]
5776     else:
5777       dev_children = []
5778
5779     data = {
5780       "iv_name": dev.iv_name,
5781       "dev_type": dev.dev_type,
5782       "logical_id": dev.logical_id,
5783       "physical_id": dev.physical_id,
5784       "pstatus": dev_pstatus,
5785       "sstatus": dev_sstatus,
5786       "children": dev_children,
5787       "mode": dev.mode,
5788       }
5789
5790     return data
5791
5792   def Exec(self, feedback_fn):
5793     """Gather and return data"""
5794     result = {}
5795
5796     cluster = self.cfg.GetClusterInfo()
5797
5798     for instance in self.wanted_instances:
5799       if not self.op.static:
5800         remote_info = self.rpc.call_instance_info(instance.primary_node,
5801                                                   instance.name,
5802                                                   instance.hypervisor)
5803         remote_info.Raise()
5804         remote_info = remote_info.data
5805         if remote_info and "state" in remote_info:
5806           remote_state = "up"
5807         else:
5808           remote_state = "down"
5809       else:
5810         remote_state = None
5811       if instance.admin_up:
5812         config_state = "up"
5813       else:
5814         config_state = "down"
5815
5816       disks = [self._ComputeDiskStatus(instance, None, device)
5817                for device in instance.disks]
5818
5819       idict = {
5820         "name": instance.name,
5821         "config_state": config_state,
5822         "run_state": remote_state,
5823         "pnode": instance.primary_node,
5824         "snodes": instance.secondary_nodes,
5825         "os": instance.os,
5826         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5827         "disks": disks,
5828         "hypervisor": instance.hypervisor,
5829         "network_port": instance.network_port,
5830         "hv_instance": instance.hvparams,
5831         "hv_actual": cluster.FillHV(instance),
5832         "be_instance": instance.beparams,
5833         "be_actual": cluster.FillBE(instance),
5834         }
5835
5836       result[instance.name] = idict
5837
5838     return result
5839
5840
5841 class LUSetInstanceParams(LogicalUnit):
5842   """Modifies an instances's parameters.
5843
5844   """
5845   HPATH = "instance-modify"
5846   HTYPE = constants.HTYPE_INSTANCE
5847   _OP_REQP = ["instance_name"]
5848   REQ_BGL = False
5849
5850   def CheckArguments(self):
5851     if not hasattr(self.op, 'nics'):
5852       self.op.nics = []
5853     if not hasattr(self.op, 'disks'):
5854       self.op.disks = []
5855     if not hasattr(self.op, 'beparams'):
5856       self.op.beparams = {}
5857     if not hasattr(self.op, 'hvparams'):
5858       self.op.hvparams = {}
5859     self.op.force = getattr(self.op, "force", False)
5860     if not (self.op.nics or self.op.disks or
5861             self.op.hvparams or self.op.beparams):
5862       raise errors.OpPrereqError("No changes submitted")
5863
5864     # Disk validation
5865     disk_addremove = 0
5866     for disk_op, disk_dict in self.op.disks:
5867       if disk_op == constants.DDM_REMOVE:
5868         disk_addremove += 1
5869         continue
5870       elif disk_op == constants.DDM_ADD:
5871         disk_addremove += 1
5872       else:
5873         if not isinstance(disk_op, int):
5874           raise errors.OpPrereqError("Invalid disk index")
5875       if disk_op == constants.DDM_ADD:
5876         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5877         if mode not in constants.DISK_ACCESS_SET:
5878           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5879         size = disk_dict.get('size', None)
5880         if size is None:
5881           raise errors.OpPrereqError("Required disk parameter size missing")
5882         try:
5883           size = int(size)
5884         except ValueError, err:
5885           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5886                                      str(err))
5887         disk_dict['size'] = size
5888       else:
5889         # modification of disk
5890         if 'size' in disk_dict:
5891           raise errors.OpPrereqError("Disk size change not possible, use"
5892                                      " grow-disk")
5893
5894     if disk_addremove > 1:
5895       raise errors.OpPrereqError("Only one disk add or remove operation"
5896                                  " supported at a time")
5897
5898     # NIC validation
5899     nic_addremove = 0
5900     for nic_op, nic_dict in self.op.nics:
5901       if nic_op == constants.DDM_REMOVE:
5902         nic_addremove += 1
5903         continue
5904       elif nic_op == constants.DDM_ADD:
5905         nic_addremove += 1
5906       else:
5907         if not isinstance(nic_op, int):
5908           raise errors.OpPrereqError("Invalid nic index")
5909
5910       # nic_dict should be a dict
5911       nic_ip = nic_dict.get('ip', None)
5912       if nic_ip is not None:
5913         if nic_ip.lower() == constants.VALUE_NONE:
5914           nic_dict['ip'] = None
5915         else:
5916           if not utils.IsValidIP(nic_ip):
5917             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5918
5919       nic_bridge = nic_dict.get('bridge', None)
5920       nic_link = nic_dict.get('link', None)
5921       if nic_bridge and nic_link:
5922         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
5923       elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
5924         nic_dict['bridge'] = None
5925       elif nic_link and nic_link.lower() == constants.VALUE_NONE:
5926         nic_dict['link'] = None
5927
5928       if nic_op == constants.DDM_ADD:
5929         nic_mac = nic_dict.get('mac', None)
5930         if nic_mac is None:
5931           nic_dict['mac'] = constants.VALUE_AUTO
5932
5933       if 'mac' in nic_dict:
5934         nic_mac = nic_dict['mac']
5935         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5936           if not utils.IsValidMac(nic_mac):
5937             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5938         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5939           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5940                                      " modifying an existing nic")
5941
5942     if nic_addremove > 1:
5943       raise errors.OpPrereqError("Only one NIC add or remove operation"
5944                                  " supported at a time")
5945
5946   def ExpandNames(self):
5947     self._ExpandAndLockInstance()
5948     self.needed_locks[locking.LEVEL_NODE] = []
5949     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5950
5951   def DeclareLocks(self, level):
5952     if level == locking.LEVEL_NODE:
5953       self._LockInstancesNodes()
5954
5955   def BuildHooksEnv(self):
5956     """Build hooks env.
5957
5958     This runs on the master, primary and secondaries.
5959
5960     """
5961     args = dict()
5962     if constants.BE_MEMORY in self.be_new:
5963       args['memory'] = self.be_new[constants.BE_MEMORY]
5964     if constants.BE_VCPUS in self.be_new:
5965       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5966     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5967     # information at all.
5968     if self.op.nics:
5969       args['nics'] = []
5970       nic_override = dict(self.op.nics)
5971       c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
5972       for idx, nic in enumerate(self.instance.nics):
5973         if idx in nic_override:
5974           this_nic_override = nic_override[idx]
5975         else:
5976           this_nic_override = {}
5977         if 'ip' in this_nic_override:
5978           ip = this_nic_override['ip']
5979         else:
5980           ip = nic.ip
5981         if 'mac' in this_nic_override:
5982           mac = this_nic_override['mac']
5983         else:
5984           mac = nic.mac
5985         if idx in self.nic_pnew:
5986           nicparams = self.nic_pnew[idx]
5987         else:
5988           nicparams = objects.FillDict(c_nicparams, nic.nicparams)
5989         mode = nicparams[constants.NIC_MODE]
5990         link = nicparams[constants.NIC_LINK]
5991         args['nics'].append((ip, mac, mode, link))
5992       if constants.DDM_ADD in nic_override:
5993         ip = nic_override[constants.DDM_ADD].get('ip', None)
5994         mac = nic_override[constants.DDM_ADD]['mac']
5995         nicparams = self.nic_pnew[constants.DDM_ADD]
5996         mode = nicparams[constants.NIC_MODE]
5997         link = nicparams[constants.NIC_LINK]
5998         args['nics'].append((ip, mac, mode, link))
5999       elif constants.DDM_REMOVE in nic_override:
6000         del args['nics'][-1]
6001
6002     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
6003     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
6004     return env, nl, nl
6005
6006   def _GetUpdatedParams(self, old_params, update_dict,
6007                         default_values, parameter_types):
6008     """Return the new params dict for the given params.
6009
6010     @type old_params: dict
6011     @type old_params: old parameters
6012     @type update_dict: dict
6013     @type update_dict: dict containing new parameter values,
6014                        or constants.VALUE_DEFAULT to reset the
6015                        parameter to its default value
6016     @type default_values: dict
6017     @param default_values: default values for the filled parameters
6018     @type parameter_types: dict
6019     @param parameter_types: dict mapping target dict keys to types
6020                             in constants.ENFORCEABLE_TYPES
6021     @rtype: (dict, dict)
6022     @return: (new_parameters, filled_parameters)
6023
6024     """
6025     params_copy = copy.deepcopy(old_params)
6026     for key, val in update_dict.iteritems():
6027       if val == constants.VALUE_DEFAULT:
6028         try:
6029           del params_copy[key]
6030         except KeyError:
6031           pass
6032       else:
6033         params_copy[key] = val
6034     utils.ForceDictType(params_copy, parameter_types)
6035     params_filled = objects.FillDict(default_values, params_copy)
6036     return (params_copy, params_filled)
6037
6038   def CheckPrereq(self):
6039     """Check prerequisites.
6040
6041     This only checks the instance list against the existing names.
6042
6043     """
6044     force = self.force = self.op.force
6045
6046     # checking the new params on the primary/secondary nodes
6047
6048     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6049     cluster = self.cluster = self.cfg.GetClusterInfo()
6050     assert self.instance is not None, \
6051       "Cannot retrieve locked instance %s" % self.op.instance_name
6052     pnode = instance.primary_node
6053     nodelist = list(instance.all_nodes)
6054
6055     # hvparams processing
6056     if self.op.hvparams:
6057       i_hvdict, hv_new = self._GetUpdatedParams(
6058                              instance.hvparams, self.op.hvparams,
6059                              cluster.hvparams[instance.hypervisor],
6060                              constants.HVS_PARAMETER_TYPES)
6061       # local check
6062       hypervisor.GetHypervisor(
6063         instance.hypervisor).CheckParameterSyntax(hv_new)
6064       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6065       self.hv_new = hv_new # the new actual values
6066       self.hv_inst = i_hvdict # the new dict (without defaults)
6067     else:
6068       self.hv_new = self.hv_inst = {}
6069
6070     # beparams processing
6071     if self.op.beparams:
6072       i_bedict, be_new = self._GetUpdatedParams(
6073                              instance.beparams, self.op.beparams,
6074                              cluster.beparams[constants.PP_DEFAULT],
6075                              constants.BES_PARAMETER_TYPES)
6076       self.be_new = be_new # the new actual values
6077       self.be_inst = i_bedict # the new dict (without defaults)
6078     else:
6079       self.be_new = self.be_inst = {}
6080
6081     self.warn = []
6082
6083     if constants.BE_MEMORY in self.op.beparams and not self.force:
6084       mem_check_list = [pnode]
6085       if be_new[constants.BE_AUTO_BALANCE]:
6086         # either we changed auto_balance to yes or it was from before
6087         mem_check_list.extend(instance.secondary_nodes)
6088       instance_info = self.rpc.call_instance_info(pnode, instance.name,
6089                                                   instance.hypervisor)
6090       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6091                                          instance.hypervisor)
6092       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
6093         # Assume the primary node is unreachable and go ahead
6094         self.warn.append("Can't get info from primary node %s" % pnode)
6095       else:
6096         if not instance_info.failed and instance_info.data:
6097           current_mem = int(instance_info.data['memory'])
6098         else:
6099           # Assume instance not running
6100           # (there is a slight race condition here, but it's not very probable,
6101           # and we have no other way to check)
6102           current_mem = 0
6103         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6104                     nodeinfo[pnode].data['memory_free'])
6105         if miss_mem > 0:
6106           raise errors.OpPrereqError("This change will prevent the instance"
6107                                      " from starting, due to %d MB of memory"
6108                                      " missing on its primary node" % miss_mem)
6109
6110       if be_new[constants.BE_AUTO_BALANCE]:
6111         for node, nres in nodeinfo.iteritems():
6112           if node not in instance.secondary_nodes:
6113             continue
6114           if nres.failed or not isinstance(nres.data, dict):
6115             self.warn.append("Can't get info from secondary node %s" % node)
6116           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
6117             self.warn.append("Not enough memory to failover instance to"
6118                              " secondary node %s" % node)
6119
6120     # NIC processing
6121     self.nic_pnew = {}
6122     self.nic_pinst = {}
6123     for nic_op, nic_dict in self.op.nics:
6124       if nic_op == constants.DDM_REMOVE:
6125         if not instance.nics:
6126           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6127         continue
6128       if nic_op != constants.DDM_ADD:
6129         # an existing nic
6130         if nic_op < 0 or nic_op >= len(instance.nics):
6131           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6132                                      " are 0 to %d" %
6133                                      (nic_op, len(instance.nics)))
6134         old_nic_params = instance.nics[nic_op].nicparams
6135         old_nic_ip = instance.nics[nic_op].ip
6136       else:
6137         old_nic_params = {}
6138         old_nic_ip = None
6139
6140       update_params_dict = dict([(key, nic_dict[key])
6141                                  for key in constants.NICS_PARAMETERS
6142                                  if key in nic_dict])
6143
6144       if 'bridge' in nic_dict:
6145         update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
6146
6147       new_nic_params, new_filled_nic_params = \
6148           self._GetUpdatedParams(old_nic_params, update_params_dict,
6149                                  cluster.nicparams[constants.PP_DEFAULT],
6150                                  constants.NICS_PARAMETER_TYPES)
6151       objects.NIC.CheckParameterSyntax(new_filled_nic_params)
6152       self.nic_pinst[nic_op] = new_nic_params
6153       self.nic_pnew[nic_op] = new_filled_nic_params
6154       new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
6155
6156       if new_nic_mode == constants.NIC_MODE_BRIDGED:
6157         nic_bridge = new_filled_nic_params[constants.NIC_LINK]
6158         result = self.rpc.call_bridges_exist(pnode, [nic_bridge])
6159         result.Raise()
6160         if not result.data:
6161           msg = ("Bridge '%s' doesn't exist on one of"
6162                  " the instance nodes" % nic_bridge)
6163           if self.force:
6164             self.warn.append(msg)
6165           else:
6166             raise errors.OpPrereqError(msg)
6167       if new_nic_mode == constants.NIC_MODE_ROUTED:
6168         if 'ip' in nic_dict:
6169           nic_ip = nic_dict['ip']
6170         else:
6171           nic_ip = old_nic_ip
6172         if nic_ip is None:
6173           raise errors.OpPrereqError('Cannot set the nic ip to None'
6174                                      ' on a routed nic')
6175       if 'mac' in nic_dict:
6176         nic_mac = nic_dict['mac']
6177         if nic_mac is None:
6178           raise errors.OpPrereqError('Cannot set the nic mac to None')
6179         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6180           # otherwise generate the mac
6181           nic_dict['mac'] = self.cfg.GenerateMAC()
6182         else:
6183           # or validate/reserve the current one
6184           if self.cfg.IsMacInUse(nic_mac):
6185             raise errors.OpPrereqError("MAC address %s already in use"
6186                                        " in cluster" % nic_mac)
6187
6188     # DISK processing
6189     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6190       raise errors.OpPrereqError("Disk operations not supported for"
6191                                  " diskless instances")
6192     for disk_op, disk_dict in self.op.disks:
6193       if disk_op == constants.DDM_REMOVE:
6194         if len(instance.disks) == 1:
6195           raise errors.OpPrereqError("Cannot remove the last disk of"
6196                                      " an instance")
6197         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6198         ins_l = ins_l[pnode]
6199         if ins_l.failed or not isinstance(ins_l.data, list):
6200           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
6201         if instance.name in ins_l.data:
6202           raise errors.OpPrereqError("Instance is running, can't remove"
6203                                      " disks.")
6204
6205       if (disk_op == constants.DDM_ADD and
6206           len(instance.nics) >= constants.MAX_DISKS):
6207         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6208                                    " add more" % constants.MAX_DISKS)
6209       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6210         # an existing disk
6211         if disk_op < 0 or disk_op >= len(instance.disks):
6212           raise errors.OpPrereqError("Invalid disk index %s, valid values"
6213                                      " are 0 to %d" %
6214                                      (disk_op, len(instance.disks)))
6215
6216     return
6217
6218   def Exec(self, feedback_fn):
6219     """Modifies an instance.
6220
6221     All parameters take effect only at the next restart of the instance.
6222
6223     """
6224     # Process here the warnings from CheckPrereq, as we don't have a
6225     # feedback_fn there.
6226     for warn in self.warn:
6227       feedback_fn("WARNING: %s" % warn)
6228
6229     result = []
6230     instance = self.instance
6231     cluster = self.cluster
6232     # disk changes
6233     for disk_op, disk_dict in self.op.disks:
6234       if disk_op == constants.DDM_REMOVE:
6235         # remove the last disk
6236         device = instance.disks.pop()
6237         device_idx = len(instance.disks)
6238         for node, disk in device.ComputeNodeTree(instance.primary_node):
6239           self.cfg.SetDiskID(disk, node)
6240           msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
6241           if msg:
6242             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6243                             " continuing anyway", device_idx, node, msg)
6244         result.append(("disk/%d" % device_idx, "remove"))
6245       elif disk_op == constants.DDM_ADD:
6246         # add a new disk
6247         if instance.disk_template == constants.DT_FILE:
6248           file_driver, file_path = instance.disks[0].logical_id
6249           file_path = os.path.dirname(file_path)
6250         else:
6251           file_driver = file_path = None
6252         disk_idx_base = len(instance.disks)
6253         new_disk = _GenerateDiskTemplate(self,
6254                                          instance.disk_template,
6255                                          instance.name, instance.primary_node,
6256                                          instance.secondary_nodes,
6257                                          [disk_dict],
6258                                          file_path,
6259                                          file_driver,
6260                                          disk_idx_base)[0]
6261         instance.disks.append(new_disk)
6262         info = _GetInstanceInfoText(instance)
6263
6264         logging.info("Creating volume %s for instance %s",
6265                      new_disk.iv_name, instance.name)
6266         # Note: this needs to be kept in sync with _CreateDisks
6267         #HARDCODE
6268         for node in instance.all_nodes:
6269           f_create = node == instance.primary_node
6270           try:
6271             _CreateBlockDev(self, node, instance, new_disk,
6272                             f_create, info, f_create)
6273           except errors.OpExecError, err:
6274             self.LogWarning("Failed to create volume %s (%s) on"
6275                             " node %s: %s",
6276                             new_disk.iv_name, new_disk, node, err)
6277         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6278                        (new_disk.size, new_disk.mode)))
6279       else:
6280         # change a given disk
6281         instance.disks[disk_op].mode = disk_dict['mode']
6282         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6283     # NIC changes
6284     for nic_op, nic_dict in self.op.nics:
6285       if nic_op == constants.DDM_REMOVE:
6286         # remove the last nic
6287         del instance.nics[-1]
6288         result.append(("nic.%d" % len(instance.nics), "remove"))
6289       elif nic_op == constants.DDM_ADD:
6290         # mac and bridge should be set, by now
6291         mac = nic_dict['mac']
6292         ip = nic_dict.get('ip', None)
6293         nicparams = self.nic_pinst[constants.DDM_ADD]
6294         new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
6295         instance.nics.append(new_nic)
6296         result.append(("nic.%d" % (len(instance.nics) - 1),
6297                        "add:mac=%s,ip=%s,mode=%s,link=%s" %
6298                        (new_nic.mac, new_nic.ip,
6299                         self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
6300                         self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
6301                        )))
6302       else:
6303         for key in 'mac', 'ip':
6304           if key in nic_dict:
6305             setattr(instance.nics[nic_op], key, nic_dict[key])
6306         if nic_op in self.nic_pnew:
6307           instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
6308         for key, val in nic_dict.iteritems():
6309           result.append(("nic.%s/%d" % (key, nic_op), val))
6310
6311     # hvparams changes
6312     if self.op.hvparams:
6313       instance.hvparams = self.hv_inst
6314       for key, val in self.op.hvparams.iteritems():
6315         result.append(("hv/%s" % key, val))
6316
6317     # beparams changes
6318     if self.op.beparams:
6319       instance.beparams = self.be_inst
6320       for key, val in self.op.beparams.iteritems():
6321         result.append(("be/%s" % key, val))
6322
6323     self.cfg.Update(instance)
6324
6325     return result
6326
6327
6328 class LUQueryExports(NoHooksLU):
6329   """Query the exports list
6330
6331   """
6332   _OP_REQP = ['nodes']
6333   REQ_BGL = False
6334
6335   def ExpandNames(self):
6336     self.needed_locks = {}
6337     self.share_locks[locking.LEVEL_NODE] = 1
6338     if not self.op.nodes:
6339       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6340     else:
6341       self.needed_locks[locking.LEVEL_NODE] = \
6342         _GetWantedNodes(self, self.op.nodes)
6343
6344   def CheckPrereq(self):
6345     """Check prerequisites.
6346
6347     """
6348     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6349
6350   def Exec(self, feedback_fn):
6351     """Compute the list of all the exported system images.
6352
6353     @rtype: dict
6354     @return: a dictionary with the structure node->(export-list)
6355         where export-list is a list of the instances exported on
6356         that node.
6357
6358     """
6359     rpcresult = self.rpc.call_export_list(self.nodes)
6360     result = {}
6361     for node in rpcresult:
6362       if rpcresult[node].failed:
6363         result[node] = False
6364       else:
6365         result[node] = rpcresult[node].data
6366
6367     return result
6368
6369
6370 class LUExportInstance(LogicalUnit):
6371   """Export an instance to an image in the cluster.
6372
6373   """
6374   HPATH = "instance-export"
6375   HTYPE = constants.HTYPE_INSTANCE
6376   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6377   REQ_BGL = False
6378
6379   def ExpandNames(self):
6380     self._ExpandAndLockInstance()
6381     # FIXME: lock only instance primary and destination node
6382     #
6383     # Sad but true, for now we have do lock all nodes, as we don't know where
6384     # the previous export might be, and and in this LU we search for it and
6385     # remove it from its current node. In the future we could fix this by:
6386     #  - making a tasklet to search (share-lock all), then create the new one,
6387     #    then one to remove, after
6388     #  - removing the removal operation altoghether
6389     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6390
6391   def DeclareLocks(self, level):
6392     """Last minute lock declaration."""
6393     # All nodes are locked anyway, so nothing to do here.
6394
6395   def BuildHooksEnv(self):
6396     """Build hooks env.
6397
6398     This will run on the master, primary node and target node.
6399
6400     """
6401     env = {
6402       "EXPORT_NODE": self.op.target_node,
6403       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6404       }
6405     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6406     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6407           self.op.target_node]
6408     return env, nl, nl
6409
6410   def CheckPrereq(self):
6411     """Check prerequisites.
6412
6413     This checks that the instance and node names are valid.
6414
6415     """
6416     instance_name = self.op.instance_name
6417     self.instance = self.cfg.GetInstanceInfo(instance_name)
6418     assert self.instance is not None, \
6419           "Cannot retrieve locked instance %s" % self.op.instance_name
6420     _CheckNodeOnline(self, self.instance.primary_node)
6421
6422     self.dst_node = self.cfg.GetNodeInfo(
6423       self.cfg.ExpandNodeName(self.op.target_node))
6424
6425     if self.dst_node is None:
6426       # This is wrong node name, not a non-locked node
6427       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6428     _CheckNodeOnline(self, self.dst_node.name)
6429     _CheckNodeNotDrained(self, self.dst_node.name)
6430
6431     # instance disk type verification
6432     for disk in self.instance.disks:
6433       if disk.dev_type == constants.LD_FILE:
6434         raise errors.OpPrereqError("Export not supported for instances with"
6435                                    " file-based disks")
6436
6437   def Exec(self, feedback_fn):
6438     """Export an instance to an image in the cluster.
6439
6440     """
6441     instance = self.instance
6442     dst_node = self.dst_node
6443     src_node = instance.primary_node
6444     if self.op.shutdown:
6445       # shutdown the instance, but not the disks
6446       result = self.rpc.call_instance_shutdown(src_node, instance)
6447       msg = result.RemoteFailMsg()
6448       if msg:
6449         raise errors.OpExecError("Could not shutdown instance %s on"
6450                                  " node %s: %s" %
6451                                  (instance.name, src_node, msg))
6452
6453     vgname = self.cfg.GetVGName()
6454
6455     snap_disks = []
6456
6457     # set the disks ID correctly since call_instance_start needs the
6458     # correct drbd minor to create the symlinks
6459     for disk in instance.disks:
6460       self.cfg.SetDiskID(disk, src_node)
6461
6462     try:
6463       for disk in instance.disks:
6464         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6465         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6466         if new_dev_name.failed or not new_dev_name.data:
6467           self.LogWarning("Could not snapshot block device %s on node %s",
6468                           disk.logical_id[1], src_node)
6469           snap_disks.append(False)
6470         else:
6471           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6472                                  logical_id=(vgname, new_dev_name.data),
6473                                  physical_id=(vgname, new_dev_name.data),
6474                                  iv_name=disk.iv_name)
6475           snap_disks.append(new_dev)
6476
6477     finally:
6478       if self.op.shutdown and instance.admin_up:
6479         result = self.rpc.call_instance_start(src_node, instance, None, None)
6480         msg = result.RemoteFailMsg()
6481         if msg:
6482           _ShutdownInstanceDisks(self, instance)
6483           raise errors.OpExecError("Could not start instance: %s" % msg)
6484
6485     # TODO: check for size
6486
6487     cluster_name = self.cfg.GetClusterName()
6488     for idx, dev in enumerate(snap_disks):
6489       if dev:
6490         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6491                                                instance, cluster_name, idx)
6492         if result.failed or not result.data:
6493           self.LogWarning("Could not export block device %s from node %s to"
6494                           " node %s", dev.logical_id[1], src_node,
6495                           dst_node.name)
6496         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6497         if msg:
6498           self.LogWarning("Could not remove snapshot block device %s from node"
6499                           " %s: %s", dev.logical_id[1], src_node, msg)
6500
6501     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6502     if result.failed or not result.data:
6503       self.LogWarning("Could not finalize export for instance %s on node %s",
6504                       instance.name, dst_node.name)
6505
6506     nodelist = self.cfg.GetNodeList()
6507     nodelist.remove(dst_node.name)
6508
6509     # on one-node clusters nodelist will be empty after the removal
6510     # if we proceed the backup would be removed because OpQueryExports
6511     # substitutes an empty list with the full cluster node list.
6512     if nodelist:
6513       exportlist = self.rpc.call_export_list(nodelist)
6514       for node in exportlist:
6515         if exportlist[node].failed:
6516           continue
6517         if instance.name in exportlist[node].data:
6518           if not self.rpc.call_export_remove(node, instance.name):
6519             self.LogWarning("Could not remove older export for instance %s"
6520                             " on node %s", instance.name, node)
6521
6522
6523 class LURemoveExport(NoHooksLU):
6524   """Remove exports related to the named instance.
6525
6526   """
6527   _OP_REQP = ["instance_name"]
6528   REQ_BGL = False
6529
6530   def ExpandNames(self):
6531     self.needed_locks = {}
6532     # We need all nodes to be locked in order for RemoveExport to work, but we
6533     # don't need to lock the instance itself, as nothing will happen to it (and
6534     # we can remove exports also for a removed instance)
6535     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6536
6537   def CheckPrereq(self):
6538     """Check prerequisites.
6539     """
6540     pass
6541
6542   def Exec(self, feedback_fn):
6543     """Remove any export.
6544
6545     """
6546     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6547     # If the instance was not found we'll try with the name that was passed in.
6548     # This will only work if it was an FQDN, though.
6549     fqdn_warn = False
6550     if not instance_name:
6551       fqdn_warn = True
6552       instance_name = self.op.instance_name
6553
6554     exportlist = self.rpc.call_export_list(self.acquired_locks[
6555       locking.LEVEL_NODE])
6556     found = False
6557     for node in exportlist:
6558       if exportlist[node].failed:
6559         self.LogWarning("Failed to query node %s, continuing" % node)
6560         continue
6561       if instance_name in exportlist[node].data:
6562         found = True
6563         result = self.rpc.call_export_remove(node, instance_name)
6564         if result.failed or not result.data:
6565           logging.error("Could not remove export for instance %s"
6566                         " on node %s", instance_name, node)
6567
6568     if fqdn_warn and not found:
6569       feedback_fn("Export not found. If trying to remove an export belonging"
6570                   " to a deleted instance please use its Fully Qualified"
6571                   " Domain Name.")
6572
6573
6574 class TagsLU(NoHooksLU):
6575   """Generic tags LU.
6576
6577   This is an abstract class which is the parent of all the other tags LUs.
6578
6579   """
6580
6581   def ExpandNames(self):
6582     self.needed_locks = {}
6583     if self.op.kind == constants.TAG_NODE:
6584       name = self.cfg.ExpandNodeName(self.op.name)
6585       if name is None:
6586         raise errors.OpPrereqError("Invalid node name (%s)" %
6587                                    (self.op.name,))
6588       self.op.name = name
6589       self.needed_locks[locking.LEVEL_NODE] = name
6590     elif self.op.kind == constants.TAG_INSTANCE:
6591       name = self.cfg.ExpandInstanceName(self.op.name)
6592       if name is None:
6593         raise errors.OpPrereqError("Invalid instance name (%s)" %
6594                                    (self.op.name,))
6595       self.op.name = name
6596       self.needed_locks[locking.LEVEL_INSTANCE] = name
6597
6598   def CheckPrereq(self):
6599     """Check prerequisites.
6600
6601     """
6602     if self.op.kind == constants.TAG_CLUSTER:
6603       self.target = self.cfg.GetClusterInfo()
6604     elif self.op.kind == constants.TAG_NODE:
6605       self.target = self.cfg.GetNodeInfo(self.op.name)
6606     elif self.op.kind == constants.TAG_INSTANCE:
6607       self.target = self.cfg.GetInstanceInfo(self.op.name)
6608     else:
6609       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6610                                  str(self.op.kind))
6611
6612
6613 class LUGetTags(TagsLU):
6614   """Returns the tags of a given object.
6615
6616   """
6617   _OP_REQP = ["kind", "name"]
6618   REQ_BGL = False
6619
6620   def Exec(self, feedback_fn):
6621     """Returns the tag list.
6622
6623     """
6624     return list(self.target.GetTags())
6625
6626
6627 class LUSearchTags(NoHooksLU):
6628   """Searches the tags for a given pattern.
6629
6630   """
6631   _OP_REQP = ["pattern"]
6632   REQ_BGL = False
6633
6634   def ExpandNames(self):
6635     self.needed_locks = {}
6636
6637   def CheckPrereq(self):
6638     """Check prerequisites.
6639
6640     This checks the pattern passed for validity by compiling it.
6641
6642     """
6643     try:
6644       self.re = re.compile(self.op.pattern)
6645     except re.error, err:
6646       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6647                                  (self.op.pattern, err))
6648
6649   def Exec(self, feedback_fn):
6650     """Returns the tag list.
6651
6652     """
6653     cfg = self.cfg
6654     tgts = [("/cluster", cfg.GetClusterInfo())]
6655     ilist = cfg.GetAllInstancesInfo().values()
6656     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6657     nlist = cfg.GetAllNodesInfo().values()
6658     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6659     results = []
6660     for path, target in tgts:
6661       for tag in target.GetTags():
6662         if self.re.search(tag):
6663           results.append((path, tag))
6664     return results
6665
6666
6667 class LUAddTags(TagsLU):
6668   """Sets a tag on a given object.
6669
6670   """
6671   _OP_REQP = ["kind", "name", "tags"]
6672   REQ_BGL = False
6673
6674   def CheckPrereq(self):
6675     """Check prerequisites.
6676
6677     This checks the type and length of the tag name and value.
6678
6679     """
6680     TagsLU.CheckPrereq(self)
6681     for tag in self.op.tags:
6682       objects.TaggableObject.ValidateTag(tag)
6683
6684   def Exec(self, feedback_fn):
6685     """Sets the tag.
6686
6687     """
6688     try:
6689       for tag in self.op.tags:
6690         self.target.AddTag(tag)
6691     except errors.TagError, err:
6692       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6693     try:
6694       self.cfg.Update(self.target)
6695     except errors.ConfigurationError:
6696       raise errors.OpRetryError("There has been a modification to the"
6697                                 " config file and the operation has been"
6698                                 " aborted. Please retry.")
6699
6700
6701 class LUDelTags(TagsLU):
6702   """Delete a list of tags from a given object.
6703
6704   """
6705   _OP_REQP = ["kind", "name", "tags"]
6706   REQ_BGL = False
6707
6708   def CheckPrereq(self):
6709     """Check prerequisites.
6710
6711     This checks that we have the given tag.
6712
6713     """
6714     TagsLU.CheckPrereq(self)
6715     for tag in self.op.tags:
6716       objects.TaggableObject.ValidateTag(tag)
6717     del_tags = frozenset(self.op.tags)
6718     cur_tags = self.target.GetTags()
6719     if not del_tags <= cur_tags:
6720       diff_tags = del_tags - cur_tags
6721       diff_names = ["'%s'" % tag for tag in diff_tags]
6722       diff_names.sort()
6723       raise errors.OpPrereqError("Tag(s) %s not found" %
6724                                  (",".join(diff_names)))
6725
6726   def Exec(self, feedback_fn):
6727     """Remove the tag from the object.
6728
6729     """
6730     for tag in self.op.tags:
6731       self.target.RemoveTag(tag)
6732     try:
6733       self.cfg.Update(self.target)
6734     except errors.ConfigurationError:
6735       raise errors.OpRetryError("There has been a modification to the"
6736                                 " config file and the operation has been"
6737                                 " aborted. Please retry.")
6738
6739
6740 class LUTestDelay(NoHooksLU):
6741   """Sleep for a specified amount of time.
6742
6743   This LU sleeps on the master and/or nodes for a specified amount of
6744   time.
6745
6746   """
6747   _OP_REQP = ["duration", "on_master", "on_nodes"]
6748   REQ_BGL = False
6749
6750   def ExpandNames(self):
6751     """Expand names and set required locks.
6752
6753     This expands the node list, if any.
6754
6755     """
6756     self.needed_locks = {}
6757     if self.op.on_nodes:
6758       # _GetWantedNodes can be used here, but is not always appropriate to use
6759       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6760       # more information.
6761       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6762       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6763
6764   def CheckPrereq(self):
6765     """Check prerequisites.
6766
6767     """
6768
6769   def Exec(self, feedback_fn):
6770     """Do the actual sleep.
6771
6772     """
6773     if self.op.on_master:
6774       if not utils.TestDelay(self.op.duration):
6775         raise errors.OpExecError("Error during master delay test")
6776     if self.op.on_nodes:
6777       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6778       if not result:
6779         raise errors.OpExecError("Complete failure from rpc call")
6780       for node, node_result in result.items():
6781         node_result.Raise()
6782         if not node_result.data:
6783           raise errors.OpExecError("Failure during rpc call to node %s,"
6784                                    " result: %s" % (node, node_result.data))
6785
6786
6787 class IAllocator(object):
6788   """IAllocator framework.
6789
6790   An IAllocator instance has three sets of attributes:
6791     - cfg that is needed to query the cluster
6792     - input data (all members of the _KEYS class attribute are required)
6793     - four buffer attributes (in|out_data|text), that represent the
6794       input (to the external script) in text and data structure format,
6795       and the output from it, again in two formats
6796     - the result variables from the script (success, info, nodes) for
6797       easy usage
6798
6799   """
6800   _ALLO_KEYS = [
6801     "mem_size", "disks", "disk_template",
6802     "os", "tags", "nics", "vcpus", "hypervisor",
6803     ]
6804   _RELO_KEYS = [
6805     "relocate_from",
6806     ]
6807
6808   def __init__(self, lu, mode, name, **kwargs):
6809     self.lu = lu
6810     # init buffer variables
6811     self.in_text = self.out_text = self.in_data = self.out_data = None
6812     # init all input fields so that pylint is happy
6813     self.mode = mode
6814     self.name = name
6815     self.mem_size = self.disks = self.disk_template = None
6816     self.os = self.tags = self.nics = self.vcpus = None
6817     self.hypervisor = None
6818     self.relocate_from = None
6819     # computed fields
6820     self.required_nodes = None
6821     # init result fields
6822     self.success = self.info = self.nodes = None
6823     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6824       keyset = self._ALLO_KEYS
6825     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6826       keyset = self._RELO_KEYS
6827     else:
6828       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6829                                    " IAllocator" % self.mode)
6830     for key in kwargs:
6831       if key not in keyset:
6832         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6833                                      " IAllocator" % key)
6834       setattr(self, key, kwargs[key])
6835     for key in keyset:
6836       if key not in kwargs:
6837         raise errors.ProgrammerError("Missing input parameter '%s' to"
6838                                      " IAllocator" % key)
6839     self._BuildInputData()
6840
6841   def _ComputeClusterData(self):
6842     """Compute the generic allocator input data.
6843
6844     This is the data that is independent of the actual operation.
6845
6846     """
6847     cfg = self.lu.cfg
6848     cluster_info = cfg.GetClusterInfo()
6849     # cluster data
6850     data = {
6851       "version": constants.IALLOCATOR_VERSION,
6852       "cluster_name": cfg.GetClusterName(),
6853       "cluster_tags": list(cluster_info.GetTags()),
6854       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6855       # we don't have job IDs
6856       }
6857     iinfo = cfg.GetAllInstancesInfo().values()
6858     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6859
6860     # node data
6861     node_results = {}
6862     node_list = cfg.GetNodeList()
6863
6864     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6865       hypervisor_name = self.hypervisor
6866     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6867       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6868
6869     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6870                                            hypervisor_name)
6871     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6872                        cluster_info.enabled_hypervisors)
6873     for nname, nresult in node_data.items():
6874       # first fill in static (config-based) values
6875       ninfo = cfg.GetNodeInfo(nname)
6876       pnr = {
6877         "tags": list(ninfo.GetTags()),
6878         "primary_ip": ninfo.primary_ip,
6879         "secondary_ip": ninfo.secondary_ip,
6880         "offline": ninfo.offline,
6881         "drained": ninfo.drained,
6882         "master_candidate": ninfo.master_candidate,
6883         }
6884
6885       if not ninfo.offline:
6886         nresult.Raise()
6887         if not isinstance(nresult.data, dict):
6888           raise errors.OpExecError("Can't get data for node %s" % nname)
6889         remote_info = nresult.data
6890         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6891                      'vg_size', 'vg_free', 'cpu_total']:
6892           if attr not in remote_info:
6893             raise errors.OpExecError("Node '%s' didn't return attribute"
6894                                      " '%s'" % (nname, attr))
6895           try:
6896             remote_info[attr] = int(remote_info[attr])
6897           except ValueError, err:
6898             raise errors.OpExecError("Node '%s' returned invalid value"
6899                                      " for '%s': %s" % (nname, attr, err))
6900         # compute memory used by primary instances
6901         i_p_mem = i_p_up_mem = 0
6902         for iinfo, beinfo in i_list:
6903           if iinfo.primary_node == nname:
6904             i_p_mem += beinfo[constants.BE_MEMORY]
6905             if iinfo.name not in node_iinfo[nname].data:
6906               i_used_mem = 0
6907             else:
6908               i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6909             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6910             remote_info['memory_free'] -= max(0, i_mem_diff)
6911
6912             if iinfo.admin_up:
6913               i_p_up_mem += beinfo[constants.BE_MEMORY]
6914
6915         # compute memory used by instances
6916         pnr_dyn = {
6917           "total_memory": remote_info['memory_total'],
6918           "reserved_memory": remote_info['memory_dom0'],
6919           "free_memory": remote_info['memory_free'],
6920           "total_disk": remote_info['vg_size'],
6921           "free_disk": remote_info['vg_free'],
6922           "total_cpus": remote_info['cpu_total'],
6923           "i_pri_memory": i_p_mem,
6924           "i_pri_up_memory": i_p_up_mem,
6925           }
6926         pnr.update(pnr_dyn)
6927
6928       node_results[nname] = pnr
6929     data["nodes"] = node_results
6930
6931     # instance data
6932     instance_data = {}
6933     for iinfo, beinfo in i_list:
6934       nic_data = []
6935       for nic in iinfo.nics:
6936         filled_params = objects.FillDict(
6937             cluster_info.nicparams[constants.PP_DEFAULT],
6938             nic.nicparams)
6939         nic_dict = {"mac": nic.mac,
6940                     "ip": nic.ip,
6941                     "mode": filled_params[constants.NIC_MODE],
6942                     "link": filled_params[constants.NIC_LINK],
6943                    }
6944         if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
6945           nic_dict["bridge"] = filled_params[constants.NIC_LINK]
6946         nic_data.append(nic_dict)
6947       pir = {
6948         "tags": list(iinfo.GetTags()),
6949         "admin_up": iinfo.admin_up,
6950         "vcpus": beinfo[constants.BE_VCPUS],
6951         "memory": beinfo[constants.BE_MEMORY],
6952         "os": iinfo.os,
6953         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6954         "nics": nic_data,
6955         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6956         "disk_template": iinfo.disk_template,
6957         "hypervisor": iinfo.hypervisor,
6958         }
6959       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
6960                                                  pir["disks"])
6961       instance_data[iinfo.name] = pir
6962
6963     data["instances"] = instance_data
6964
6965     self.in_data = data
6966
6967   def _AddNewInstance(self):
6968     """Add new instance data to allocator structure.
6969
6970     This in combination with _AllocatorGetClusterData will create the
6971     correct structure needed as input for the allocator.
6972
6973     The checks for the completeness of the opcode must have already been
6974     done.
6975
6976     """
6977     data = self.in_data
6978
6979     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6980
6981     if self.disk_template in constants.DTS_NET_MIRROR:
6982       self.required_nodes = 2
6983     else:
6984       self.required_nodes = 1
6985     request = {
6986       "type": "allocate",
6987       "name": self.name,
6988       "disk_template": self.disk_template,
6989       "tags": self.tags,
6990       "os": self.os,
6991       "vcpus": self.vcpus,
6992       "memory": self.mem_size,
6993       "disks": self.disks,
6994       "disk_space_total": disk_space,
6995       "nics": self.nics,
6996       "required_nodes": self.required_nodes,
6997       }
6998     data["request"] = request
6999
7000   def _AddRelocateInstance(self):
7001     """Add relocate instance data to allocator structure.
7002
7003     This in combination with _IAllocatorGetClusterData will create the
7004     correct structure needed as input for the allocator.
7005
7006     The checks for the completeness of the opcode must have already been
7007     done.
7008
7009     """
7010     instance = self.lu.cfg.GetInstanceInfo(self.name)
7011     if instance is None:
7012       raise errors.ProgrammerError("Unknown instance '%s' passed to"
7013                                    " IAllocator" % self.name)
7014
7015     if instance.disk_template not in constants.DTS_NET_MIRROR:
7016       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
7017
7018     if len(instance.secondary_nodes) != 1:
7019       raise errors.OpPrereqError("Instance has not exactly one secondary node")
7020
7021     self.required_nodes = 1
7022     disk_sizes = [{'size': disk.size} for disk in instance.disks]
7023     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
7024
7025     request = {
7026       "type": "relocate",
7027       "name": self.name,
7028       "disk_space_total": disk_space,
7029       "required_nodes": self.required_nodes,
7030       "relocate_from": self.relocate_from,
7031       }
7032     self.in_data["request"] = request
7033
7034   def _BuildInputData(self):
7035     """Build input data structures.
7036
7037     """
7038     self._ComputeClusterData()
7039
7040     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7041       self._AddNewInstance()
7042     else:
7043       self._AddRelocateInstance()
7044
7045     self.in_text = serializer.Dump(self.in_data)
7046
7047   def Run(self, name, validate=True, call_fn=None):
7048     """Run an instance allocator and return the results.
7049
7050     """
7051     if call_fn is None:
7052       call_fn = self.lu.rpc.call_iallocator_runner
7053     data = self.in_text
7054
7055     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
7056     result.Raise()
7057
7058     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
7059       raise errors.OpExecError("Invalid result from master iallocator runner")
7060
7061     rcode, stdout, stderr, fail = result.data
7062
7063     if rcode == constants.IARUN_NOTFOUND:
7064       raise errors.OpExecError("Can't find allocator '%s'" % name)
7065     elif rcode == constants.IARUN_FAILURE:
7066       raise errors.OpExecError("Instance allocator call failed: %s,"
7067                                " output: %s" % (fail, stdout+stderr))
7068     self.out_text = stdout
7069     if validate:
7070       self._ValidateResult()
7071
7072   def _ValidateResult(self):
7073     """Process the allocator results.
7074
7075     This will process and if successful save the result in
7076     self.out_data and the other parameters.
7077
7078     """
7079     try:
7080       rdict = serializer.Load(self.out_text)
7081     except Exception, err:
7082       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
7083
7084     if not isinstance(rdict, dict):
7085       raise errors.OpExecError("Can't parse iallocator results: not a dict")
7086
7087     for key in "success", "info", "nodes":
7088       if key not in rdict:
7089         raise errors.OpExecError("Can't parse iallocator results:"
7090                                  " missing key '%s'" % key)
7091       setattr(self, key, rdict[key])
7092
7093     if not isinstance(rdict["nodes"], list):
7094       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
7095                                " is not a list")
7096     self.out_data = rdict
7097
7098
7099 class LUTestAllocator(NoHooksLU):
7100   """Run allocator tests.
7101
7102   This LU runs the allocator tests
7103
7104   """
7105   _OP_REQP = ["direction", "mode", "name"]
7106
7107   def CheckPrereq(self):
7108     """Check prerequisites.
7109
7110     This checks the opcode parameters depending on the director and mode test.
7111
7112     """
7113     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7114       for attr in ["name", "mem_size", "disks", "disk_template",
7115                    "os", "tags", "nics", "vcpus"]:
7116         if not hasattr(self.op, attr):
7117           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7118                                      attr)
7119       iname = self.cfg.ExpandInstanceName(self.op.name)
7120       if iname is not None:
7121         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7122                                    iname)
7123       if not isinstance(self.op.nics, list):
7124         raise errors.OpPrereqError("Invalid parameter 'nics'")
7125       for row in self.op.nics:
7126         if (not isinstance(row, dict) or
7127             "mac" not in row or
7128             "ip" not in row or
7129             "bridge" not in row):
7130           raise errors.OpPrereqError("Invalid contents of the"
7131                                      " 'nics' parameter")
7132       if not isinstance(self.op.disks, list):
7133         raise errors.OpPrereqError("Invalid parameter 'disks'")
7134       for row in self.op.disks:
7135         if (not isinstance(row, dict) or
7136             "size" not in row or
7137             not isinstance(row["size"], int) or
7138             "mode" not in row or
7139             row["mode"] not in ['r', 'w']):
7140           raise errors.OpPrereqError("Invalid contents of the"
7141                                      " 'disks' parameter")
7142       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7143         self.op.hypervisor = self.cfg.GetHypervisorType()
7144     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7145       if not hasattr(self.op, "name"):
7146         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7147       fname = self.cfg.ExpandInstanceName(self.op.name)
7148       if fname is None:
7149         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7150                                    self.op.name)
7151       self.op.name = fname
7152       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7153     else:
7154       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7155                                  self.op.mode)
7156
7157     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7158       if not hasattr(self.op, "allocator") or self.op.allocator is None:
7159         raise errors.OpPrereqError("Missing allocator name")
7160     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7161       raise errors.OpPrereqError("Wrong allocator test '%s'" %
7162                                  self.op.direction)
7163
7164   def Exec(self, feedback_fn):
7165     """Run the allocator test.
7166
7167     """
7168     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7169       ial = IAllocator(self,
7170                        mode=self.op.mode,
7171                        name=self.op.name,
7172                        mem_size=self.op.mem_size,
7173                        disks=self.op.disks,
7174                        disk_template=self.op.disk_template,
7175                        os=self.op.os,
7176                        tags=self.op.tags,
7177                        nics=self.op.nics,
7178                        vcpus=self.op.vcpus,
7179                        hypervisor=self.op.hypervisor,
7180                        )
7181     else:
7182       ial = IAllocator(self,
7183                        mode=self.op.mode,
7184                        name=self.op.name,
7185                        relocate_from=list(self.relocate_from),
7186                        )
7187
7188     if self.op.direction == constants.IALLOCATOR_DIR_IN:
7189       result = ial.in_text
7190     else:
7191       ial.Run(self.op.allocator, validate=False)
7192       result = ial.out_text
7193     return result