Convert iallocator_runner rpc to new result style
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import tempfile
30 import re
31 import platform
32 import logging
33 import copy
34 import random
35
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45 from ganeti import ssconf
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59
60   Note that all commands require root permissions.
61
62   """
63   HPATH = None
64   HTYPE = None
65   _OP_REQP = []
66   REQ_BGL = True
67
68   def __init__(self, processor, op, context, rpc):
69     """Constructor for LogicalUnit.
70
71     This needs to be overriden in derived classes in order to check op
72     validity.
73
74     """
75     self.proc = processor
76     self.op = op
77     self.cfg = context.cfg
78     self.context = context
79     self.rpc = rpc
80     # Dicts used to declare locking needs to mcpu
81     self.needed_locks = None
82     self.acquired_locks = {}
83     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
84     self.add_locks = {}
85     self.remove_locks = {}
86     # Used to force good behavior when calling helper functions
87     self.recalculate_locks = {}
88     self.__ssh = None
89     # logging
90     self.LogWarning = processor.LogWarning
91     self.LogInfo = processor.LogInfo
92
93     for attr_name in self._OP_REQP:
94       attr_val = getattr(op, attr_name, None)
95       if attr_val is None:
96         raise errors.OpPrereqError("Required parameter '%s' missing" %
97                                    attr_name)
98     self.CheckArguments()
99
100   def __GetSSH(self):
101     """Returns the SshRunner object
102
103     """
104     if not self.__ssh:
105       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
106     return self.__ssh
107
108   ssh = property(fget=__GetSSH)
109
110   def CheckArguments(self):
111     """Check syntactic validity for the opcode arguments.
112
113     This method is for doing a simple syntactic check and ensure
114     validity of opcode parameters, without any cluster-related
115     checks. While the same can be accomplished in ExpandNames and/or
116     CheckPrereq, doing these separate is better because:
117
118       - ExpandNames is left as as purely a lock-related function
119       - CheckPrereq is run after we have aquired locks (and possible
120         waited for them)
121
122     The function is allowed to change the self.op attribute so that
123     later methods can no longer worry about missing parameters.
124
125     """
126     pass
127
128   def ExpandNames(self):
129     """Expand names for this LU.
130
131     This method is called before starting to execute the opcode, and it should
132     update all the parameters of the opcode to their canonical form (e.g. a
133     short node name must be fully expanded after this method has successfully
134     completed). This way locking, hooks, logging, ecc. can work correctly.
135
136     LUs which implement this method must also populate the self.needed_locks
137     member, as a dict with lock levels as keys, and a list of needed lock names
138     as values. Rules:
139
140       - use an empty dict if you don't need any lock
141       - if you don't need any lock at a particular level omit that level
142       - don't put anything for the BGL level
143       - if you want all locks at a level use locking.ALL_SET as a value
144
145     If you need to share locks (rather than acquire them exclusively) at one
146     level you can modify self.share_locks, setting a true value (usually 1) for
147     that level. By default locks are not shared.
148
149     Examples::
150
151       # Acquire all nodes and one instance
152       self.needed_locks = {
153         locking.LEVEL_NODE: locking.ALL_SET,
154         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
155       }
156       # Acquire just two nodes
157       self.needed_locks = {
158         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
159       }
160       # Acquire no locks
161       self.needed_locks = {} # No, you can't leave it to the default value None
162
163     """
164     # The implementation of this method is mandatory only if the new LU is
165     # concurrent, so that old LUs don't need to be changed all at the same
166     # time.
167     if self.REQ_BGL:
168       self.needed_locks = {} # Exclusive LUs don't need locks.
169     else:
170       raise NotImplementedError
171
172   def DeclareLocks(self, level):
173     """Declare LU locking needs for a level
174
175     While most LUs can just declare their locking needs at ExpandNames time,
176     sometimes there's the need to calculate some locks after having acquired
177     the ones before. This function is called just before acquiring locks at a
178     particular level, but after acquiring the ones at lower levels, and permits
179     such calculations. It can be used to modify self.needed_locks, and by
180     default it does nothing.
181
182     This function is only called if you have something already set in
183     self.needed_locks for the level.
184
185     @param level: Locking level which is going to be locked
186     @type level: member of ganeti.locking.LEVELS
187
188     """
189
190   def CheckPrereq(self):
191     """Check prerequisites for this LU.
192
193     This method should check that the prerequisites for the execution
194     of this LU are fulfilled. It can do internode communication, but
195     it should be idempotent - no cluster or system changes are
196     allowed.
197
198     The method should raise errors.OpPrereqError in case something is
199     not fulfilled. Its return value is ignored.
200
201     This method should also update all the parameters of the opcode to
202     their canonical form if it hasn't been done by ExpandNames before.
203
204     """
205     raise NotImplementedError
206
207   def Exec(self, feedback_fn):
208     """Execute the LU.
209
210     This method should implement the actual work. It should raise
211     errors.OpExecError for failures that are somewhat dealt with in
212     code, or expected.
213
214     """
215     raise NotImplementedError
216
217   def BuildHooksEnv(self):
218     """Build hooks environment for this LU.
219
220     This method should return a three-node tuple consisting of: a dict
221     containing the environment that will be used for running the
222     specific hook for this LU, a list of node names on which the hook
223     should run before the execution, and a list of node names on which
224     the hook should run after the execution.
225
226     The keys of the dict must not have 'GANETI_' prefixed as this will
227     be handled in the hooks runner. Also note additional keys will be
228     added by the hooks runner. If the LU doesn't define any
229     environment, an empty dict (and not None) should be returned.
230
231     No nodes should be returned as an empty list (and not None).
232
233     Note that if the HPATH for a LU class is None, this function will
234     not be called.
235
236     """
237     raise NotImplementedError
238
239   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240     """Notify the LU about the results of its hooks.
241
242     This method is called every time a hooks phase is executed, and notifies
243     the Logical Unit about the hooks' result. The LU can then use it to alter
244     its result based on the hooks.  By default the method does nothing and the
245     previous result is passed back unchanged but any LU can define it if it
246     wants to use the local cluster hook-scripts somehow.
247
248     @param phase: one of L{constants.HOOKS_PHASE_POST} or
249         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250     @param hook_results: the results of the multi-node hooks rpc call
251     @param feedback_fn: function used send feedback back to the caller
252     @param lu_result: the previous Exec result this LU had, or None
253         in the PRE phase
254     @return: the new Exec result, based on the previous result
255         and hook results
256
257     """
258     return lu_result
259
260   def _ExpandAndLockInstance(self):
261     """Helper function to expand and lock an instance.
262
263     Many LUs that work on an instance take its name in self.op.instance_name
264     and need to expand it and then declare the expanded name for locking. This
265     function does it, and then updates self.op.instance_name to the expanded
266     name. It also initializes needed_locks as a dict, if this hasn't been done
267     before.
268
269     """
270     if self.needed_locks is None:
271       self.needed_locks = {}
272     else:
273       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274         "_ExpandAndLockInstance called with instance-level locks set"
275     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276     if expanded_name is None:
277       raise errors.OpPrereqError("Instance '%s' not known" %
278                                   self.op.instance_name)
279     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280     self.op.instance_name = expanded_name
281
282   def _LockInstancesNodes(self, primary_only=False):
283     """Helper function to declare instances' nodes for locking.
284
285     This function should be called after locking one or more instances to lock
286     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287     with all primary or secondary nodes for instances already locked and
288     present in self.needed_locks[locking.LEVEL_INSTANCE].
289
290     It should be called from DeclareLocks, and for safety only works if
291     self.recalculate_locks[locking.LEVEL_NODE] is set.
292
293     In the future it may grow parameters to just lock some instance's nodes, or
294     to just lock primaries or secondary nodes, if needed.
295
296     If should be called in DeclareLocks in a way similar to::
297
298       if level == locking.LEVEL_NODE:
299         self._LockInstancesNodes()
300
301     @type primary_only: boolean
302     @param primary_only: only lock primary nodes of locked instances
303
304     """
305     assert locking.LEVEL_NODE in self.recalculate_locks, \
306       "_LockInstancesNodes helper function called with no nodes to recalculate"
307
308     # TODO: check if we're really been called with the instance locks held
309
310     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311     # future we might want to have different behaviors depending on the value
312     # of self.recalculate_locks[locking.LEVEL_NODE]
313     wanted_nodes = []
314     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315       instance = self.context.cfg.GetInstanceInfo(instance_name)
316       wanted_nodes.append(instance.primary_node)
317       if not primary_only:
318         wanted_nodes.extend(instance.secondary_nodes)
319
320     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
324
325     del self.recalculate_locks[locking.LEVEL_NODE]
326
327
328 class NoHooksLU(LogicalUnit):
329   """Simple LU which runs no hooks.
330
331   This LU is intended as a parent for other LogicalUnits which will
332   run no hooks, in order to reduce duplicate code.
333
334   """
335   HPATH = None
336   HTYPE = None
337
338
339 def _GetWantedNodes(lu, nodes):
340   """Returns list of checked and expanded node names.
341
342   @type lu: L{LogicalUnit}
343   @param lu: the logical unit on whose behalf we execute
344   @type nodes: list
345   @param nodes: list of node names or None for all nodes
346   @rtype: list
347   @return: the list of nodes, sorted
348   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
349
350   """
351   if not isinstance(nodes, list):
352     raise errors.OpPrereqError("Invalid argument type 'nodes'")
353
354   if not nodes:
355     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356       " non-empty list of nodes whose name is to be expanded.")
357
358   wanted = []
359   for name in nodes:
360     node = lu.cfg.ExpandNodeName(name)
361     if node is None:
362       raise errors.OpPrereqError("No such node name '%s'" % name)
363     wanted.append(node)
364
365   return utils.NiceSort(wanted)
366
367
368 def _GetWantedInstances(lu, instances):
369   """Returns list of checked and expanded instance names.
370
371   @type lu: L{LogicalUnit}
372   @param lu: the logical unit on whose behalf we execute
373   @type instances: list
374   @param instances: list of instance names or None for all instances
375   @rtype: list
376   @return: the list of instances, sorted
377   @raise errors.OpPrereqError: if the instances parameter is wrong type
378   @raise errors.OpPrereqError: if any of the passed instances is not found
379
380   """
381   if not isinstance(instances, list):
382     raise errors.OpPrereqError("Invalid argument type 'instances'")
383
384   if instances:
385     wanted = []
386
387     for name in instances:
388       instance = lu.cfg.ExpandInstanceName(name)
389       if instance is None:
390         raise errors.OpPrereqError("No such instance name '%s'" % name)
391       wanted.append(instance)
392
393   else:
394     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
395   return wanted
396
397
398 def _CheckOutputFields(static, dynamic, selected):
399   """Checks whether all selected fields are valid.
400
401   @type static: L{utils.FieldSet}
402   @param static: static fields set
403   @type dynamic: L{utils.FieldSet}
404   @param dynamic: dynamic fields set
405
406   """
407   f = utils.FieldSet()
408   f.Extend(static)
409   f.Extend(dynamic)
410
411   delta = f.NonMatching(selected)
412   if delta:
413     raise errors.OpPrereqError("Unknown output fields selected: %s"
414                                % ",".join(delta))
415
416
417 def _CheckBooleanOpField(op, name):
418   """Validates boolean opcode parameters.
419
420   This will ensure that an opcode parameter is either a boolean value,
421   or None (but that it always exists).
422
423   """
424   val = getattr(op, name, None)
425   if not (val is None or isinstance(val, bool)):
426     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
427                                (name, str(val)))
428   setattr(op, name, val)
429
430
431 def _CheckNodeOnline(lu, node):
432   """Ensure that a given node is online.
433
434   @param lu: the LU on behalf of which we make the check
435   @param node: the node to check
436   @raise errors.OpPrereqError: if the node is offline
437
438   """
439   if lu.cfg.GetNodeInfo(node).offline:
440     raise errors.OpPrereqError("Can't use offline node %s" % node)
441
442
443 def _CheckNodeNotDrained(lu, node):
444   """Ensure that a given node is not drained.
445
446   @param lu: the LU on behalf of which we make the check
447   @param node: the node to check
448   @raise errors.OpPrereqError: if the node is drained
449
450   """
451   if lu.cfg.GetNodeInfo(node).drained:
452     raise errors.OpPrereqError("Can't use drained node %s" % node)
453
454
455 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456                           memory, vcpus, nics, disk_template, disks):
457   """Builds instance related env variables for hooks
458
459   This builds the hook environment from individual variables.
460
461   @type name: string
462   @param name: the name of the instance
463   @type primary_node: string
464   @param primary_node: the name of the instance's primary node
465   @type secondary_nodes: list
466   @param secondary_nodes: list of secondary nodes as strings
467   @type os_type: string
468   @param os_type: the name of the instance's OS
469   @type status: boolean
470   @param status: the should_run status of the instance
471   @type memory: string
472   @param memory: the memory size of the instance
473   @type vcpus: string
474   @param vcpus: the count of VCPUs the instance has
475   @type nics: list
476   @param nics: list of tuples (ip, bridge, mac) representing
477       the NICs the instance  has
478   @type disk_template: string
479   @param disk_template: the distk template of the instance
480   @type disks: list
481   @param disks: the list of (size, mode) pairs
482   @rtype: dict
483   @return: the hook environment for this instance
484
485   """
486   if status:
487     str_status = "up"
488   else:
489     str_status = "down"
490   env = {
491     "OP_TARGET": name,
492     "INSTANCE_NAME": name,
493     "INSTANCE_PRIMARY": primary_node,
494     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
495     "INSTANCE_OS_TYPE": os_type,
496     "INSTANCE_STATUS": str_status,
497     "INSTANCE_MEMORY": memory,
498     "INSTANCE_VCPUS": vcpus,
499     "INSTANCE_DISK_TEMPLATE": disk_template,
500   }
501
502   if nics:
503     nic_count = len(nics)
504     for idx, (ip, mac, mode, link) in enumerate(nics):
505       if ip is None:
506         ip = ""
507       env["INSTANCE_NIC%d_IP" % idx] = ip
508       env["INSTANCE_NIC%d_MAC" % idx] = mac
509       env["INSTANCE_NIC%d_MODE" % idx] = mode
510       env["INSTANCE_NIC%d_LINK" % idx] = link
511       if mode == constants.NIC_MODE_BRIDGED:
512         env["INSTANCE_NIC%d_BRIDGE" % idx] = link
513   else:
514     nic_count = 0
515
516   env["INSTANCE_NIC_COUNT"] = nic_count
517
518   if disks:
519     disk_count = len(disks)
520     for idx, (size, mode) in enumerate(disks):
521       env["INSTANCE_DISK%d_SIZE" % idx] = size
522       env["INSTANCE_DISK%d_MODE" % idx] = mode
523   else:
524     disk_count = 0
525
526   env["INSTANCE_DISK_COUNT"] = disk_count
527
528   return env
529
530 def _PreBuildNICHooksList(lu, nics):
531   """Build a list of nic information tuples.
532
533   This list is suitable to be passed to _BuildInstanceHookEnv.
534
535   @type lu:  L{LogicalUnit}
536   @param lu: the logical unit on whose behalf we execute
537   @type nics: list of L{objects.NIC}
538   @param nics: list of nics to convert to hooks tuples
539
540   """
541   hooks_nics = []
542   c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
543   for nic in nics:
544     ip = nic.ip
545     mac = nic.mac
546     filled_params = objects.FillDict(c_nicparams, nic.nicparams)
547     mode = filled_params[constants.NIC_MODE]
548     link = filled_params[constants.NIC_LINK]
549     hooks_nics.append((ip, mac, mode, link))
550   return hooks_nics
551
552 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
553   """Builds instance related env variables for hooks from an object.
554
555   @type lu: L{LogicalUnit}
556   @param lu: the logical unit on whose behalf we execute
557   @type instance: L{objects.Instance}
558   @param instance: the instance for which we should build the
559       environment
560   @type override: dict
561   @param override: dictionary with key/values that will override
562       our values
563   @rtype: dict
564   @return: the hook environment dictionary
565
566   """
567   bep = lu.cfg.GetClusterInfo().FillBE(instance)
568   args = {
569     'name': instance.name,
570     'primary_node': instance.primary_node,
571     'secondary_nodes': instance.secondary_nodes,
572     'os_type': instance.os,
573     'status': instance.admin_up,
574     'memory': bep[constants.BE_MEMORY],
575     'vcpus': bep[constants.BE_VCPUS],
576     'nics': _PreBuildNICHooksList(lu, instance.nics),
577     'disk_template': instance.disk_template,
578     'disks': [(disk.size, disk.mode) for disk in instance.disks],
579   }
580   if override:
581     args.update(override)
582   return _BuildInstanceHookEnv(**args)
583
584
585 def _AdjustCandidatePool(lu):
586   """Adjust the candidate pool after node operations.
587
588   """
589   mod_list = lu.cfg.MaintainCandidatePool()
590   if mod_list:
591     lu.LogInfo("Promoted nodes to master candidate role: %s",
592                ", ".join(node.name for node in mod_list))
593     for name in mod_list:
594       lu.context.ReaddNode(name)
595   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
596   if mc_now > mc_max:
597     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
598                (mc_now, mc_max))
599
600
601 def _CheckNicsBridgesExist(lu, target_nics, target_node,
602                                profile=constants.PP_DEFAULT):
603   """Check that the brigdes needed by a list of nics exist.
604
605   """
606   c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
607   paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
608                 for nic in target_nics]
609   brlist = [params[constants.NIC_LINK] for params in paramslist
610             if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
611   if brlist:
612     result = lu.rpc.call_bridges_exist(target_node, brlist)
613     msg = result.RemoteFailMsg()
614     if msg:
615       raise errors.OpPrereqError("Error checking bridges on destination node"
616                                  " '%s': %s" % (target_node, msg))
617
618
619 def _CheckInstanceBridgesExist(lu, instance, node=None):
620   """Check that the brigdes needed by an instance exist.
621
622   """
623   if node is None:
624     node=instance.primary_node
625   _CheckNicsBridgesExist(lu, instance.nics, node)
626
627
628 class LUDestroyCluster(NoHooksLU):
629   """Logical unit for destroying the cluster.
630
631   """
632   _OP_REQP = []
633
634   def CheckPrereq(self):
635     """Check prerequisites.
636
637     This checks whether the cluster is empty.
638
639     Any errors are signalled by raising errors.OpPrereqError.
640
641     """
642     master = self.cfg.GetMasterNode()
643
644     nodelist = self.cfg.GetNodeList()
645     if len(nodelist) != 1 or nodelist[0] != master:
646       raise errors.OpPrereqError("There are still %d node(s) in"
647                                  " this cluster." % (len(nodelist) - 1))
648     instancelist = self.cfg.GetInstanceList()
649     if instancelist:
650       raise errors.OpPrereqError("There are still %d instance(s) in"
651                                  " this cluster." % len(instancelist))
652
653   def Exec(self, feedback_fn):
654     """Destroys the cluster.
655
656     """
657     master = self.cfg.GetMasterNode()
658     result = self.rpc.call_node_stop_master(master, False)
659     msg = result.RemoteFailMsg()
660     if msg:
661       raise errors.OpExecError("Could not disable the master role: %s" % msg)
662     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
663     utils.CreateBackup(priv_key)
664     utils.CreateBackup(pub_key)
665     return master
666
667
668 class LUVerifyCluster(LogicalUnit):
669   """Verifies the cluster status.
670
671   """
672   HPATH = "cluster-verify"
673   HTYPE = constants.HTYPE_CLUSTER
674   _OP_REQP = ["skip_checks"]
675   REQ_BGL = False
676
677   def ExpandNames(self):
678     self.needed_locks = {
679       locking.LEVEL_NODE: locking.ALL_SET,
680       locking.LEVEL_INSTANCE: locking.ALL_SET,
681     }
682     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
683
684   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
685                   node_result, feedback_fn, master_files,
686                   drbd_map, vg_name):
687     """Run multiple tests against a node.
688
689     Test list:
690
691       - compares ganeti version
692       - checks vg existance and size > 20G
693       - checks config file checksum
694       - checks ssh to other nodes
695
696     @type nodeinfo: L{objects.Node}
697     @param nodeinfo: the node to check
698     @param file_list: required list of files
699     @param local_cksum: dictionary of local files and their checksums
700     @param node_result: the results from the node
701     @param feedback_fn: function used to accumulate results
702     @param master_files: list of files that only masters should have
703     @param drbd_map: the useddrbd minors for this node, in
704         form of minor: (instance, must_exist) which correspond to instances
705         and their running status
706     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
707
708     """
709     node = nodeinfo.name
710
711     # main result, node_result should be a non-empty dict
712     if not node_result or not isinstance(node_result, dict):
713       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
714       return True
715
716     # compares ganeti version
717     local_version = constants.PROTOCOL_VERSION
718     remote_version = node_result.get('version', None)
719     if not (remote_version and isinstance(remote_version, (list, tuple)) and
720             len(remote_version) == 2):
721       feedback_fn("  - ERROR: connection to %s failed" % (node))
722       return True
723
724     if local_version != remote_version[0]:
725       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
726                   " node %s %s" % (local_version, node, remote_version[0]))
727       return True
728
729     # node seems compatible, we can actually try to look into its results
730
731     bad = False
732
733     # full package version
734     if constants.RELEASE_VERSION != remote_version[1]:
735       feedback_fn("  - WARNING: software version mismatch: master %s,"
736                   " node %s %s" %
737                   (constants.RELEASE_VERSION, node, remote_version[1]))
738
739     # checks vg existence and size > 20G
740     if vg_name is not None:
741       vglist = node_result.get(constants.NV_VGLIST, None)
742       if not vglist:
743         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
744                         (node,))
745         bad = True
746       else:
747         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
748                                               constants.MIN_VG_SIZE)
749         if vgstatus:
750           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
751           bad = True
752
753     # checks config file checksum
754
755     remote_cksum = node_result.get(constants.NV_FILELIST, None)
756     if not isinstance(remote_cksum, dict):
757       bad = True
758       feedback_fn("  - ERROR: node hasn't returned file checksum data")
759     else:
760       for file_name in file_list:
761         node_is_mc = nodeinfo.master_candidate
762         must_have_file = file_name not in master_files
763         if file_name not in remote_cksum:
764           if node_is_mc or must_have_file:
765             bad = True
766             feedback_fn("  - ERROR: file '%s' missing" % file_name)
767         elif remote_cksum[file_name] != local_cksum[file_name]:
768           if node_is_mc or must_have_file:
769             bad = True
770             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
771           else:
772             # not candidate and this is not a must-have file
773             bad = True
774             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
775                         " '%s'" % file_name)
776         else:
777           # all good, except non-master/non-must have combination
778           if not node_is_mc and not must_have_file:
779             feedback_fn("  - ERROR: file '%s' should not exist on non master"
780                         " candidates" % file_name)
781
782     # checks ssh to any
783
784     if constants.NV_NODELIST not in node_result:
785       bad = True
786       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
787     else:
788       if node_result[constants.NV_NODELIST]:
789         bad = True
790         for node in node_result[constants.NV_NODELIST]:
791           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
792                           (node, node_result[constants.NV_NODELIST][node]))
793
794     if constants.NV_NODENETTEST not in node_result:
795       bad = True
796       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
797     else:
798       if node_result[constants.NV_NODENETTEST]:
799         bad = True
800         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
801         for node in nlist:
802           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
803                           (node, node_result[constants.NV_NODENETTEST][node]))
804
805     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
806     if isinstance(hyp_result, dict):
807       for hv_name, hv_result in hyp_result.iteritems():
808         if hv_result is not None:
809           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
810                       (hv_name, hv_result))
811
812     # check used drbd list
813     if vg_name is not None:
814       used_minors = node_result.get(constants.NV_DRBDLIST, [])
815       if not isinstance(used_minors, (tuple, list)):
816         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
817                     str(used_minors))
818       else:
819         for minor, (iname, must_exist) in drbd_map.items():
820           if minor not in used_minors and must_exist:
821             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
822                         " not active" % (minor, iname))
823             bad = True
824         for minor in used_minors:
825           if minor not in drbd_map:
826             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
827                         minor)
828             bad = True
829
830     return bad
831
832   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
833                       node_instance, feedback_fn, n_offline):
834     """Verify an instance.
835
836     This function checks to see if the required block devices are
837     available on the instance's node.
838
839     """
840     bad = False
841
842     node_current = instanceconfig.primary_node
843
844     node_vol_should = {}
845     instanceconfig.MapLVsByNode(node_vol_should)
846
847     for node in node_vol_should:
848       if node in n_offline:
849         # ignore missing volumes on offline nodes
850         continue
851       for volume in node_vol_should[node]:
852         if node not in node_vol_is or volume not in node_vol_is[node]:
853           feedback_fn("  - ERROR: volume %s missing on node %s" %
854                           (volume, node))
855           bad = True
856
857     if instanceconfig.admin_up:
858       if ((node_current not in node_instance or
859           not instance in node_instance[node_current]) and
860           node_current not in n_offline):
861         feedback_fn("  - ERROR: instance %s not running on node %s" %
862                         (instance, node_current))
863         bad = True
864
865     for node in node_instance:
866       if (not node == node_current):
867         if instance in node_instance[node]:
868           feedback_fn("  - ERROR: instance %s should not run on node %s" %
869                           (instance, node))
870           bad = True
871
872     return bad
873
874   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
875     """Verify if there are any unknown volumes in the cluster.
876
877     The .os, .swap and backup volumes are ignored. All other volumes are
878     reported as unknown.
879
880     """
881     bad = False
882
883     for node in node_vol_is:
884       for volume in node_vol_is[node]:
885         if node not in node_vol_should or volume not in node_vol_should[node]:
886           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
887                       (volume, node))
888           bad = True
889     return bad
890
891   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
892     """Verify the list of running instances.
893
894     This checks what instances are running but unknown to the cluster.
895
896     """
897     bad = False
898     for node in node_instance:
899       for runninginstance in node_instance[node]:
900         if runninginstance not in instancelist:
901           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
902                           (runninginstance, node))
903           bad = True
904     return bad
905
906   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
907     """Verify N+1 Memory Resilience.
908
909     Check that if one single node dies we can still start all the instances it
910     was primary for.
911
912     """
913     bad = False
914
915     for node, nodeinfo in node_info.iteritems():
916       # This code checks that every node which is now listed as secondary has
917       # enough memory to host all instances it is supposed to should a single
918       # other node in the cluster fail.
919       # FIXME: not ready for failover to an arbitrary node
920       # FIXME: does not support file-backed instances
921       # WARNING: we currently take into account down instances as well as up
922       # ones, considering that even if they're down someone might want to start
923       # them even in the event of a node failure.
924       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
925         needed_mem = 0
926         for instance in instances:
927           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
928           if bep[constants.BE_AUTO_BALANCE]:
929             needed_mem += bep[constants.BE_MEMORY]
930         if nodeinfo['mfree'] < needed_mem:
931           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
932                       " failovers should node %s fail" % (node, prinode))
933           bad = True
934     return bad
935
936   def CheckPrereq(self):
937     """Check prerequisites.
938
939     Transform the list of checks we're going to skip into a set and check that
940     all its members are valid.
941
942     """
943     self.skip_set = frozenset(self.op.skip_checks)
944     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
945       raise errors.OpPrereqError("Invalid checks to be skipped specified")
946
947   def BuildHooksEnv(self):
948     """Build hooks env.
949
950     Cluster-Verify hooks just rone in the post phase and their failure makes
951     the output be logged in the verify output and the verification to fail.
952
953     """
954     all_nodes = self.cfg.GetNodeList()
955     env = {
956       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
957       }
958     for node in self.cfg.GetAllNodesInfo().values():
959       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
960
961     return env, [], all_nodes
962
963   def Exec(self, feedback_fn):
964     """Verify integrity of cluster, performing various test on nodes.
965
966     """
967     bad = False
968     feedback_fn("* Verifying global settings")
969     for msg in self.cfg.VerifyConfig():
970       feedback_fn("  - ERROR: %s" % msg)
971
972     vg_name = self.cfg.GetVGName()
973     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
974     nodelist = utils.NiceSort(self.cfg.GetNodeList())
975     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
976     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
977     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
978                         for iname in instancelist)
979     i_non_redundant = [] # Non redundant instances
980     i_non_a_balanced = [] # Non auto-balanced instances
981     n_offline = [] # List of offline nodes
982     n_drained = [] # List of nodes being drained
983     node_volume = {}
984     node_instance = {}
985     node_info = {}
986     instance_cfg = {}
987
988     # FIXME: verify OS list
989     # do local checksums
990     master_files = [constants.CLUSTER_CONF_FILE]
991
992     file_names = ssconf.SimpleStore().GetFileList()
993     file_names.append(constants.SSL_CERT_FILE)
994     file_names.append(constants.RAPI_CERT_FILE)
995     file_names.extend(master_files)
996
997     local_checksums = utils.FingerprintFiles(file_names)
998
999     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1000     node_verify_param = {
1001       constants.NV_FILELIST: file_names,
1002       constants.NV_NODELIST: [node.name for node in nodeinfo
1003                               if not node.offline],
1004       constants.NV_HYPERVISOR: hypervisors,
1005       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1006                                   node.secondary_ip) for node in nodeinfo
1007                                  if not node.offline],
1008       constants.NV_INSTANCELIST: hypervisors,
1009       constants.NV_VERSION: None,
1010       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1011       }
1012     if vg_name is not None:
1013       node_verify_param[constants.NV_VGLIST] = None
1014       node_verify_param[constants.NV_LVLIST] = vg_name
1015       node_verify_param[constants.NV_DRBDLIST] = None
1016     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1017                                            self.cfg.GetClusterName())
1018
1019     cluster = self.cfg.GetClusterInfo()
1020     master_node = self.cfg.GetMasterNode()
1021     all_drbd_map = self.cfg.ComputeDRBDMap()
1022
1023     for node_i in nodeinfo:
1024       node = node_i.name
1025
1026       if node_i.offline:
1027         feedback_fn("* Skipping offline node %s" % (node,))
1028         n_offline.append(node)
1029         continue
1030
1031       if node == master_node:
1032         ntype = "master"
1033       elif node_i.master_candidate:
1034         ntype = "master candidate"
1035       elif node_i.drained:
1036         ntype = "drained"
1037         n_drained.append(node)
1038       else:
1039         ntype = "regular"
1040       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1041
1042       msg = all_nvinfo[node].RemoteFailMsg()
1043       if msg:
1044         feedback_fn("  - ERROR: while contacting node %s: %s" % (node, msg))
1045         bad = True
1046         continue
1047
1048       nresult = all_nvinfo[node].payload
1049       node_drbd = {}
1050       for minor, instance in all_drbd_map[node].items():
1051         if instance not in instanceinfo:
1052           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1053                       instance)
1054           # ghost instance should not be running, but otherwise we
1055           # don't give double warnings (both ghost instance and
1056           # unallocated minor in use)
1057           node_drbd[minor] = (instance, False)
1058         else:
1059           instance = instanceinfo[instance]
1060           node_drbd[minor] = (instance.name, instance.admin_up)
1061       result = self._VerifyNode(node_i, file_names, local_checksums,
1062                                 nresult, feedback_fn, master_files,
1063                                 node_drbd, vg_name)
1064       bad = bad or result
1065
1066       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1067       if vg_name is None:
1068         node_volume[node] = {}
1069       elif isinstance(lvdata, basestring):
1070         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1071                     (node, utils.SafeEncode(lvdata)))
1072         bad = True
1073         node_volume[node] = {}
1074       elif not isinstance(lvdata, dict):
1075         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1076         bad = True
1077         continue
1078       else:
1079         node_volume[node] = lvdata
1080
1081       # node_instance
1082       idata = nresult.get(constants.NV_INSTANCELIST, None)
1083       if not isinstance(idata, list):
1084         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1085                     (node,))
1086         bad = True
1087         continue
1088
1089       node_instance[node] = idata
1090
1091       # node_info
1092       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1093       if not isinstance(nodeinfo, dict):
1094         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1095         bad = True
1096         continue
1097
1098       try:
1099         node_info[node] = {
1100           "mfree": int(nodeinfo['memory_free']),
1101           "pinst": [],
1102           "sinst": [],
1103           # dictionary holding all instances this node is secondary for,
1104           # grouped by their primary node. Each key is a cluster node, and each
1105           # value is a list of instances which have the key as primary and the
1106           # current node as secondary.  this is handy to calculate N+1 memory
1107           # availability if you can only failover from a primary to its
1108           # secondary.
1109           "sinst-by-pnode": {},
1110         }
1111         # FIXME: devise a free space model for file based instances as well
1112         if vg_name is not None:
1113           if (constants.NV_VGLIST not in nresult or
1114               vg_name not in nresult[constants.NV_VGLIST]):
1115             feedback_fn("  - ERROR: node %s didn't return data for the"
1116                         " volume group '%s' - it is either missing or broken" %
1117                         (node, vg_name))
1118             bad = True
1119             continue
1120           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1121       except (ValueError, KeyError):
1122         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1123                     " from node %s" % (node,))
1124         bad = True
1125         continue
1126
1127     node_vol_should = {}
1128
1129     for instance in instancelist:
1130       feedback_fn("* Verifying instance %s" % instance)
1131       inst_config = instanceinfo[instance]
1132       result =  self._VerifyInstance(instance, inst_config, node_volume,
1133                                      node_instance, feedback_fn, n_offline)
1134       bad = bad or result
1135       inst_nodes_offline = []
1136
1137       inst_config.MapLVsByNode(node_vol_should)
1138
1139       instance_cfg[instance] = inst_config
1140
1141       pnode = inst_config.primary_node
1142       if pnode in node_info:
1143         node_info[pnode]['pinst'].append(instance)
1144       elif pnode not in n_offline:
1145         feedback_fn("  - ERROR: instance %s, connection to primary node"
1146                     " %s failed" % (instance, pnode))
1147         bad = True
1148
1149       if pnode in n_offline:
1150         inst_nodes_offline.append(pnode)
1151
1152       # If the instance is non-redundant we cannot survive losing its primary
1153       # node, so we are not N+1 compliant. On the other hand we have no disk
1154       # templates with more than one secondary so that situation is not well
1155       # supported either.
1156       # FIXME: does not support file-backed instances
1157       if len(inst_config.secondary_nodes) == 0:
1158         i_non_redundant.append(instance)
1159       elif len(inst_config.secondary_nodes) > 1:
1160         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1161                     % instance)
1162
1163       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1164         i_non_a_balanced.append(instance)
1165
1166       for snode in inst_config.secondary_nodes:
1167         if snode in node_info:
1168           node_info[snode]['sinst'].append(instance)
1169           if pnode not in node_info[snode]['sinst-by-pnode']:
1170             node_info[snode]['sinst-by-pnode'][pnode] = []
1171           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1172         elif snode not in n_offline:
1173           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1174                       " %s failed" % (instance, snode))
1175           bad = True
1176         if snode in n_offline:
1177           inst_nodes_offline.append(snode)
1178
1179       if inst_nodes_offline:
1180         # warn that the instance lives on offline nodes, and set bad=True
1181         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1182                     ", ".join(inst_nodes_offline))
1183         bad = True
1184
1185     feedback_fn("* Verifying orphan volumes")
1186     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1187                                        feedback_fn)
1188     bad = bad or result
1189
1190     feedback_fn("* Verifying remaining instances")
1191     result = self._VerifyOrphanInstances(instancelist, node_instance,
1192                                          feedback_fn)
1193     bad = bad or result
1194
1195     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1196       feedback_fn("* Verifying N+1 Memory redundancy")
1197       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1198       bad = bad or result
1199
1200     feedback_fn("* Other Notes")
1201     if i_non_redundant:
1202       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1203                   % len(i_non_redundant))
1204
1205     if i_non_a_balanced:
1206       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1207                   % len(i_non_a_balanced))
1208
1209     if n_offline:
1210       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1211
1212     if n_drained:
1213       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1214
1215     return not bad
1216
1217   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1218     """Analize the post-hooks' result
1219
1220     This method analyses the hook result, handles it, and sends some
1221     nicely-formatted feedback back to the user.
1222
1223     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1224         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1225     @param hooks_results: the results of the multi-node hooks rpc call
1226     @param feedback_fn: function used send feedback back to the caller
1227     @param lu_result: previous Exec result
1228     @return: the new Exec result, based on the previous result
1229         and hook results
1230
1231     """
1232     # We only really run POST phase hooks, and are only interested in
1233     # their results
1234     if phase == constants.HOOKS_PHASE_POST:
1235       # Used to change hooks' output to proper indentation
1236       indent_re = re.compile('^', re.M)
1237       feedback_fn("* Hooks Results")
1238       if not hooks_results:
1239         feedback_fn("  - ERROR: general communication failure")
1240         lu_result = 1
1241       else:
1242         for node_name in hooks_results:
1243           show_node_header = True
1244           res = hooks_results[node_name]
1245           msg = res.RemoteFailMsg()
1246           if msg:
1247             if res.offline:
1248               # no need to warn or set fail return value
1249               continue
1250             feedback_fn("    Communication failure in hooks execution: %s" %
1251                         msg)
1252             lu_result = 1
1253             continue
1254           for script, hkr, output in res.payload:
1255             if hkr == constants.HKR_FAIL:
1256               # The node header is only shown once, if there are
1257               # failing hooks on that node
1258               if show_node_header:
1259                 feedback_fn("  Node %s:" % node_name)
1260                 show_node_header = False
1261               feedback_fn("    ERROR: Script %s failed, output:" % script)
1262               output = indent_re.sub('      ', output)
1263               feedback_fn("%s" % output)
1264               lu_result = 1
1265
1266       return lu_result
1267
1268
1269 class LUVerifyDisks(NoHooksLU):
1270   """Verifies the cluster disks status.
1271
1272   """
1273   _OP_REQP = []
1274   REQ_BGL = False
1275
1276   def ExpandNames(self):
1277     self.needed_locks = {
1278       locking.LEVEL_NODE: locking.ALL_SET,
1279       locking.LEVEL_INSTANCE: locking.ALL_SET,
1280     }
1281     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1282
1283   def CheckPrereq(self):
1284     """Check prerequisites.
1285
1286     This has no prerequisites.
1287
1288     """
1289     pass
1290
1291   def Exec(self, feedback_fn):
1292     """Verify integrity of cluster disks.
1293
1294     @rtype: tuple of three items
1295     @return: a tuple of (dict of node-to-node_error, list of instances
1296         which need activate-disks, dict of instance: (node, volume) for
1297         missing volumes
1298
1299     """
1300     result = res_nodes, res_instances, res_missing = {}, [], {}
1301
1302     vg_name = self.cfg.GetVGName()
1303     nodes = utils.NiceSort(self.cfg.GetNodeList())
1304     instances = [self.cfg.GetInstanceInfo(name)
1305                  for name in self.cfg.GetInstanceList()]
1306
1307     nv_dict = {}
1308     for inst in instances:
1309       inst_lvs = {}
1310       if (not inst.admin_up or
1311           inst.disk_template not in constants.DTS_NET_MIRROR):
1312         continue
1313       inst.MapLVsByNode(inst_lvs)
1314       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1315       for node, vol_list in inst_lvs.iteritems():
1316         for vol in vol_list:
1317           nv_dict[(node, vol)] = inst
1318
1319     if not nv_dict:
1320       return result
1321
1322     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1323
1324     to_act = set()
1325     for node in nodes:
1326       # node_volume
1327       node_res = node_lvs[node]
1328       if node_res.offline:
1329         continue
1330       msg = node_res.RemoteFailMsg()
1331       if msg:
1332         logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1333         res_nodes[node] = msg
1334         continue
1335
1336       lvs = node_res.payload
1337       for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1338         inst = nv_dict.pop((node, lv_name), None)
1339         if (not lv_online and inst is not None
1340             and inst.name not in res_instances):
1341           res_instances.append(inst.name)
1342
1343     # any leftover items in nv_dict are missing LVs, let's arrange the
1344     # data better
1345     for key, inst in nv_dict.iteritems():
1346       if inst.name not in res_missing:
1347         res_missing[inst.name] = []
1348       res_missing[inst.name].append(key)
1349
1350     return result
1351
1352
1353 class LURenameCluster(LogicalUnit):
1354   """Rename the cluster.
1355
1356   """
1357   HPATH = "cluster-rename"
1358   HTYPE = constants.HTYPE_CLUSTER
1359   _OP_REQP = ["name"]
1360
1361   def BuildHooksEnv(self):
1362     """Build hooks env.
1363
1364     """
1365     env = {
1366       "OP_TARGET": self.cfg.GetClusterName(),
1367       "NEW_NAME": self.op.name,
1368       }
1369     mn = self.cfg.GetMasterNode()
1370     return env, [mn], [mn]
1371
1372   def CheckPrereq(self):
1373     """Verify that the passed name is a valid one.
1374
1375     """
1376     hostname = utils.HostInfo(self.op.name)
1377
1378     new_name = hostname.name
1379     self.ip = new_ip = hostname.ip
1380     old_name = self.cfg.GetClusterName()
1381     old_ip = self.cfg.GetMasterIP()
1382     if new_name == old_name and new_ip == old_ip:
1383       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1384                                  " cluster has changed")
1385     if new_ip != old_ip:
1386       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1387         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1388                                    " reachable on the network. Aborting." %
1389                                    new_ip)
1390
1391     self.op.name = new_name
1392
1393   def Exec(self, feedback_fn):
1394     """Rename the cluster.
1395
1396     """
1397     clustername = self.op.name
1398     ip = self.ip
1399
1400     # shutdown the master IP
1401     master = self.cfg.GetMasterNode()
1402     result = self.rpc.call_node_stop_master(master, False)
1403     msg = result.RemoteFailMsg()
1404     if msg:
1405       raise errors.OpExecError("Could not disable the master role: %s" % msg)
1406
1407     try:
1408       cluster = self.cfg.GetClusterInfo()
1409       cluster.cluster_name = clustername
1410       cluster.master_ip = ip
1411       self.cfg.Update(cluster)
1412
1413       # update the known hosts file
1414       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1415       node_list = self.cfg.GetNodeList()
1416       try:
1417         node_list.remove(master)
1418       except ValueError:
1419         pass
1420       result = self.rpc.call_upload_file(node_list,
1421                                          constants.SSH_KNOWN_HOSTS_FILE)
1422       for to_node, to_result in result.iteritems():
1423          msg = to_result.RemoteFailMsg()
1424          if msg:
1425            msg = ("Copy of file %s to node %s failed: %s" %
1426                    (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1427            self.proc.LogWarning(msg)
1428
1429     finally:
1430       result = self.rpc.call_node_start_master(master, False)
1431       msg = result.RemoteFailMsg()
1432       if msg:
1433         self.LogWarning("Could not re-enable the master role on"
1434                         " the master, please restart manually: %s", msg)
1435
1436
1437 def _RecursiveCheckIfLVMBased(disk):
1438   """Check if the given disk or its children are lvm-based.
1439
1440   @type disk: L{objects.Disk}
1441   @param disk: the disk to check
1442   @rtype: booleean
1443   @return: boolean indicating whether a LD_LV dev_type was found or not
1444
1445   """
1446   if disk.children:
1447     for chdisk in disk.children:
1448       if _RecursiveCheckIfLVMBased(chdisk):
1449         return True
1450   return disk.dev_type == constants.LD_LV
1451
1452
1453 class LUSetClusterParams(LogicalUnit):
1454   """Change the parameters of the cluster.
1455
1456   """
1457   HPATH = "cluster-modify"
1458   HTYPE = constants.HTYPE_CLUSTER
1459   _OP_REQP = []
1460   REQ_BGL = False
1461
1462   def CheckArguments(self):
1463     """Check parameters
1464
1465     """
1466     if not hasattr(self.op, "candidate_pool_size"):
1467       self.op.candidate_pool_size = None
1468     if self.op.candidate_pool_size is not None:
1469       try:
1470         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1471       except (ValueError, TypeError), err:
1472         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1473                                    str(err))
1474       if self.op.candidate_pool_size < 1:
1475         raise errors.OpPrereqError("At least one master candidate needed")
1476
1477   def ExpandNames(self):
1478     # FIXME: in the future maybe other cluster params won't require checking on
1479     # all nodes to be modified.
1480     self.needed_locks = {
1481       locking.LEVEL_NODE: locking.ALL_SET,
1482     }
1483     self.share_locks[locking.LEVEL_NODE] = 1
1484
1485   def BuildHooksEnv(self):
1486     """Build hooks env.
1487
1488     """
1489     env = {
1490       "OP_TARGET": self.cfg.GetClusterName(),
1491       "NEW_VG_NAME": self.op.vg_name,
1492       }
1493     mn = self.cfg.GetMasterNode()
1494     return env, [mn], [mn]
1495
1496   def CheckPrereq(self):
1497     """Check prerequisites.
1498
1499     This checks whether the given params don't conflict and
1500     if the given volume group is valid.
1501
1502     """
1503     if self.op.vg_name is not None and not self.op.vg_name:
1504       instances = self.cfg.GetAllInstancesInfo().values()
1505       for inst in instances:
1506         for disk in inst.disks:
1507           if _RecursiveCheckIfLVMBased(disk):
1508             raise errors.OpPrereqError("Cannot disable lvm storage while"
1509                                        " lvm-based instances exist")
1510
1511     node_list = self.acquired_locks[locking.LEVEL_NODE]
1512
1513     # if vg_name not None, checks given volume group on all nodes
1514     if self.op.vg_name:
1515       vglist = self.rpc.call_vg_list(node_list)
1516       for node in node_list:
1517         msg = vglist[node].RemoteFailMsg()
1518         if msg:
1519           # ignoring down node
1520           self.LogWarning("Error while gathering data on node %s"
1521                           " (ignoring node): %s", node, msg)
1522           continue
1523         vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1524                                               self.op.vg_name,
1525                                               constants.MIN_VG_SIZE)
1526         if vgstatus:
1527           raise errors.OpPrereqError("Error on node '%s': %s" %
1528                                      (node, vgstatus))
1529
1530     self.cluster = cluster = self.cfg.GetClusterInfo()
1531     # validate params changes
1532     if self.op.beparams:
1533       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1534       self.new_beparams = objects.FillDict(
1535         cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1536
1537     if self.op.nicparams:
1538       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1539       self.new_nicparams = objects.FillDict(
1540         cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1541       objects.NIC.CheckParameterSyntax(self.new_nicparams)
1542
1543     # hypervisor list/parameters
1544     self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1545     if self.op.hvparams:
1546       if not isinstance(self.op.hvparams, dict):
1547         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1548       for hv_name, hv_dict in self.op.hvparams.items():
1549         if hv_name not in self.new_hvparams:
1550           self.new_hvparams[hv_name] = hv_dict
1551         else:
1552           self.new_hvparams[hv_name].update(hv_dict)
1553
1554     if self.op.enabled_hypervisors is not None:
1555       self.hv_list = self.op.enabled_hypervisors
1556     else:
1557       self.hv_list = cluster.enabled_hypervisors
1558
1559     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1560       # either the enabled list has changed, or the parameters have, validate
1561       for hv_name, hv_params in self.new_hvparams.items():
1562         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1563             (self.op.enabled_hypervisors and
1564              hv_name in self.op.enabled_hypervisors)):
1565           # either this is a new hypervisor, or its parameters have changed
1566           hv_class = hypervisor.GetHypervisor(hv_name)
1567           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1568           hv_class.CheckParameterSyntax(hv_params)
1569           _CheckHVParams(self, node_list, hv_name, hv_params)
1570
1571   def Exec(self, feedback_fn):
1572     """Change the parameters of the cluster.
1573
1574     """
1575     if self.op.vg_name is not None:
1576       new_volume = self.op.vg_name
1577       if not new_volume:
1578         new_volume = None
1579       if new_volume != self.cfg.GetVGName():
1580         self.cfg.SetVGName(new_volume)
1581       else:
1582         feedback_fn("Cluster LVM configuration already in desired"
1583                     " state, not changing")
1584     if self.op.hvparams:
1585       self.cluster.hvparams = self.new_hvparams
1586     if self.op.enabled_hypervisors is not None:
1587       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1588     if self.op.beparams:
1589       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1590     if self.op.nicparams:
1591       self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1592
1593     if self.op.candidate_pool_size is not None:
1594       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1595
1596     self.cfg.Update(self.cluster)
1597
1598     # we want to update nodes after the cluster so that if any errors
1599     # happen, we have recorded and saved the cluster info
1600     if self.op.candidate_pool_size is not None:
1601       _AdjustCandidatePool(self)
1602
1603
1604 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1605   """Distribute additional files which are part of the cluster configuration.
1606
1607   ConfigWriter takes care of distributing the config and ssconf files, but
1608   there are more files which should be distributed to all nodes. This function
1609   makes sure those are copied.
1610
1611   @param lu: calling logical unit
1612   @param additional_nodes: list of nodes not in the config to distribute to
1613
1614   """
1615   # 1. Gather target nodes
1616   myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1617   dist_nodes = lu.cfg.GetNodeList()
1618   if additional_nodes is not None:
1619     dist_nodes.extend(additional_nodes)
1620   if myself.name in dist_nodes:
1621     dist_nodes.remove(myself.name)
1622   # 2. Gather files to distribute
1623   dist_files = set([constants.ETC_HOSTS,
1624                     constants.SSH_KNOWN_HOSTS_FILE,
1625                     constants.RAPI_CERT_FILE,
1626                     constants.RAPI_USERS_FILE,
1627                    ])
1628
1629   enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1630   for hv_name in enabled_hypervisors:
1631     hv_class = hypervisor.GetHypervisor(hv_name)
1632     dist_files.update(hv_class.GetAncillaryFiles())
1633
1634   # 3. Perform the files upload
1635   for fname in dist_files:
1636     if os.path.exists(fname):
1637       result = lu.rpc.call_upload_file(dist_nodes, fname)
1638       for to_node, to_result in result.items():
1639          msg = to_result.RemoteFailMsg()
1640          if msg:
1641            msg = ("Copy of file %s to node %s failed: %s" %
1642                    (fname, to_node, msg))
1643            lu.proc.LogWarning(msg)
1644
1645
1646 class LURedistributeConfig(NoHooksLU):
1647   """Force the redistribution of cluster configuration.
1648
1649   This is a very simple LU.
1650
1651   """
1652   _OP_REQP = []
1653   REQ_BGL = False
1654
1655   def ExpandNames(self):
1656     self.needed_locks = {
1657       locking.LEVEL_NODE: locking.ALL_SET,
1658     }
1659     self.share_locks[locking.LEVEL_NODE] = 1
1660
1661   def CheckPrereq(self):
1662     """Check prerequisites.
1663
1664     """
1665
1666   def Exec(self, feedback_fn):
1667     """Redistribute the configuration.
1668
1669     """
1670     self.cfg.Update(self.cfg.GetClusterInfo())
1671     _RedistributeAncillaryFiles(self)
1672
1673
1674 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1675   """Sleep and poll for an instance's disk to sync.
1676
1677   """
1678   if not instance.disks:
1679     return True
1680
1681   if not oneshot:
1682     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1683
1684   node = instance.primary_node
1685
1686   for dev in instance.disks:
1687     lu.cfg.SetDiskID(dev, node)
1688
1689   retries = 0
1690   while True:
1691     max_time = 0
1692     done = True
1693     cumul_degraded = False
1694     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1695     msg = rstats.RemoteFailMsg()
1696     if msg:
1697       lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1698       retries += 1
1699       if retries >= 10:
1700         raise errors.RemoteError("Can't contact node %s for mirror data,"
1701                                  " aborting." % node)
1702       time.sleep(6)
1703       continue
1704     rstats = rstats.payload
1705     retries = 0
1706     for i, mstat in enumerate(rstats):
1707       if mstat is None:
1708         lu.LogWarning("Can't compute data for node %s/%s",
1709                            node, instance.disks[i].iv_name)
1710         continue
1711       # we ignore the ldisk parameter
1712       perc_done, est_time, is_degraded, _ = mstat
1713       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1714       if perc_done is not None:
1715         done = False
1716         if est_time is not None:
1717           rem_time = "%d estimated seconds remaining" % est_time
1718           max_time = est_time
1719         else:
1720           rem_time = "no time estimate"
1721         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1722                         (instance.disks[i].iv_name, perc_done, rem_time))
1723     if done or oneshot:
1724       break
1725
1726     time.sleep(min(60, max_time))
1727
1728   if done:
1729     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1730   return not cumul_degraded
1731
1732
1733 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1734   """Check that mirrors are not degraded.
1735
1736   The ldisk parameter, if True, will change the test from the
1737   is_degraded attribute (which represents overall non-ok status for
1738   the device(s)) to the ldisk (representing the local storage status).
1739
1740   """
1741   lu.cfg.SetDiskID(dev, node)
1742   if ldisk:
1743     idx = 6
1744   else:
1745     idx = 5
1746
1747   result = True
1748   if on_primary or dev.AssembleOnSecondary():
1749     rstats = lu.rpc.call_blockdev_find(node, dev)
1750     msg = rstats.RemoteFailMsg()
1751     if msg:
1752       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1753       result = False
1754     elif not rstats.payload:
1755       lu.LogWarning("Can't find disk on node %s", node)
1756       result = False
1757     else:
1758       result = result and (not rstats.payload[idx])
1759   if dev.children:
1760     for child in dev.children:
1761       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1762
1763   return result
1764
1765
1766 class LUDiagnoseOS(NoHooksLU):
1767   """Logical unit for OS diagnose/query.
1768
1769   """
1770   _OP_REQP = ["output_fields", "names"]
1771   REQ_BGL = False
1772   _FIELDS_STATIC = utils.FieldSet()
1773   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1774
1775   def ExpandNames(self):
1776     if self.op.names:
1777       raise errors.OpPrereqError("Selective OS query not supported")
1778
1779     _CheckOutputFields(static=self._FIELDS_STATIC,
1780                        dynamic=self._FIELDS_DYNAMIC,
1781                        selected=self.op.output_fields)
1782
1783     # Lock all nodes, in shared mode
1784     # Temporary removal of locks, should be reverted later
1785     # TODO: reintroduce locks when they are lighter-weight
1786     self.needed_locks = {}
1787     #self.share_locks[locking.LEVEL_NODE] = 1
1788     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1789
1790   def CheckPrereq(self):
1791     """Check prerequisites.
1792
1793     """
1794
1795   @staticmethod
1796   def _DiagnoseByOS(node_list, rlist):
1797     """Remaps a per-node return list into an a per-os per-node dictionary
1798
1799     @param node_list: a list with the names of all nodes
1800     @param rlist: a map with node names as keys and OS objects as values
1801
1802     @rtype: dict
1803     @return: a dictionary with osnames as keys and as value another map, with
1804         nodes as keys and list of OS objects as values, eg::
1805
1806           {"debian-etch": {"node1": [<object>,...],
1807                            "node2": [<object>,]}
1808           }
1809
1810     """
1811     all_os = {}
1812     # we build here the list of nodes that didn't fail the RPC (at RPC
1813     # level), so that nodes with a non-responding node daemon don't
1814     # make all OSes invalid
1815     good_nodes = [node_name for node_name in rlist
1816                   if not rlist[node_name].RemoteFailMsg()]
1817     for node_name, nr in rlist.items():
1818       if nr.RemoteFailMsg() or not nr.payload:
1819         continue
1820       for os_serialized in nr.payload:
1821         os_obj = objects.OS.FromDict(os_serialized)
1822         if os_obj.name not in all_os:
1823           # build a list of nodes for this os containing empty lists
1824           # for each node in node_list
1825           all_os[os_obj.name] = {}
1826           for nname in good_nodes:
1827             all_os[os_obj.name][nname] = []
1828         all_os[os_obj.name][node_name].append(os_obj)
1829     return all_os
1830
1831   def Exec(self, feedback_fn):
1832     """Compute the list of OSes.
1833
1834     """
1835     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1836     node_data = self.rpc.call_os_diagnose(valid_nodes)
1837     pol = self._DiagnoseByOS(valid_nodes, node_data)
1838     output = []
1839     for os_name, os_data in pol.items():
1840       row = []
1841       for field in self.op.output_fields:
1842         if field == "name":
1843           val = os_name
1844         elif field == "valid":
1845           val = utils.all([osl and osl[0] for osl in os_data.values()])
1846         elif field == "node_status":
1847           val = {}
1848           for node_name, nos_list in os_data.iteritems():
1849             val[node_name] = [(v.status, v.path) for v in nos_list]
1850         else:
1851           raise errors.ParameterError(field)
1852         row.append(val)
1853       output.append(row)
1854
1855     return output
1856
1857
1858 class LURemoveNode(LogicalUnit):
1859   """Logical unit for removing a node.
1860
1861   """
1862   HPATH = "node-remove"
1863   HTYPE = constants.HTYPE_NODE
1864   _OP_REQP = ["node_name"]
1865
1866   def BuildHooksEnv(self):
1867     """Build hooks env.
1868
1869     This doesn't run on the target node in the pre phase as a failed
1870     node would then be impossible to remove.
1871
1872     """
1873     env = {
1874       "OP_TARGET": self.op.node_name,
1875       "NODE_NAME": self.op.node_name,
1876       }
1877     all_nodes = self.cfg.GetNodeList()
1878     all_nodes.remove(self.op.node_name)
1879     return env, all_nodes, all_nodes
1880
1881   def CheckPrereq(self):
1882     """Check prerequisites.
1883
1884     This checks:
1885      - the node exists in the configuration
1886      - it does not have primary or secondary instances
1887      - it's not the master
1888
1889     Any errors are signalled by raising errors.OpPrereqError.
1890
1891     """
1892     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1893     if node is None:
1894       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1895
1896     instance_list = self.cfg.GetInstanceList()
1897
1898     masternode = self.cfg.GetMasterNode()
1899     if node.name == masternode:
1900       raise errors.OpPrereqError("Node is the master node,"
1901                                  " you need to failover first.")
1902
1903     for instance_name in instance_list:
1904       instance = self.cfg.GetInstanceInfo(instance_name)
1905       if node.name in instance.all_nodes:
1906         raise errors.OpPrereqError("Instance %s is still running on the node,"
1907                                    " please remove first." % instance_name)
1908     self.op.node_name = node.name
1909     self.node = node
1910
1911   def Exec(self, feedback_fn):
1912     """Removes the node from the cluster.
1913
1914     """
1915     node = self.node
1916     logging.info("Stopping the node daemon and removing configs from node %s",
1917                  node.name)
1918
1919     self.context.RemoveNode(node.name)
1920
1921     result = self.rpc.call_node_leave_cluster(node.name)
1922     msg = result.RemoteFailMsg()
1923     if msg:
1924       self.LogWarning("Errors encountered on the remote node while leaving"
1925                       " the cluster: %s", msg)
1926
1927     # Promote nodes to master candidate as needed
1928     _AdjustCandidatePool(self)
1929
1930
1931 class LUQueryNodes(NoHooksLU):
1932   """Logical unit for querying nodes.
1933
1934   """
1935   _OP_REQP = ["output_fields", "names", "use_locking"]
1936   REQ_BGL = False
1937   _FIELDS_DYNAMIC = utils.FieldSet(
1938     "dtotal", "dfree",
1939     "mtotal", "mnode", "mfree",
1940     "bootid",
1941     "ctotal", "cnodes", "csockets",
1942     )
1943
1944   _FIELDS_STATIC = utils.FieldSet(
1945     "name", "pinst_cnt", "sinst_cnt",
1946     "pinst_list", "sinst_list",
1947     "pip", "sip", "tags",
1948     "serial_no",
1949     "master_candidate",
1950     "master",
1951     "offline",
1952     "drained",
1953     )
1954
1955   def ExpandNames(self):
1956     _CheckOutputFields(static=self._FIELDS_STATIC,
1957                        dynamic=self._FIELDS_DYNAMIC,
1958                        selected=self.op.output_fields)
1959
1960     self.needed_locks = {}
1961     self.share_locks[locking.LEVEL_NODE] = 1
1962
1963     if self.op.names:
1964       self.wanted = _GetWantedNodes(self, self.op.names)
1965     else:
1966       self.wanted = locking.ALL_SET
1967
1968     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1969     self.do_locking = self.do_node_query and self.op.use_locking
1970     if self.do_locking:
1971       # if we don't request only static fields, we need to lock the nodes
1972       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1973
1974
1975   def CheckPrereq(self):
1976     """Check prerequisites.
1977
1978     """
1979     # The validation of the node list is done in the _GetWantedNodes,
1980     # if non empty, and if empty, there's no validation to do
1981     pass
1982
1983   def Exec(self, feedback_fn):
1984     """Computes the list of nodes and their attributes.
1985
1986     """
1987     all_info = self.cfg.GetAllNodesInfo()
1988     if self.do_locking:
1989       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1990     elif self.wanted != locking.ALL_SET:
1991       nodenames = self.wanted
1992       missing = set(nodenames).difference(all_info.keys())
1993       if missing:
1994         raise errors.OpExecError(
1995           "Some nodes were removed before retrieving their data: %s" % missing)
1996     else:
1997       nodenames = all_info.keys()
1998
1999     nodenames = utils.NiceSort(nodenames)
2000     nodelist = [all_info[name] for name in nodenames]
2001
2002     # begin data gathering
2003
2004     if self.do_node_query:
2005       live_data = {}
2006       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2007                                           self.cfg.GetHypervisorType())
2008       for name in nodenames:
2009         nodeinfo = node_data[name]
2010         if not nodeinfo.RemoteFailMsg() and nodeinfo.payload:
2011           nodeinfo = nodeinfo.payload
2012           fn = utils.TryConvert
2013           live_data[name] = {
2014             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2015             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2016             "mfree": fn(int, nodeinfo.get('memory_free', None)),
2017             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2018             "dfree": fn(int, nodeinfo.get('vg_free', None)),
2019             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2020             "bootid": nodeinfo.get('bootid', None),
2021             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2022             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2023             }
2024         else:
2025           live_data[name] = {}
2026     else:
2027       live_data = dict.fromkeys(nodenames, {})
2028
2029     node_to_primary = dict([(name, set()) for name in nodenames])
2030     node_to_secondary = dict([(name, set()) for name in nodenames])
2031
2032     inst_fields = frozenset(("pinst_cnt", "pinst_list",
2033                              "sinst_cnt", "sinst_list"))
2034     if inst_fields & frozenset(self.op.output_fields):
2035       instancelist = self.cfg.GetInstanceList()
2036
2037       for instance_name in instancelist:
2038         inst = self.cfg.GetInstanceInfo(instance_name)
2039         if inst.primary_node in node_to_primary:
2040           node_to_primary[inst.primary_node].add(inst.name)
2041         for secnode in inst.secondary_nodes:
2042           if secnode in node_to_secondary:
2043             node_to_secondary[secnode].add(inst.name)
2044
2045     master_node = self.cfg.GetMasterNode()
2046
2047     # end data gathering
2048
2049     output = []
2050     for node in nodelist:
2051       node_output = []
2052       for field in self.op.output_fields:
2053         if field == "name":
2054           val = node.name
2055         elif field == "pinst_list":
2056           val = list(node_to_primary[node.name])
2057         elif field == "sinst_list":
2058           val = list(node_to_secondary[node.name])
2059         elif field == "pinst_cnt":
2060           val = len(node_to_primary[node.name])
2061         elif field == "sinst_cnt":
2062           val = len(node_to_secondary[node.name])
2063         elif field == "pip":
2064           val = node.primary_ip
2065         elif field == "sip":
2066           val = node.secondary_ip
2067         elif field == "tags":
2068           val = list(node.GetTags())
2069         elif field == "serial_no":
2070           val = node.serial_no
2071         elif field == "master_candidate":
2072           val = node.master_candidate
2073         elif field == "master":
2074           val = node.name == master_node
2075         elif field == "offline":
2076           val = node.offline
2077         elif field == "drained":
2078           val = node.drained
2079         elif self._FIELDS_DYNAMIC.Matches(field):
2080           val = live_data[node.name].get(field, None)
2081         else:
2082           raise errors.ParameterError(field)
2083         node_output.append(val)
2084       output.append(node_output)
2085
2086     return output
2087
2088
2089 class LUQueryNodeVolumes(NoHooksLU):
2090   """Logical unit for getting volumes on node(s).
2091
2092   """
2093   _OP_REQP = ["nodes", "output_fields"]
2094   REQ_BGL = False
2095   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2096   _FIELDS_STATIC = utils.FieldSet("node")
2097
2098   def ExpandNames(self):
2099     _CheckOutputFields(static=self._FIELDS_STATIC,
2100                        dynamic=self._FIELDS_DYNAMIC,
2101                        selected=self.op.output_fields)
2102
2103     self.needed_locks = {}
2104     self.share_locks[locking.LEVEL_NODE] = 1
2105     if not self.op.nodes:
2106       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2107     else:
2108       self.needed_locks[locking.LEVEL_NODE] = \
2109         _GetWantedNodes(self, self.op.nodes)
2110
2111   def CheckPrereq(self):
2112     """Check prerequisites.
2113
2114     This checks that the fields required are valid output fields.
2115
2116     """
2117     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2118
2119   def Exec(self, feedback_fn):
2120     """Computes the list of nodes and their attributes.
2121
2122     """
2123     nodenames = self.nodes
2124     volumes = self.rpc.call_node_volumes(nodenames)
2125
2126     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2127              in self.cfg.GetInstanceList()]
2128
2129     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2130
2131     output = []
2132     for node in nodenames:
2133       nresult = volumes[node]
2134       if nresult.offline:
2135         continue
2136       msg = nresult.RemoteFailMsg()
2137       if msg:
2138         self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2139         continue
2140
2141       node_vols = nresult.payload[:]
2142       node_vols.sort(key=lambda vol: vol['dev'])
2143
2144       for vol in node_vols:
2145         node_output = []
2146         for field in self.op.output_fields:
2147           if field == "node":
2148             val = node
2149           elif field == "phys":
2150             val = vol['dev']
2151           elif field == "vg":
2152             val = vol['vg']
2153           elif field == "name":
2154             val = vol['name']
2155           elif field == "size":
2156             val = int(float(vol['size']))
2157           elif field == "instance":
2158             for inst in ilist:
2159               if node not in lv_by_node[inst]:
2160                 continue
2161               if vol['name'] in lv_by_node[inst][node]:
2162                 val = inst.name
2163                 break
2164             else:
2165               val = '-'
2166           else:
2167             raise errors.ParameterError(field)
2168           node_output.append(str(val))
2169
2170         output.append(node_output)
2171
2172     return output
2173
2174
2175 class LUAddNode(LogicalUnit):
2176   """Logical unit for adding node to the cluster.
2177
2178   """
2179   HPATH = "node-add"
2180   HTYPE = constants.HTYPE_NODE
2181   _OP_REQP = ["node_name"]
2182
2183   def BuildHooksEnv(self):
2184     """Build hooks env.
2185
2186     This will run on all nodes before, and on all nodes + the new node after.
2187
2188     """
2189     env = {
2190       "OP_TARGET": self.op.node_name,
2191       "NODE_NAME": self.op.node_name,
2192       "NODE_PIP": self.op.primary_ip,
2193       "NODE_SIP": self.op.secondary_ip,
2194       }
2195     nodes_0 = self.cfg.GetNodeList()
2196     nodes_1 = nodes_0 + [self.op.node_name, ]
2197     return env, nodes_0, nodes_1
2198
2199   def CheckPrereq(self):
2200     """Check prerequisites.
2201
2202     This checks:
2203      - the new node is not already in the config
2204      - it is resolvable
2205      - its parameters (single/dual homed) matches the cluster
2206
2207     Any errors are signalled by raising errors.OpPrereqError.
2208
2209     """
2210     node_name = self.op.node_name
2211     cfg = self.cfg
2212
2213     dns_data = utils.HostInfo(node_name)
2214
2215     node = dns_data.name
2216     primary_ip = self.op.primary_ip = dns_data.ip
2217     secondary_ip = getattr(self.op, "secondary_ip", None)
2218     if secondary_ip is None:
2219       secondary_ip = primary_ip
2220     if not utils.IsValidIP(secondary_ip):
2221       raise errors.OpPrereqError("Invalid secondary IP given")
2222     self.op.secondary_ip = secondary_ip
2223
2224     node_list = cfg.GetNodeList()
2225     if not self.op.readd and node in node_list:
2226       raise errors.OpPrereqError("Node %s is already in the configuration" %
2227                                  node)
2228     elif self.op.readd and node not in node_list:
2229       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2230
2231     for existing_node_name in node_list:
2232       existing_node = cfg.GetNodeInfo(existing_node_name)
2233
2234       if self.op.readd and node == existing_node_name:
2235         if (existing_node.primary_ip != primary_ip or
2236             existing_node.secondary_ip != secondary_ip):
2237           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2238                                      " address configuration as before")
2239         continue
2240
2241       if (existing_node.primary_ip == primary_ip or
2242           existing_node.secondary_ip == primary_ip or
2243           existing_node.primary_ip == secondary_ip or
2244           existing_node.secondary_ip == secondary_ip):
2245         raise errors.OpPrereqError("New node ip address(es) conflict with"
2246                                    " existing node %s" % existing_node.name)
2247
2248     # check that the type of the node (single versus dual homed) is the
2249     # same as for the master
2250     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2251     master_singlehomed = myself.secondary_ip == myself.primary_ip
2252     newbie_singlehomed = secondary_ip == primary_ip
2253     if master_singlehomed != newbie_singlehomed:
2254       if master_singlehomed:
2255         raise errors.OpPrereqError("The master has no private ip but the"
2256                                    " new node has one")
2257       else:
2258         raise errors.OpPrereqError("The master has a private ip but the"
2259                                    " new node doesn't have one")
2260
2261     # checks reachablity
2262     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2263       raise errors.OpPrereqError("Node not reachable by ping")
2264
2265     if not newbie_singlehomed:
2266       # check reachability from my secondary ip to newbie's secondary ip
2267       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2268                            source=myself.secondary_ip):
2269         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2270                                    " based ping to noded port")
2271
2272     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2273     mc_now, _ = self.cfg.GetMasterCandidateStats()
2274     master_candidate = mc_now < cp_size
2275
2276     self.new_node = objects.Node(name=node,
2277                                  primary_ip=primary_ip,
2278                                  secondary_ip=secondary_ip,
2279                                  master_candidate=master_candidate,
2280                                  offline=False, drained=False)
2281
2282   def Exec(self, feedback_fn):
2283     """Adds the new node to the cluster.
2284
2285     """
2286     new_node = self.new_node
2287     node = new_node.name
2288
2289     # check connectivity
2290     result = self.rpc.call_version([node])[node]
2291     msg = result.RemoteFailMsg()
2292     if msg:
2293       raise errors.OpExecError("Can't get version information from"
2294                                " node %s: %s" % (node, msg))
2295     if constants.PROTOCOL_VERSION == result.payload:
2296       logging.info("Communication to node %s fine, sw version %s match",
2297                    node, result.payload)
2298     else:
2299       raise errors.OpExecError("Version mismatch master version %s,"
2300                                " node version %s" %
2301                                (constants.PROTOCOL_VERSION, result.payload))
2302
2303     # setup ssh on node
2304     logging.info("Copy ssh key to node %s", node)
2305     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2306     keyarray = []
2307     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2308                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2309                 priv_key, pub_key]
2310
2311     for i in keyfiles:
2312       f = open(i, 'r')
2313       try:
2314         keyarray.append(f.read())
2315       finally:
2316         f.close()
2317
2318     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2319                                     keyarray[2],
2320                                     keyarray[3], keyarray[4], keyarray[5])
2321
2322     msg = result.RemoteFailMsg()
2323     if msg:
2324       raise errors.OpExecError("Cannot transfer ssh keys to the"
2325                                " new node: %s" % msg)
2326
2327     # Add node to our /etc/hosts, and add key to known_hosts
2328     if self.cfg.GetClusterInfo().modify_etc_hosts:
2329       utils.AddHostToEtcHosts(new_node.name)
2330
2331     if new_node.secondary_ip != new_node.primary_ip:
2332       result = self.rpc.call_node_has_ip_address(new_node.name,
2333                                                  new_node.secondary_ip)
2334       msg = result.RemoteFailMsg()
2335       if msg:
2336         raise errors.OpPrereqError("Failure checking secondary ip"
2337                                    " on node %s: %s" % (new_node.name, msg))
2338       if not result.payload:
2339         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2340                                  " you gave (%s). Please fix and re-run this"
2341                                  " command." % new_node.secondary_ip)
2342
2343     node_verify_list = [self.cfg.GetMasterNode()]
2344     node_verify_param = {
2345       'nodelist': [node],
2346       # TODO: do a node-net-test as well?
2347     }
2348
2349     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2350                                        self.cfg.GetClusterName())
2351     for verifier in node_verify_list:
2352       msg = result[verifier].RemoteFailMsg()
2353       if msg:
2354         raise errors.OpExecError("Cannot communicate with node %s: %s" %
2355                                  (verifier, msg))
2356       nl_payload = result[verifier].payload['nodelist']
2357       if nl_payload:
2358         for failed in nl_payload:
2359           feedback_fn("ssh/hostname verification failed %s -> %s" %
2360                       (verifier, nl_payload[failed]))
2361         raise errors.OpExecError("ssh/hostname verification failed.")
2362
2363     if self.op.readd:
2364       _RedistributeAncillaryFiles(self)
2365       self.context.ReaddNode(new_node)
2366     else:
2367       _RedistributeAncillaryFiles(self, additional_nodes=[node])
2368       self.context.AddNode(new_node)
2369
2370
2371 class LUSetNodeParams(LogicalUnit):
2372   """Modifies the parameters of a node.
2373
2374   """
2375   HPATH = "node-modify"
2376   HTYPE = constants.HTYPE_NODE
2377   _OP_REQP = ["node_name"]
2378   REQ_BGL = False
2379
2380   def CheckArguments(self):
2381     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2382     if node_name is None:
2383       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2384     self.op.node_name = node_name
2385     _CheckBooleanOpField(self.op, 'master_candidate')
2386     _CheckBooleanOpField(self.op, 'offline')
2387     _CheckBooleanOpField(self.op, 'drained')
2388     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2389     if all_mods.count(None) == 3:
2390       raise errors.OpPrereqError("Please pass at least one modification")
2391     if all_mods.count(True) > 1:
2392       raise errors.OpPrereqError("Can't set the node into more than one"
2393                                  " state at the same time")
2394
2395   def ExpandNames(self):
2396     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2397
2398   def BuildHooksEnv(self):
2399     """Build hooks env.
2400
2401     This runs on the master node.
2402
2403     """
2404     env = {
2405       "OP_TARGET": self.op.node_name,
2406       "MASTER_CANDIDATE": str(self.op.master_candidate),
2407       "OFFLINE": str(self.op.offline),
2408       "DRAINED": str(self.op.drained),
2409       }
2410     nl = [self.cfg.GetMasterNode(),
2411           self.op.node_name]
2412     return env, nl, nl
2413
2414   def CheckPrereq(self):
2415     """Check prerequisites.
2416
2417     This only checks the instance list against the existing names.
2418
2419     """
2420     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2421
2422     if ((self.op.master_candidate == False or self.op.offline == True or
2423          self.op.drained == True) and node.master_candidate):
2424       # we will demote the node from master_candidate
2425       if self.op.node_name == self.cfg.GetMasterNode():
2426         raise errors.OpPrereqError("The master node has to be a"
2427                                    " master candidate, online and not drained")
2428       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2429       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2430       if num_candidates <= cp_size:
2431         msg = ("Not enough master candidates (desired"
2432                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2433         if self.op.force:
2434           self.LogWarning(msg)
2435         else:
2436           raise errors.OpPrereqError(msg)
2437
2438     if (self.op.master_candidate == True and
2439         ((node.offline and not self.op.offline == False) or
2440          (node.drained and not self.op.drained == False))):
2441       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2442                                  " to master_candidate" % node.name)
2443
2444     return
2445
2446   def Exec(self, feedback_fn):
2447     """Modifies a node.
2448
2449     """
2450     node = self.node
2451
2452     result = []
2453     changed_mc = False
2454
2455     if self.op.offline is not None:
2456       node.offline = self.op.offline
2457       result.append(("offline", str(self.op.offline)))
2458       if self.op.offline == True:
2459         if node.master_candidate:
2460           node.master_candidate = False
2461           changed_mc = True
2462           result.append(("master_candidate", "auto-demotion due to offline"))
2463         if node.drained:
2464           node.drained = False
2465           result.append(("drained", "clear drained status due to offline"))
2466
2467     if self.op.master_candidate is not None:
2468       node.master_candidate = self.op.master_candidate
2469       changed_mc = True
2470       result.append(("master_candidate", str(self.op.master_candidate)))
2471       if self.op.master_candidate == False:
2472         rrc = self.rpc.call_node_demote_from_mc(node.name)
2473         msg = rrc.RemoteFailMsg()
2474         if msg:
2475           self.LogWarning("Node failed to demote itself: %s" % msg)
2476
2477     if self.op.drained is not None:
2478       node.drained = self.op.drained
2479       result.append(("drained", str(self.op.drained)))
2480       if self.op.drained == True:
2481         if node.master_candidate:
2482           node.master_candidate = False
2483           changed_mc = True
2484           result.append(("master_candidate", "auto-demotion due to drain"))
2485         if node.offline:
2486           node.offline = False
2487           result.append(("offline", "clear offline status due to drain"))
2488
2489     # this will trigger configuration file update, if needed
2490     self.cfg.Update(node)
2491     # this will trigger job queue propagation or cleanup
2492     if changed_mc:
2493       self.context.ReaddNode(node)
2494
2495     return result
2496
2497
2498 class LUPowercycleNode(NoHooksLU):
2499   """Powercycles a node.
2500
2501   """
2502   _OP_REQP = ["node_name", "force"]
2503   REQ_BGL = False
2504
2505   def CheckArguments(self):
2506     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2507     if node_name is None:
2508       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2509     self.op.node_name = node_name
2510     if node_name == self.cfg.GetMasterNode() and not self.op.force:
2511       raise errors.OpPrereqError("The node is the master and the force"
2512                                  " parameter was not set")
2513
2514   def ExpandNames(self):
2515     """Locking for PowercycleNode.
2516
2517     This is a last-resource option and shouldn't block on other
2518     jobs. Therefore, we grab no locks.
2519
2520     """
2521     self.needed_locks = {}
2522
2523   def CheckPrereq(self):
2524     """Check prerequisites.
2525
2526     This LU has no prereqs.
2527
2528     """
2529     pass
2530
2531   def Exec(self, feedback_fn):
2532     """Reboots a node.
2533
2534     """
2535     result = self.rpc.call_node_powercycle(self.op.node_name,
2536                                            self.cfg.GetHypervisorType())
2537     msg = result.RemoteFailMsg()
2538     if msg:
2539       raise errors.OpExecError("Failed to schedule the reboot: %s" % msg)
2540     return result.payload
2541
2542
2543 class LUQueryClusterInfo(NoHooksLU):
2544   """Query cluster configuration.
2545
2546   """
2547   _OP_REQP = []
2548   REQ_BGL = False
2549
2550   def ExpandNames(self):
2551     self.needed_locks = {}
2552
2553   def CheckPrereq(self):
2554     """No prerequsites needed for this LU.
2555
2556     """
2557     pass
2558
2559   def Exec(self, feedback_fn):
2560     """Return cluster config.
2561
2562     """
2563     cluster = self.cfg.GetClusterInfo()
2564     result = {
2565       "software_version": constants.RELEASE_VERSION,
2566       "protocol_version": constants.PROTOCOL_VERSION,
2567       "config_version": constants.CONFIG_VERSION,
2568       "os_api_version": constants.OS_API_VERSION,
2569       "export_version": constants.EXPORT_VERSION,
2570       "architecture": (platform.architecture()[0], platform.machine()),
2571       "name": cluster.cluster_name,
2572       "master": cluster.master_node,
2573       "default_hypervisor": cluster.default_hypervisor,
2574       "enabled_hypervisors": cluster.enabled_hypervisors,
2575       "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2576                         for hypervisor in cluster.enabled_hypervisors]),
2577       "beparams": cluster.beparams,
2578       "nicparams": cluster.nicparams,
2579       "candidate_pool_size": cluster.candidate_pool_size,
2580       "master_netdev": cluster.master_netdev,
2581       "volume_group_name": cluster.volume_group_name,
2582       "file_storage_dir": cluster.file_storage_dir,
2583       }
2584
2585     return result
2586
2587
2588 class LUQueryConfigValues(NoHooksLU):
2589   """Return configuration values.
2590
2591   """
2592   _OP_REQP = []
2593   REQ_BGL = False
2594   _FIELDS_DYNAMIC = utils.FieldSet()
2595   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2596
2597   def ExpandNames(self):
2598     self.needed_locks = {}
2599
2600     _CheckOutputFields(static=self._FIELDS_STATIC,
2601                        dynamic=self._FIELDS_DYNAMIC,
2602                        selected=self.op.output_fields)
2603
2604   def CheckPrereq(self):
2605     """No prerequisites.
2606
2607     """
2608     pass
2609
2610   def Exec(self, feedback_fn):
2611     """Dump a representation of the cluster config to the standard output.
2612
2613     """
2614     values = []
2615     for field in self.op.output_fields:
2616       if field == "cluster_name":
2617         entry = self.cfg.GetClusterName()
2618       elif field == "master_node":
2619         entry = self.cfg.GetMasterNode()
2620       elif field == "drain_flag":
2621         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2622       else:
2623         raise errors.ParameterError(field)
2624       values.append(entry)
2625     return values
2626
2627
2628 class LUActivateInstanceDisks(NoHooksLU):
2629   """Bring up an instance's disks.
2630
2631   """
2632   _OP_REQP = ["instance_name"]
2633   REQ_BGL = False
2634
2635   def ExpandNames(self):
2636     self._ExpandAndLockInstance()
2637     self.needed_locks[locking.LEVEL_NODE] = []
2638     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2639
2640   def DeclareLocks(self, level):
2641     if level == locking.LEVEL_NODE:
2642       self._LockInstancesNodes()
2643
2644   def CheckPrereq(self):
2645     """Check prerequisites.
2646
2647     This checks that the instance is in the cluster.
2648
2649     """
2650     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2651     assert self.instance is not None, \
2652       "Cannot retrieve locked instance %s" % self.op.instance_name
2653     _CheckNodeOnline(self, self.instance.primary_node)
2654
2655   def Exec(self, feedback_fn):
2656     """Activate the disks.
2657
2658     """
2659     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2660     if not disks_ok:
2661       raise errors.OpExecError("Cannot activate block devices")
2662
2663     return disks_info
2664
2665
2666 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2667   """Prepare the block devices for an instance.
2668
2669   This sets up the block devices on all nodes.
2670
2671   @type lu: L{LogicalUnit}
2672   @param lu: the logical unit on whose behalf we execute
2673   @type instance: L{objects.Instance}
2674   @param instance: the instance for whose disks we assemble
2675   @type ignore_secondaries: boolean
2676   @param ignore_secondaries: if true, errors on secondary nodes
2677       won't result in an error return from the function
2678   @return: False if the operation failed, otherwise a list of
2679       (host, instance_visible_name, node_visible_name)
2680       with the mapping from node devices to instance devices
2681
2682   """
2683   device_info = []
2684   disks_ok = True
2685   iname = instance.name
2686   # With the two passes mechanism we try to reduce the window of
2687   # opportunity for the race condition of switching DRBD to primary
2688   # before handshaking occured, but we do not eliminate it
2689
2690   # The proper fix would be to wait (with some limits) until the
2691   # connection has been made and drbd transitions from WFConnection
2692   # into any other network-connected state (Connected, SyncTarget,
2693   # SyncSource, etc.)
2694
2695   # 1st pass, assemble on all nodes in secondary mode
2696   for inst_disk in instance.disks:
2697     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2698       lu.cfg.SetDiskID(node_disk, node)
2699       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2700       msg = result.RemoteFailMsg()
2701       if msg:
2702         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2703                            " (is_primary=False, pass=1): %s",
2704                            inst_disk.iv_name, node, msg)
2705         if not ignore_secondaries:
2706           disks_ok = False
2707
2708   # FIXME: race condition on drbd migration to primary
2709
2710   # 2nd pass, do only the primary node
2711   for inst_disk in instance.disks:
2712     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2713       if node != instance.primary_node:
2714         continue
2715       lu.cfg.SetDiskID(node_disk, node)
2716       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2717       msg = result.RemoteFailMsg()
2718       if msg:
2719         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2720                            " (is_primary=True, pass=2): %s",
2721                            inst_disk.iv_name, node, msg)
2722         disks_ok = False
2723     device_info.append((instance.primary_node, inst_disk.iv_name,
2724                         result.payload))
2725
2726   # leave the disks configured for the primary node
2727   # this is a workaround that would be fixed better by
2728   # improving the logical/physical id handling
2729   for disk in instance.disks:
2730     lu.cfg.SetDiskID(disk, instance.primary_node)
2731
2732   return disks_ok, device_info
2733
2734
2735 def _StartInstanceDisks(lu, instance, force):
2736   """Start the disks of an instance.
2737
2738   """
2739   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2740                                            ignore_secondaries=force)
2741   if not disks_ok:
2742     _ShutdownInstanceDisks(lu, instance)
2743     if force is not None and not force:
2744       lu.proc.LogWarning("", hint="If the message above refers to a"
2745                          " secondary node,"
2746                          " you can retry the operation using '--force'.")
2747     raise errors.OpExecError("Disk consistency error")
2748
2749
2750 class LUDeactivateInstanceDisks(NoHooksLU):
2751   """Shutdown an instance's disks.
2752
2753   """
2754   _OP_REQP = ["instance_name"]
2755   REQ_BGL = False
2756
2757   def ExpandNames(self):
2758     self._ExpandAndLockInstance()
2759     self.needed_locks[locking.LEVEL_NODE] = []
2760     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2761
2762   def DeclareLocks(self, level):
2763     if level == locking.LEVEL_NODE:
2764       self._LockInstancesNodes()
2765
2766   def CheckPrereq(self):
2767     """Check prerequisites.
2768
2769     This checks that the instance is in the cluster.
2770
2771     """
2772     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2773     assert self.instance is not None, \
2774       "Cannot retrieve locked instance %s" % self.op.instance_name
2775
2776   def Exec(self, feedback_fn):
2777     """Deactivate the disks
2778
2779     """
2780     instance = self.instance
2781     _SafeShutdownInstanceDisks(self, instance)
2782
2783
2784 def _SafeShutdownInstanceDisks(lu, instance):
2785   """Shutdown block devices of an instance.
2786
2787   This function checks if an instance is running, before calling
2788   _ShutdownInstanceDisks.
2789
2790   """
2791   pnode = instance.primary_node
2792   ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])
2793   ins_l = ins_l[pnode]
2794   msg = ins_l.RemoteFailMsg()
2795   if msg:
2796     raise errors.OpExecError("Can't contact node %s: %s" % (pnode, msg))
2797
2798   if instance.name in ins_l.payload:
2799     raise errors.OpExecError("Instance is running, can't shutdown"
2800                              " block devices.")
2801
2802   _ShutdownInstanceDisks(lu, instance)
2803
2804
2805 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2806   """Shutdown block devices of an instance.
2807
2808   This does the shutdown on all nodes of the instance.
2809
2810   If the ignore_primary is false, errors on the primary node are
2811   ignored.
2812
2813   """
2814   all_result = True
2815   for disk in instance.disks:
2816     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2817       lu.cfg.SetDiskID(top_disk, node)
2818       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2819       msg = result.RemoteFailMsg()
2820       if msg:
2821         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2822                       disk.iv_name, node, msg)
2823         if not ignore_primary or node != instance.primary_node:
2824           all_result = False
2825   return all_result
2826
2827
2828 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2829   """Checks if a node has enough free memory.
2830
2831   This function check if a given node has the needed amount of free
2832   memory. In case the node has less memory or we cannot get the
2833   information from the node, this function raise an OpPrereqError
2834   exception.
2835
2836   @type lu: C{LogicalUnit}
2837   @param lu: a logical unit from which we get configuration data
2838   @type node: C{str}
2839   @param node: the node to check
2840   @type reason: C{str}
2841   @param reason: string to use in the error message
2842   @type requested: C{int}
2843   @param requested: the amount of memory in MiB to check for
2844   @type hypervisor_name: C{str}
2845   @param hypervisor_name: the hypervisor to ask for memory stats
2846   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2847       we cannot check the node
2848
2849   """
2850   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2851   msg = nodeinfo[node].RemoteFailMsg()
2852   if msg:
2853     raise errors.OpPrereqError("Can't get data from node %s: %s" % (node, msg))
2854   free_mem = nodeinfo[node].payload.get('memory_free', None)
2855   if not isinstance(free_mem, int):
2856     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2857                                " was '%s'" % (node, free_mem))
2858   if requested > free_mem:
2859     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2860                                " needed %s MiB, available %s MiB" %
2861                                (node, reason, requested, free_mem))
2862
2863
2864 class LUStartupInstance(LogicalUnit):
2865   """Starts an instance.
2866
2867   """
2868   HPATH = "instance-start"
2869   HTYPE = constants.HTYPE_INSTANCE
2870   _OP_REQP = ["instance_name", "force"]
2871   REQ_BGL = False
2872
2873   def ExpandNames(self):
2874     self._ExpandAndLockInstance()
2875
2876   def BuildHooksEnv(self):
2877     """Build hooks env.
2878
2879     This runs on master, primary and secondary nodes of the instance.
2880
2881     """
2882     env = {
2883       "FORCE": self.op.force,
2884       }
2885     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2886     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2887     return env, nl, nl
2888
2889   def CheckPrereq(self):
2890     """Check prerequisites.
2891
2892     This checks that the instance is in the cluster.
2893
2894     """
2895     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2896     assert self.instance is not None, \
2897       "Cannot retrieve locked instance %s" % self.op.instance_name
2898
2899     # extra beparams
2900     self.beparams = getattr(self.op, "beparams", {})
2901     if self.beparams:
2902       if not isinstance(self.beparams, dict):
2903         raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2904                                    " dict" % (type(self.beparams), ))
2905       # fill the beparams dict
2906       utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2907       self.op.beparams = self.beparams
2908
2909     # extra hvparams
2910     self.hvparams = getattr(self.op, "hvparams", {})
2911     if self.hvparams:
2912       if not isinstance(self.hvparams, dict):
2913         raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2914                                    " dict" % (type(self.hvparams), ))
2915
2916       # check hypervisor parameter syntax (locally)
2917       cluster = self.cfg.GetClusterInfo()
2918       utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2919       filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
2920                                     instance.hvparams)
2921       filled_hvp.update(self.hvparams)
2922       hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2923       hv_type.CheckParameterSyntax(filled_hvp)
2924       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2925       self.op.hvparams = self.hvparams
2926
2927     _CheckNodeOnline(self, instance.primary_node)
2928
2929     bep = self.cfg.GetClusterInfo().FillBE(instance)
2930     # check bridges existance
2931     _CheckInstanceBridgesExist(self, instance)
2932
2933     remote_info = self.rpc.call_instance_info(instance.primary_node,
2934                                               instance.name,
2935                                               instance.hypervisor)
2936     msg = remote_info.RemoteFailMsg()
2937     if msg:
2938       raise errors.OpPrereqError("Error checking node %s: %s" %
2939                                  (instance.primary_node, msg))
2940     if not remote_info.payload: # not running already
2941       _CheckNodeFreeMemory(self, instance.primary_node,
2942                            "starting instance %s" % instance.name,
2943                            bep[constants.BE_MEMORY], instance.hypervisor)
2944
2945   def Exec(self, feedback_fn):
2946     """Start the instance.
2947
2948     """
2949     instance = self.instance
2950     force = self.op.force
2951
2952     self.cfg.MarkInstanceUp(instance.name)
2953
2954     node_current = instance.primary_node
2955
2956     _StartInstanceDisks(self, instance, force)
2957
2958     result = self.rpc.call_instance_start(node_current, instance,
2959                                           self.hvparams, self.beparams)
2960     msg = result.RemoteFailMsg()
2961     if msg:
2962       _ShutdownInstanceDisks(self, instance)
2963       raise errors.OpExecError("Could not start instance: %s" % msg)
2964
2965
2966 class LURebootInstance(LogicalUnit):
2967   """Reboot an instance.
2968
2969   """
2970   HPATH = "instance-reboot"
2971   HTYPE = constants.HTYPE_INSTANCE
2972   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2973   REQ_BGL = False
2974
2975   def ExpandNames(self):
2976     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2977                                    constants.INSTANCE_REBOOT_HARD,
2978                                    constants.INSTANCE_REBOOT_FULL]:
2979       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2980                                   (constants.INSTANCE_REBOOT_SOFT,
2981                                    constants.INSTANCE_REBOOT_HARD,
2982                                    constants.INSTANCE_REBOOT_FULL))
2983     self._ExpandAndLockInstance()
2984
2985   def BuildHooksEnv(self):
2986     """Build hooks env.
2987
2988     This runs on master, primary and secondary nodes of the instance.
2989
2990     """
2991     env = {
2992       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2993       "REBOOT_TYPE": self.op.reboot_type,
2994       }
2995     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2996     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2997     return env, nl, nl
2998
2999   def CheckPrereq(self):
3000     """Check prerequisites.
3001
3002     This checks that the instance is in the cluster.
3003
3004     """
3005     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3006     assert self.instance is not None, \
3007       "Cannot retrieve locked instance %s" % self.op.instance_name
3008
3009     _CheckNodeOnline(self, instance.primary_node)
3010
3011     # check bridges existance
3012     _CheckInstanceBridgesExist(self, instance)
3013
3014   def Exec(self, feedback_fn):
3015     """Reboot the instance.
3016
3017     """
3018     instance = self.instance
3019     ignore_secondaries = self.op.ignore_secondaries
3020     reboot_type = self.op.reboot_type
3021
3022     node_current = instance.primary_node
3023
3024     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3025                        constants.INSTANCE_REBOOT_HARD]:
3026       for disk in instance.disks:
3027         self.cfg.SetDiskID(disk, node_current)
3028       result = self.rpc.call_instance_reboot(node_current, instance,
3029                                              reboot_type)
3030       msg = result.RemoteFailMsg()
3031       if msg:
3032         raise errors.OpExecError("Could not reboot instance: %s" % msg)
3033     else:
3034       result = self.rpc.call_instance_shutdown(node_current, instance)
3035       msg = result.RemoteFailMsg()
3036       if msg:
3037         raise errors.OpExecError("Could not shutdown instance for"
3038                                  " full reboot: %s" % msg)
3039       _ShutdownInstanceDisks(self, instance)
3040       _StartInstanceDisks(self, instance, ignore_secondaries)
3041       result = self.rpc.call_instance_start(node_current, instance, None, None)
3042       msg = result.RemoteFailMsg()
3043       if msg:
3044         _ShutdownInstanceDisks(self, instance)
3045         raise errors.OpExecError("Could not start instance for"
3046                                  " full reboot: %s" % msg)
3047
3048     self.cfg.MarkInstanceUp(instance.name)
3049
3050
3051 class LUShutdownInstance(LogicalUnit):
3052   """Shutdown an instance.
3053
3054   """
3055   HPATH = "instance-stop"
3056   HTYPE = constants.HTYPE_INSTANCE
3057   _OP_REQP = ["instance_name"]
3058   REQ_BGL = False
3059
3060   def ExpandNames(self):
3061     self._ExpandAndLockInstance()
3062
3063   def BuildHooksEnv(self):
3064     """Build hooks env.
3065
3066     This runs on master, primary and secondary nodes of the instance.
3067
3068     """
3069     env = _BuildInstanceHookEnvByObject(self, self.instance)
3070     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3071     return env, nl, nl
3072
3073   def CheckPrereq(self):
3074     """Check prerequisites.
3075
3076     This checks that the instance is in the cluster.
3077
3078     """
3079     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3080     assert self.instance is not None, \
3081       "Cannot retrieve locked instance %s" % self.op.instance_name
3082     _CheckNodeOnline(self, self.instance.primary_node)
3083
3084   def Exec(self, feedback_fn):
3085     """Shutdown the instance.
3086
3087     """
3088     instance = self.instance
3089     node_current = instance.primary_node
3090     self.cfg.MarkInstanceDown(instance.name)
3091     result = self.rpc.call_instance_shutdown(node_current, instance)
3092     msg = result.RemoteFailMsg()
3093     if msg:
3094       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3095
3096     _ShutdownInstanceDisks(self, instance)
3097
3098
3099 class LUReinstallInstance(LogicalUnit):
3100   """Reinstall an instance.
3101
3102   """
3103   HPATH = "instance-reinstall"
3104   HTYPE = constants.HTYPE_INSTANCE
3105   _OP_REQP = ["instance_name"]
3106   REQ_BGL = False
3107
3108   def ExpandNames(self):
3109     self._ExpandAndLockInstance()
3110
3111   def BuildHooksEnv(self):
3112     """Build hooks env.
3113
3114     This runs on master, primary and secondary nodes of the instance.
3115
3116     """
3117     env = _BuildInstanceHookEnvByObject(self, self.instance)
3118     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3119     return env, nl, nl
3120
3121   def CheckPrereq(self):
3122     """Check prerequisites.
3123
3124     This checks that the instance is in the cluster and is not running.
3125
3126     """
3127     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3128     assert instance is not None, \
3129       "Cannot retrieve locked instance %s" % self.op.instance_name
3130     _CheckNodeOnline(self, instance.primary_node)
3131
3132     if instance.disk_template == constants.DT_DISKLESS:
3133       raise errors.OpPrereqError("Instance '%s' has no disks" %
3134                                  self.op.instance_name)
3135     if instance.admin_up:
3136       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3137                                  self.op.instance_name)
3138     remote_info = self.rpc.call_instance_info(instance.primary_node,
3139                                               instance.name,
3140                                               instance.hypervisor)
3141     msg = remote_info.RemoteFailMsg()
3142     if msg:
3143       raise errors.OpPrereqError("Error checking node %s: %s" %
3144                                  (instance.primary_node, msg))
3145     if remote_info.payload:
3146       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3147                                  (self.op.instance_name,
3148                                   instance.primary_node))
3149
3150     self.op.os_type = getattr(self.op, "os_type", None)
3151     if self.op.os_type is not None:
3152       # OS verification
3153       pnode = self.cfg.GetNodeInfo(
3154         self.cfg.ExpandNodeName(instance.primary_node))
3155       if pnode is None:
3156         raise errors.OpPrereqError("Primary node '%s' is unknown" %
3157                                    self.op.pnode)
3158       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3159       result.Raise()
3160       if not isinstance(result.data, objects.OS):
3161         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3162                                    " primary node"  % self.op.os_type)
3163
3164     self.instance = instance
3165
3166   def Exec(self, feedback_fn):
3167     """Reinstall the instance.
3168
3169     """
3170     inst = self.instance
3171
3172     if self.op.os_type is not None:
3173       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3174       inst.os = self.op.os_type
3175       self.cfg.Update(inst)
3176
3177     _StartInstanceDisks(self, inst, None)
3178     try:
3179       feedback_fn("Running the instance OS create scripts...")
3180       result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3181       msg = result.RemoteFailMsg()
3182       if msg:
3183         raise errors.OpExecError("Could not install OS for instance %s"
3184                                  " on node %s: %s" %
3185                                  (inst.name, inst.primary_node, msg))
3186     finally:
3187       _ShutdownInstanceDisks(self, inst)
3188
3189
3190 class LURenameInstance(LogicalUnit):
3191   """Rename an instance.
3192
3193   """
3194   HPATH = "instance-rename"
3195   HTYPE = constants.HTYPE_INSTANCE
3196   _OP_REQP = ["instance_name", "new_name"]
3197
3198   def BuildHooksEnv(self):
3199     """Build hooks env.
3200
3201     This runs on master, primary and secondary nodes of the instance.
3202
3203     """
3204     env = _BuildInstanceHookEnvByObject(self, self.instance)
3205     env["INSTANCE_NEW_NAME"] = self.op.new_name
3206     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3207     return env, nl, nl
3208
3209   def CheckPrereq(self):
3210     """Check prerequisites.
3211
3212     This checks that the instance is in the cluster and is not running.
3213
3214     """
3215     instance = self.cfg.GetInstanceInfo(
3216       self.cfg.ExpandInstanceName(self.op.instance_name))
3217     if instance is None:
3218       raise errors.OpPrereqError("Instance '%s' not known" %
3219                                  self.op.instance_name)
3220     _CheckNodeOnline(self, instance.primary_node)
3221
3222     if instance.admin_up:
3223       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3224                                  self.op.instance_name)
3225     remote_info = self.rpc.call_instance_info(instance.primary_node,
3226                                               instance.name,
3227                                               instance.hypervisor)
3228     msg = remote_info.RemoteFailMsg()
3229     if msg:
3230       raise errors.OpPrereqError("Error checking node %s: %s" %
3231                                  (instance.primary_node, msg))
3232     if remote_info.payload:
3233       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3234                                  (self.op.instance_name,
3235                                   instance.primary_node))
3236     self.instance = instance
3237
3238     # new name verification
3239     name_info = utils.HostInfo(self.op.new_name)
3240
3241     self.op.new_name = new_name = name_info.name
3242     instance_list = self.cfg.GetInstanceList()
3243     if new_name in instance_list:
3244       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3245                                  new_name)
3246
3247     if not getattr(self.op, "ignore_ip", False):
3248       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3249         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3250                                    (name_info.ip, new_name))
3251
3252
3253   def Exec(self, feedback_fn):
3254     """Reinstall the instance.
3255
3256     """
3257     inst = self.instance
3258     old_name = inst.name
3259
3260     if inst.disk_template == constants.DT_FILE:
3261       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3262
3263     self.cfg.RenameInstance(inst.name, self.op.new_name)
3264     # Change the instance lock. This is definitely safe while we hold the BGL
3265     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3266     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3267
3268     # re-read the instance from the configuration after rename
3269     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3270
3271     if inst.disk_template == constants.DT_FILE:
3272       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3273       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3274                                                      old_file_storage_dir,
3275                                                      new_file_storage_dir)
3276       result.Raise()
3277       if not result.data:
3278         raise errors.OpExecError("Could not connect to node '%s' to rename"
3279                                  " directory '%s' to '%s' (but the instance"
3280                                  " has been renamed in Ganeti)" % (
3281                                  inst.primary_node, old_file_storage_dir,
3282                                  new_file_storage_dir))
3283
3284       if not result.data[0]:
3285         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3286                                  " (but the instance has been renamed in"
3287                                  " Ganeti)" % (old_file_storage_dir,
3288                                                new_file_storage_dir))
3289
3290     _StartInstanceDisks(self, inst, None)
3291     try:
3292       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3293                                                  old_name)
3294       msg = result.RemoteFailMsg()
3295       if msg:
3296         msg = ("Could not run OS rename script for instance %s on node %s"
3297                " (but the instance has been renamed in Ganeti): %s" %
3298                (inst.name, inst.primary_node, msg))
3299         self.proc.LogWarning(msg)
3300     finally:
3301       _ShutdownInstanceDisks(self, inst)
3302
3303
3304 class LURemoveInstance(LogicalUnit):
3305   """Remove an instance.
3306
3307   """
3308   HPATH = "instance-remove"
3309   HTYPE = constants.HTYPE_INSTANCE
3310   _OP_REQP = ["instance_name", "ignore_failures"]
3311   REQ_BGL = False
3312
3313   def ExpandNames(self):
3314     self._ExpandAndLockInstance()
3315     self.needed_locks[locking.LEVEL_NODE] = []
3316     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3317
3318   def DeclareLocks(self, level):
3319     if level == locking.LEVEL_NODE:
3320       self._LockInstancesNodes()
3321
3322   def BuildHooksEnv(self):
3323     """Build hooks env.
3324
3325     This runs on master, primary and secondary nodes of the instance.
3326
3327     """
3328     env = _BuildInstanceHookEnvByObject(self, self.instance)
3329     nl = [self.cfg.GetMasterNode()]
3330     return env, nl, nl
3331
3332   def CheckPrereq(self):
3333     """Check prerequisites.
3334
3335     This checks that the instance is in the cluster.
3336
3337     """
3338     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3339     assert self.instance is not None, \
3340       "Cannot retrieve locked instance %s" % self.op.instance_name
3341
3342   def Exec(self, feedback_fn):
3343     """Remove the instance.
3344
3345     """
3346     instance = self.instance
3347     logging.info("Shutting down instance %s on node %s",
3348                  instance.name, instance.primary_node)
3349
3350     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3351     msg = result.RemoteFailMsg()
3352     if msg:
3353       if self.op.ignore_failures:
3354         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3355       else:
3356         raise errors.OpExecError("Could not shutdown instance %s on"
3357                                  " node %s: %s" %
3358                                  (instance.name, instance.primary_node, msg))
3359
3360     logging.info("Removing block devices for instance %s", instance.name)
3361
3362     if not _RemoveDisks(self, instance):
3363       if self.op.ignore_failures:
3364         feedback_fn("Warning: can't remove instance's disks")
3365       else:
3366         raise errors.OpExecError("Can't remove instance's disks")
3367
3368     logging.info("Removing instance %s out of cluster config", instance.name)
3369
3370     self.cfg.RemoveInstance(instance.name)
3371     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3372
3373
3374 class LUQueryInstances(NoHooksLU):
3375   """Logical unit for querying instances.
3376
3377   """
3378   _OP_REQP = ["output_fields", "names", "use_locking"]
3379   REQ_BGL = False
3380   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3381                                     "admin_state",
3382                                     "disk_template", "ip", "mac", "bridge",
3383                                     "sda_size", "sdb_size", "vcpus", "tags",
3384                                     "network_port", "beparams",
3385                                     r"(disk)\.(size)/([0-9]+)",
3386                                     r"(disk)\.(sizes)", "disk_usage",
3387                                     r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3388                                     r"(nic)\.(macs|ips|bridges)",
3389                                     r"(disk|nic)\.(count)",
3390                                     "serial_no", "hypervisor", "hvparams",] +
3391                                   ["hv/%s" % name
3392                                    for name in constants.HVS_PARAMETERS] +
3393                                   ["be/%s" % name
3394                                    for name in constants.BES_PARAMETERS])
3395   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3396
3397
3398   def ExpandNames(self):
3399     _CheckOutputFields(static=self._FIELDS_STATIC,
3400                        dynamic=self._FIELDS_DYNAMIC,
3401                        selected=self.op.output_fields)
3402
3403     self.needed_locks = {}
3404     self.share_locks[locking.LEVEL_INSTANCE] = 1
3405     self.share_locks[locking.LEVEL_NODE] = 1
3406
3407     if self.op.names:
3408       self.wanted = _GetWantedInstances(self, self.op.names)
3409     else:
3410       self.wanted = locking.ALL_SET
3411
3412     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3413     self.do_locking = self.do_node_query and self.op.use_locking
3414     if self.do_locking:
3415       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3416       self.needed_locks[locking.LEVEL_NODE] = []
3417       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3418
3419   def DeclareLocks(self, level):
3420     if level == locking.LEVEL_NODE and self.do_locking:
3421       self._LockInstancesNodes()
3422
3423   def CheckPrereq(self):
3424     """Check prerequisites.
3425
3426     """
3427     pass
3428
3429   def Exec(self, feedback_fn):
3430     """Computes the list of nodes and their attributes.
3431
3432     """
3433     all_info = self.cfg.GetAllInstancesInfo()
3434     if self.wanted == locking.ALL_SET:
3435       # caller didn't specify instance names, so ordering is not important
3436       if self.do_locking:
3437         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3438       else:
3439         instance_names = all_info.keys()
3440       instance_names = utils.NiceSort(instance_names)
3441     else:
3442       # caller did specify names, so we must keep the ordering
3443       if self.do_locking:
3444         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3445       else:
3446         tgt_set = all_info.keys()
3447       missing = set(self.wanted).difference(tgt_set)
3448       if missing:
3449         raise errors.OpExecError("Some instances were removed before"
3450                                  " retrieving their data: %s" % missing)
3451       instance_names = self.wanted
3452
3453     instance_list = [all_info[iname] for iname in instance_names]
3454
3455     # begin data gathering
3456
3457     nodes = frozenset([inst.primary_node for inst in instance_list])
3458     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3459
3460     bad_nodes = []
3461     off_nodes = []
3462     if self.do_node_query:
3463       live_data = {}
3464       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3465       for name in nodes:
3466         result = node_data[name]
3467         if result.offline:
3468           # offline nodes will be in both lists
3469           off_nodes.append(name)
3470         if result.failed or result.RemoteFailMsg():
3471           bad_nodes.append(name)
3472         else:
3473           if result.payload:
3474             live_data.update(result.payload)
3475           # else no instance is alive
3476     else:
3477       live_data = dict([(name, {}) for name in instance_names])
3478
3479     # end data gathering
3480
3481     HVPREFIX = "hv/"
3482     BEPREFIX = "be/"
3483     output = []
3484     for instance in instance_list:
3485       iout = []
3486       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3487       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3488       for field in self.op.output_fields:
3489         st_match = self._FIELDS_STATIC.Matches(field)
3490         if field == "name":
3491           val = instance.name
3492         elif field == "os":
3493           val = instance.os
3494         elif field == "pnode":
3495           val = instance.primary_node
3496         elif field == "snodes":
3497           val = list(instance.secondary_nodes)
3498         elif field == "admin_state":
3499           val = instance.admin_up
3500         elif field == "oper_state":
3501           if instance.primary_node in bad_nodes:
3502             val = None
3503           else:
3504             val = bool(live_data.get(instance.name))
3505         elif field == "status":
3506           if instance.primary_node in off_nodes:
3507             val = "ERROR_nodeoffline"
3508           elif instance.primary_node in bad_nodes:
3509             val = "ERROR_nodedown"
3510           else:
3511             running = bool(live_data.get(instance.name))
3512             if running:
3513               if instance.admin_up:
3514                 val = "running"
3515               else:
3516                 val = "ERROR_up"
3517             else:
3518               if instance.admin_up:
3519                 val = "ERROR_down"
3520               else:
3521                 val = "ADMIN_down"
3522         elif field == "oper_ram":
3523           if instance.primary_node in bad_nodes:
3524             val = None
3525           elif instance.name in live_data:
3526             val = live_data[instance.name].get("memory", "?")
3527           else:
3528             val = "-"
3529         elif field == "disk_template":
3530           val = instance.disk_template
3531         elif field == "ip":
3532           val = instance.nics[0].ip
3533         elif field == "bridge":
3534           val = instance.nics[0].bridge
3535         elif field == "mac":
3536           val = instance.nics[0].mac
3537         elif field == "sda_size" or field == "sdb_size":
3538           idx = ord(field[2]) - ord('a')
3539           try:
3540             val = instance.FindDisk(idx).size
3541           except errors.OpPrereqError:
3542             val = None
3543         elif field == "disk_usage": # total disk usage per node
3544           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3545           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3546         elif field == "tags":
3547           val = list(instance.GetTags())
3548         elif field == "serial_no":
3549           val = instance.serial_no
3550         elif field == "network_port":
3551           val = instance.network_port
3552         elif field == "hypervisor":
3553           val = instance.hypervisor
3554         elif field == "hvparams":
3555           val = i_hv
3556         elif (field.startswith(HVPREFIX) and
3557               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3558           val = i_hv.get(field[len(HVPREFIX):], None)
3559         elif field == "beparams":
3560           val = i_be
3561         elif (field.startswith(BEPREFIX) and
3562               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3563           val = i_be.get(field[len(BEPREFIX):], None)
3564         elif st_match and st_match.groups():
3565           # matches a variable list
3566           st_groups = st_match.groups()
3567           if st_groups and st_groups[0] == "disk":
3568             if st_groups[1] == "count":
3569               val = len(instance.disks)
3570             elif st_groups[1] == "sizes":
3571               val = [disk.size for disk in instance.disks]
3572             elif st_groups[1] == "size":
3573               try:
3574                 val = instance.FindDisk(st_groups[2]).size
3575               except errors.OpPrereqError:
3576                 val = None
3577             else:
3578               assert False, "Unhandled disk parameter"
3579           elif st_groups[0] == "nic":
3580             if st_groups[1] == "count":
3581               val = len(instance.nics)
3582             elif st_groups[1] == "macs":
3583               val = [nic.mac for nic in instance.nics]
3584             elif st_groups[1] == "ips":
3585               val = [nic.ip for nic in instance.nics]
3586             elif st_groups[1] == "bridges":
3587               val = [nic.bridge for nic in instance.nics]
3588             else:
3589               # index-based item
3590               nic_idx = int(st_groups[2])
3591               if nic_idx >= len(instance.nics):
3592                 val = None
3593               else:
3594                 if st_groups[1] == "mac":
3595                   val = instance.nics[nic_idx].mac
3596                 elif st_groups[1] == "ip":
3597                   val = instance.nics[nic_idx].ip
3598                 elif st_groups[1] == "bridge":
3599                   val = instance.nics[nic_idx].bridge
3600                 else:
3601                   assert False, "Unhandled NIC parameter"
3602           else:
3603             assert False, "Unhandled variable parameter"
3604         else:
3605           raise errors.ParameterError(field)
3606         iout.append(val)
3607       output.append(iout)
3608
3609     return output
3610
3611
3612 class LUFailoverInstance(LogicalUnit):
3613   """Failover an instance.
3614
3615   """
3616   HPATH = "instance-failover"
3617   HTYPE = constants.HTYPE_INSTANCE
3618   _OP_REQP = ["instance_name", "ignore_consistency"]
3619   REQ_BGL = False
3620
3621   def ExpandNames(self):
3622     self._ExpandAndLockInstance()
3623     self.needed_locks[locking.LEVEL_NODE] = []
3624     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3625
3626   def DeclareLocks(self, level):
3627     if level == locking.LEVEL_NODE:
3628       self._LockInstancesNodes()
3629
3630   def BuildHooksEnv(self):
3631     """Build hooks env.
3632
3633     This runs on master, primary and secondary nodes of the instance.
3634
3635     """
3636     env = {
3637       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3638       }
3639     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3640     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3641     return env, nl, nl
3642
3643   def CheckPrereq(self):
3644     """Check prerequisites.
3645
3646     This checks that the instance is in the cluster.
3647
3648     """
3649     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3650     assert self.instance is not None, \
3651       "Cannot retrieve locked instance %s" % self.op.instance_name
3652
3653     bep = self.cfg.GetClusterInfo().FillBE(instance)
3654     if instance.disk_template not in constants.DTS_NET_MIRROR:
3655       raise errors.OpPrereqError("Instance's disk layout is not"
3656                                  " network mirrored, cannot failover.")
3657
3658     secondary_nodes = instance.secondary_nodes
3659     if not secondary_nodes:
3660       raise errors.ProgrammerError("no secondary node but using "
3661                                    "a mirrored disk template")
3662
3663     target_node = secondary_nodes[0]
3664     _CheckNodeOnline(self, target_node)
3665     _CheckNodeNotDrained(self, target_node)
3666     # check memory requirements on the secondary node
3667     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3668                          instance.name, bep[constants.BE_MEMORY],
3669                          instance.hypervisor)
3670     # check bridge existance
3671     _CheckInstanceBridgesExist(self, instance, node=target_node)
3672
3673   def Exec(self, feedback_fn):
3674     """Failover an instance.
3675
3676     The failover is done by shutting it down on its present node and
3677     starting it on the secondary.
3678
3679     """
3680     instance = self.instance
3681
3682     source_node = instance.primary_node
3683     target_node = instance.secondary_nodes[0]
3684
3685     feedback_fn("* checking disk consistency between source and target")
3686     for dev in instance.disks:
3687       # for drbd, these are drbd over lvm
3688       if not _CheckDiskConsistency(self, dev, target_node, False):
3689         if instance.admin_up and not self.op.ignore_consistency:
3690           raise errors.OpExecError("Disk %s is degraded on target node,"
3691                                    " aborting failover." % dev.iv_name)
3692
3693     feedback_fn("* shutting down instance on source node")
3694     logging.info("Shutting down instance %s on node %s",
3695                  instance.name, source_node)
3696
3697     result = self.rpc.call_instance_shutdown(source_node, instance)
3698     msg = result.RemoteFailMsg()
3699     if msg:
3700       if self.op.ignore_consistency:
3701         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3702                              " Proceeding anyway. Please make sure node"
3703                              " %s is down. Error details: %s",
3704                              instance.name, source_node, source_node, msg)
3705       else:
3706         raise errors.OpExecError("Could not shutdown instance %s on"
3707                                  " node %s: %s" %
3708                                  (instance.name, source_node, msg))
3709
3710     feedback_fn("* deactivating the instance's disks on source node")
3711     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3712       raise errors.OpExecError("Can't shut down the instance's disks.")
3713
3714     instance.primary_node = target_node
3715     # distribute new instance config to the other nodes
3716     self.cfg.Update(instance)
3717
3718     # Only start the instance if it's marked as up
3719     if instance.admin_up:
3720       feedback_fn("* activating the instance's disks on target node")
3721       logging.info("Starting instance %s on node %s",
3722                    instance.name, target_node)
3723
3724       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3725                                                ignore_secondaries=True)
3726       if not disks_ok:
3727         _ShutdownInstanceDisks(self, instance)
3728         raise errors.OpExecError("Can't activate the instance's disks")
3729
3730       feedback_fn("* starting the instance on the target node")
3731       result = self.rpc.call_instance_start(target_node, instance, None, None)
3732       msg = result.RemoteFailMsg()
3733       if msg:
3734         _ShutdownInstanceDisks(self, instance)
3735         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3736                                  (instance.name, target_node, msg))
3737
3738
3739 class LUMigrateInstance(LogicalUnit):
3740   """Migrate an instance.
3741
3742   This is migration without shutting down, compared to the failover,
3743   which is done with shutdown.
3744
3745   """
3746   HPATH = "instance-migrate"
3747   HTYPE = constants.HTYPE_INSTANCE
3748   _OP_REQP = ["instance_name", "live", "cleanup"]
3749
3750   REQ_BGL = False
3751
3752   def ExpandNames(self):
3753     self._ExpandAndLockInstance()
3754     self.needed_locks[locking.LEVEL_NODE] = []
3755     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3756
3757   def DeclareLocks(self, level):
3758     if level == locking.LEVEL_NODE:
3759       self._LockInstancesNodes()
3760
3761   def BuildHooksEnv(self):
3762     """Build hooks env.
3763
3764     This runs on master, primary and secondary nodes of the instance.
3765
3766     """
3767     env = _BuildInstanceHookEnvByObject(self, self.instance)
3768     env["MIGRATE_LIVE"] = self.op.live
3769     env["MIGRATE_CLEANUP"] = self.op.cleanup
3770     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3771     return env, nl, nl
3772
3773   def CheckPrereq(self):
3774     """Check prerequisites.
3775
3776     This checks that the instance is in the cluster.
3777
3778     """
3779     instance = self.cfg.GetInstanceInfo(
3780       self.cfg.ExpandInstanceName(self.op.instance_name))
3781     if instance is None:
3782       raise errors.OpPrereqError("Instance '%s' not known" %
3783                                  self.op.instance_name)
3784
3785     if instance.disk_template != constants.DT_DRBD8:
3786       raise errors.OpPrereqError("Instance's disk layout is not"
3787                                  " drbd8, cannot migrate.")
3788
3789     secondary_nodes = instance.secondary_nodes
3790     if not secondary_nodes:
3791       raise errors.ConfigurationError("No secondary node but using"
3792                                       " drbd8 disk template")
3793
3794     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3795
3796     target_node = secondary_nodes[0]
3797     # check memory requirements on the secondary node
3798     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3799                          instance.name, i_be[constants.BE_MEMORY],
3800                          instance.hypervisor)
3801
3802     # check bridge existance
3803     _CheckInstanceBridgesExist(self, instance, node=target_node)
3804
3805     if not self.op.cleanup:
3806       _CheckNodeNotDrained(self, target_node)
3807       result = self.rpc.call_instance_migratable(instance.primary_node,
3808                                                  instance)
3809       msg = result.RemoteFailMsg()
3810       if msg:
3811         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3812                                    msg)
3813
3814     self.instance = instance
3815
3816   def _WaitUntilSync(self):
3817     """Poll with custom rpc for disk sync.
3818
3819     This uses our own step-based rpc call.
3820
3821     """
3822     self.feedback_fn("* wait until resync is done")
3823     all_done = False
3824     while not all_done:
3825       all_done = True
3826       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3827                                             self.nodes_ip,
3828                                             self.instance.disks)
3829       min_percent = 100
3830       for node, nres in result.items():
3831         msg = nres.RemoteFailMsg()
3832         if msg:
3833           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3834                                    (node, msg))
3835         node_done, node_percent = nres.payload
3836         all_done = all_done and node_done
3837         if node_percent is not None:
3838           min_percent = min(min_percent, node_percent)
3839       if not all_done:
3840         if min_percent < 100:
3841           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3842         time.sleep(2)
3843
3844   def _EnsureSecondary(self, node):
3845     """Demote a node to secondary.
3846
3847     """
3848     self.feedback_fn("* switching node %s to secondary mode" % node)
3849
3850     for dev in self.instance.disks:
3851       self.cfg.SetDiskID(dev, node)
3852
3853     result = self.rpc.call_blockdev_close(node, self.instance.name,
3854                                           self.instance.disks)
3855     msg = result.RemoteFailMsg()
3856     if msg:
3857       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3858                                " error %s" % (node, msg))
3859
3860   def _GoStandalone(self):
3861     """Disconnect from the network.
3862
3863     """
3864     self.feedback_fn("* changing into standalone mode")
3865     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3866                                                self.instance.disks)
3867     for node, nres in result.items():
3868       msg = nres.RemoteFailMsg()
3869       if msg:
3870         raise errors.OpExecError("Cannot disconnect disks node %s,"
3871                                  " error %s" % (node, msg))
3872
3873   def _GoReconnect(self, multimaster):
3874     """Reconnect to the network.
3875
3876     """
3877     if multimaster:
3878       msg = "dual-master"
3879     else:
3880       msg = "single-master"
3881     self.feedback_fn("* changing disks into %s mode" % msg)
3882     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3883                                            self.instance.disks,
3884                                            self.instance.name, multimaster)
3885     for node, nres in result.items():
3886       msg = nres.RemoteFailMsg()
3887       if msg:
3888         raise errors.OpExecError("Cannot change disks config on node %s,"
3889                                  " error: %s" % (node, msg))
3890
3891   def _ExecCleanup(self):
3892     """Try to cleanup after a failed migration.
3893
3894     The cleanup is done by:
3895       - check that the instance is running only on one node
3896         (and update the config if needed)
3897       - change disks on its secondary node to secondary
3898       - wait until disks are fully synchronized
3899       - disconnect from the network
3900       - change disks into single-master mode
3901       - wait again until disks are fully synchronized
3902
3903     """
3904     instance = self.instance
3905     target_node = self.target_node
3906     source_node = self.source_node
3907
3908     # check running on only one node
3909     self.feedback_fn("* checking where the instance actually runs"
3910                      " (if this hangs, the hypervisor might be in"
3911                      " a bad state)")
3912     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3913     for node, result in ins_l.items():
3914       msg = result.RemoteFailMsg()
3915       if msg:
3916         raise errors.OpExecError("Can't contact node %s: %s" % (node, msg))
3917
3918     runningon_source = instance.name in ins_l[source_node].payload
3919     runningon_target = instance.name in ins_l[target_node].payload
3920
3921     if runningon_source and runningon_target:
3922       raise errors.OpExecError("Instance seems to be running on two nodes,"
3923                                " or the hypervisor is confused. You will have"
3924                                " to ensure manually that it runs only on one"
3925                                " and restart this operation.")
3926
3927     if not (runningon_source or runningon_target):
3928       raise errors.OpExecError("Instance does not seem to be running at all."
3929                                " In this case, it's safer to repair by"
3930                                " running 'gnt-instance stop' to ensure disk"
3931                                " shutdown, and then restarting it.")
3932
3933     if runningon_target:
3934       # the migration has actually succeeded, we need to update the config
3935       self.feedback_fn("* instance running on secondary node (%s),"
3936                        " updating config" % target_node)
3937       instance.primary_node = target_node
3938       self.cfg.Update(instance)
3939       demoted_node = source_node
3940     else:
3941       self.feedback_fn("* instance confirmed to be running on its"
3942                        " primary node (%s)" % source_node)
3943       demoted_node = target_node
3944
3945     self._EnsureSecondary(demoted_node)
3946     try:
3947       self._WaitUntilSync()
3948     except errors.OpExecError:
3949       # we ignore here errors, since if the device is standalone, it
3950       # won't be able to sync
3951       pass
3952     self._GoStandalone()
3953     self._GoReconnect(False)
3954     self._WaitUntilSync()
3955
3956     self.feedback_fn("* done")
3957
3958   def _RevertDiskStatus(self):
3959     """Try to revert the disk status after a failed migration.
3960
3961     """
3962     target_node = self.target_node
3963     try:
3964       self._EnsureSecondary(target_node)
3965       self._GoStandalone()
3966       self._GoReconnect(False)
3967       self._WaitUntilSync()
3968     except errors.OpExecError, err:
3969       self.LogWarning("Migration failed and I can't reconnect the"
3970                       " drives: error '%s'\n"
3971                       "Please look and recover the instance status" %
3972                       str(err))
3973
3974   def _AbortMigration(self):
3975     """Call the hypervisor code to abort a started migration.
3976
3977     """
3978     instance = self.instance
3979     target_node = self.target_node
3980     migration_info = self.migration_info
3981
3982     abort_result = self.rpc.call_finalize_migration(target_node,
3983                                                     instance,
3984                                                     migration_info,
3985                                                     False)
3986     abort_msg = abort_result.RemoteFailMsg()
3987     if abort_msg:
3988       logging.error("Aborting migration failed on target node %s: %s" %
3989                     (target_node, abort_msg))
3990       # Don't raise an exception here, as we stil have to try to revert the
3991       # disk status, even if this step failed.
3992
3993   def _ExecMigration(self):
3994     """Migrate an instance.
3995
3996     The migrate is done by:
3997       - change the disks into dual-master mode
3998       - wait until disks are fully synchronized again
3999       - migrate the instance
4000       - change disks on the new secondary node (the old primary) to secondary
4001       - wait until disks are fully synchronized
4002       - change disks into single-master mode
4003
4004     """
4005     instance = self.instance
4006     target_node = self.target_node
4007     source_node = self.source_node
4008
4009     self.feedback_fn("* checking disk consistency between source and target")
4010     for dev in instance.disks:
4011       if not _CheckDiskConsistency(self, dev, target_node, False):
4012         raise errors.OpExecError("Disk %s is degraded or not fully"
4013                                  " synchronized on target node,"
4014                                  " aborting migrate." % dev.iv_name)
4015
4016     # First get the migration information from the remote node
4017     result = self.rpc.call_migration_info(source_node, instance)
4018     msg = result.RemoteFailMsg()
4019     if msg:
4020       log_err = ("Failed fetching source migration information from %s: %s" %
4021                  (source_node, msg))
4022       logging.error(log_err)
4023       raise errors.OpExecError(log_err)
4024
4025     self.migration_info = migration_info = result.payload
4026
4027     # Then switch the disks to master/master mode
4028     self._EnsureSecondary(target_node)
4029     self._GoStandalone()
4030     self._GoReconnect(True)
4031     self._WaitUntilSync()
4032
4033     self.feedback_fn("* preparing %s to accept the instance" % target_node)
4034     result = self.rpc.call_accept_instance(target_node,
4035                                            instance,
4036                                            migration_info,
4037                                            self.nodes_ip[target_node])
4038
4039     msg = result.RemoteFailMsg()
4040     if msg:
4041       logging.error("Instance pre-migration failed, trying to revert"
4042                     " disk status: %s", msg)
4043       self._AbortMigration()
4044       self._RevertDiskStatus()
4045       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4046                                (instance.name, msg))
4047
4048     self.feedback_fn("* migrating instance to %s" % target_node)
4049     time.sleep(10)
4050     result = self.rpc.call_instance_migrate(source_node, instance,
4051                                             self.nodes_ip[target_node],
4052                                             self.op.live)
4053     msg = result.RemoteFailMsg()
4054     if msg:
4055       logging.error("Instance migration failed, trying to revert"
4056                     " disk status: %s", msg)
4057       self._AbortMigration()
4058       self._RevertDiskStatus()
4059       raise errors.OpExecError("Could not migrate instance %s: %s" %
4060                                (instance.name, msg))
4061     time.sleep(10)
4062
4063     instance.primary_node = target_node
4064     # distribute new instance config to the other nodes
4065     self.cfg.Update(instance)
4066
4067     result = self.rpc.call_finalize_migration(target_node,
4068                                               instance,
4069                                               migration_info,
4070                                               True)
4071     msg = result.RemoteFailMsg()
4072     if msg:
4073       logging.error("Instance migration succeeded, but finalization failed:"
4074                     " %s" % msg)
4075       raise errors.OpExecError("Could not finalize instance migration: %s" %
4076                                msg)
4077
4078     self._EnsureSecondary(source_node)
4079     self._WaitUntilSync()
4080     self._GoStandalone()
4081     self._GoReconnect(False)
4082     self._WaitUntilSync()
4083
4084     self.feedback_fn("* done")
4085
4086   def Exec(self, feedback_fn):
4087     """Perform the migration.
4088
4089     """
4090     self.feedback_fn = feedback_fn
4091
4092     self.source_node = self.instance.primary_node
4093     self.target_node = self.instance.secondary_nodes[0]
4094     self.all_nodes = [self.source_node, self.target_node]
4095     self.nodes_ip = {
4096       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4097       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4098       }
4099     if self.op.cleanup:
4100       return self._ExecCleanup()
4101     else:
4102       return self._ExecMigration()
4103
4104
4105 def _CreateBlockDev(lu, node, instance, device, force_create,
4106                     info, force_open):
4107   """Create a tree of block devices on a given node.
4108
4109   If this device type has to be created on secondaries, create it and
4110   all its children.
4111
4112   If not, just recurse to children keeping the same 'force' value.
4113
4114   @param lu: the lu on whose behalf we execute
4115   @param node: the node on which to create the device
4116   @type instance: L{objects.Instance}
4117   @param instance: the instance which owns the device
4118   @type device: L{objects.Disk}
4119   @param device: the device to create
4120   @type force_create: boolean
4121   @param force_create: whether to force creation of this device; this
4122       will be change to True whenever we find a device which has
4123       CreateOnSecondary() attribute
4124   @param info: the extra 'metadata' we should attach to the device
4125       (this will be represented as a LVM tag)
4126   @type force_open: boolean
4127   @param force_open: this parameter will be passes to the
4128       L{backend.BlockdevCreate} function where it specifies
4129       whether we run on primary or not, and it affects both
4130       the child assembly and the device own Open() execution
4131
4132   """
4133   if device.CreateOnSecondary():
4134     force_create = True
4135
4136   if device.children:
4137     for child in device.children:
4138       _CreateBlockDev(lu, node, instance, child, force_create,
4139                       info, force_open)
4140
4141   if not force_create:
4142     return
4143
4144   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4145
4146
4147 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4148   """Create a single block device on a given node.
4149
4150   This will not recurse over children of the device, so they must be
4151   created in advance.
4152
4153   @param lu: the lu on whose behalf we execute
4154   @param node: the node on which to create the device
4155   @type instance: L{objects.Instance}
4156   @param instance: the instance which owns the device
4157   @type device: L{objects.Disk}
4158   @param device: the device to create
4159   @param info: the extra 'metadata' we should attach to the device
4160       (this will be represented as a LVM tag)
4161   @type force_open: boolean
4162   @param force_open: this parameter will be passes to the
4163       L{backend.BlockdevCreate} function where it specifies
4164       whether we run on primary or not, and it affects both
4165       the child assembly and the device own Open() execution
4166
4167   """
4168   lu.cfg.SetDiskID(device, node)
4169   result = lu.rpc.call_blockdev_create(node, device, device.size,
4170                                        instance.name, force_open, info)
4171   msg = result.RemoteFailMsg()
4172   if msg:
4173     raise errors.OpExecError("Can't create block device %s on"
4174                              " node %s for instance %s: %s" %
4175                              (device, node, instance.name, msg))
4176   if device.physical_id is None:
4177     device.physical_id = result.payload
4178
4179
4180 def _GenerateUniqueNames(lu, exts):
4181   """Generate a suitable LV name.
4182
4183   This will generate a logical volume name for the given instance.
4184
4185   """
4186   results = []
4187   for val in exts:
4188     new_id = lu.cfg.GenerateUniqueID()
4189     results.append("%s%s" % (new_id, val))
4190   return results
4191
4192
4193 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4194                          p_minor, s_minor):
4195   """Generate a drbd8 device complete with its children.
4196
4197   """
4198   port = lu.cfg.AllocatePort()
4199   vgname = lu.cfg.GetVGName()
4200   shared_secret = lu.cfg.GenerateDRBDSecret()
4201   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4202                           logical_id=(vgname, names[0]))
4203   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4204                           logical_id=(vgname, names[1]))
4205   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4206                           logical_id=(primary, secondary, port,
4207                                       p_minor, s_minor,
4208                                       shared_secret),
4209                           children=[dev_data, dev_meta],
4210                           iv_name=iv_name)
4211   return drbd_dev
4212
4213
4214 def _GenerateDiskTemplate(lu, template_name,
4215                           instance_name, primary_node,
4216                           secondary_nodes, disk_info,
4217                           file_storage_dir, file_driver,
4218                           base_index):
4219   """Generate the entire disk layout for a given template type.
4220
4221   """
4222   #TODO: compute space requirements
4223
4224   vgname = lu.cfg.GetVGName()
4225   disk_count = len(disk_info)
4226   disks = []
4227   if template_name == constants.DT_DISKLESS:
4228     pass
4229   elif template_name == constants.DT_PLAIN:
4230     if len(secondary_nodes) != 0:
4231       raise errors.ProgrammerError("Wrong template configuration")
4232
4233     names = _GenerateUniqueNames(lu, [".disk%d" % i
4234                                       for i in range(disk_count)])
4235     for idx, disk in enumerate(disk_info):
4236       disk_index = idx + base_index
4237       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4238                               logical_id=(vgname, names[idx]),
4239                               iv_name="disk/%d" % disk_index,
4240                               mode=disk["mode"])
4241       disks.append(disk_dev)
4242   elif template_name == constants.DT_DRBD8:
4243     if len(secondary_nodes) != 1:
4244       raise errors.ProgrammerError("Wrong template configuration")
4245     remote_node = secondary_nodes[0]
4246     minors = lu.cfg.AllocateDRBDMinor(
4247       [primary_node, remote_node] * len(disk_info), instance_name)
4248
4249     names = []
4250     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4251                                                for i in range(disk_count)]):
4252       names.append(lv_prefix + "_data")
4253       names.append(lv_prefix + "_meta")
4254     for idx, disk in enumerate(disk_info):
4255       disk_index = idx + base_index
4256       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4257                                       disk["size"], names[idx*2:idx*2+2],
4258                                       "disk/%d" % disk_index,
4259                                       minors[idx*2], minors[idx*2+1])
4260       disk_dev.mode = disk["mode"]
4261       disks.append(disk_dev)
4262   elif template_name == constants.DT_FILE:
4263     if len(secondary_nodes) != 0:
4264       raise errors.ProgrammerError("Wrong template configuration")
4265
4266     for idx, disk in enumerate(disk_info):
4267       disk_index = idx + base_index
4268       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4269                               iv_name="disk/%d" % disk_index,
4270                               logical_id=(file_driver,
4271                                           "%s/disk%d" % (file_storage_dir,
4272                                                          disk_index)),
4273                               mode=disk["mode"])
4274       disks.append(disk_dev)
4275   else:
4276     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4277   return disks
4278
4279
4280 def _GetInstanceInfoText(instance):
4281   """Compute that text that should be added to the disk's metadata.
4282
4283   """
4284   return "originstname+%s" % instance.name
4285
4286
4287 def _CreateDisks(lu, instance):
4288   """Create all disks for an instance.
4289
4290   This abstracts away some work from AddInstance.
4291
4292   @type lu: L{LogicalUnit}
4293   @param lu: the logical unit on whose behalf we execute
4294   @type instance: L{objects.Instance}
4295   @param instance: the instance whose disks we should create
4296   @rtype: boolean
4297   @return: the success of the creation
4298
4299   """
4300   info = _GetInstanceInfoText(instance)
4301   pnode = instance.primary_node
4302
4303   if instance.disk_template == constants.DT_FILE:
4304     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4305     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4306
4307     if result.failed or not result.data:
4308       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4309
4310     if not result.data[0]:
4311       raise errors.OpExecError("Failed to create directory '%s'" %
4312                                file_storage_dir)
4313
4314   # Note: this needs to be kept in sync with adding of disks in
4315   # LUSetInstanceParams
4316   for device in instance.disks:
4317     logging.info("Creating volume %s for instance %s",
4318                  device.iv_name, instance.name)
4319     #HARDCODE
4320     for node in instance.all_nodes:
4321       f_create = node == pnode
4322       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4323
4324
4325 def _RemoveDisks(lu, instance):
4326   """Remove all disks for an instance.
4327
4328   This abstracts away some work from `AddInstance()` and
4329   `RemoveInstance()`. Note that in case some of the devices couldn't
4330   be removed, the removal will continue with the other ones (compare
4331   with `_CreateDisks()`).
4332
4333   @type lu: L{LogicalUnit}
4334   @param lu: the logical unit on whose behalf we execute
4335   @type instance: L{objects.Instance}
4336   @param instance: the instance whose disks we should remove
4337   @rtype: boolean
4338   @return: the success of the removal
4339
4340   """
4341   logging.info("Removing block devices for instance %s", instance.name)
4342
4343   all_result = True
4344   for device in instance.disks:
4345     for node, disk in device.ComputeNodeTree(instance.primary_node):
4346       lu.cfg.SetDiskID(disk, node)
4347       msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4348       if msg:
4349         lu.LogWarning("Could not remove block device %s on node %s,"
4350                       " continuing anyway: %s", device.iv_name, node, msg)
4351         all_result = False
4352
4353   if instance.disk_template == constants.DT_FILE:
4354     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4355     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4356                                                  file_storage_dir)
4357     if result.failed or not result.data:
4358       logging.error("Could not remove directory '%s'", file_storage_dir)
4359       all_result = False
4360
4361   return all_result
4362
4363
4364 def _ComputeDiskSize(disk_template, disks):
4365   """Compute disk size requirements in the volume group
4366
4367   """
4368   # Required free disk space as a function of disk and swap space
4369   req_size_dict = {
4370     constants.DT_DISKLESS: None,
4371     constants.DT_PLAIN: sum(d["size"] for d in disks),
4372     # 128 MB are added for drbd metadata for each disk
4373     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4374     constants.DT_FILE: None,
4375   }
4376
4377   if disk_template not in req_size_dict:
4378     raise errors.ProgrammerError("Disk template '%s' size requirement"
4379                                  " is unknown" %  disk_template)
4380
4381   return req_size_dict[disk_template]
4382
4383
4384 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4385   """Hypervisor parameter validation.
4386
4387   This function abstract the hypervisor parameter validation to be
4388   used in both instance create and instance modify.
4389
4390   @type lu: L{LogicalUnit}
4391   @param lu: the logical unit for which we check
4392   @type nodenames: list
4393   @param nodenames: the list of nodes on which we should check
4394   @type hvname: string
4395   @param hvname: the name of the hypervisor we should use
4396   @type hvparams: dict
4397   @param hvparams: the parameters which we need to check
4398   @raise errors.OpPrereqError: if the parameters are not valid
4399
4400   """
4401   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4402                                                   hvname,
4403                                                   hvparams)
4404   for node in nodenames:
4405     info = hvinfo[node]
4406     if info.offline:
4407       continue
4408     msg = info.RemoteFailMsg()
4409     if msg:
4410       raise errors.OpPrereqError("Hypervisor parameter validation"
4411                                  " failed on node %s: %s" % (node, msg))
4412
4413
4414 class LUCreateInstance(LogicalUnit):
4415   """Create an instance.
4416
4417   """
4418   HPATH = "instance-add"
4419   HTYPE = constants.HTYPE_INSTANCE
4420   _OP_REQP = ["instance_name", "disks", "disk_template",
4421               "mode", "start",
4422               "wait_for_sync", "ip_check", "nics",
4423               "hvparams", "beparams"]
4424   REQ_BGL = False
4425
4426   def _ExpandNode(self, node):
4427     """Expands and checks one node name.
4428
4429     """
4430     node_full = self.cfg.ExpandNodeName(node)
4431     if node_full is None:
4432       raise errors.OpPrereqError("Unknown node %s" % node)
4433     return node_full
4434
4435   def ExpandNames(self):
4436     """ExpandNames for CreateInstance.
4437
4438     Figure out the right locks for instance creation.
4439
4440     """
4441     self.needed_locks = {}
4442
4443     # set optional parameters to none if they don't exist
4444     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4445       if not hasattr(self.op, attr):
4446         setattr(self.op, attr, None)
4447
4448     # cheap checks, mostly valid constants given
4449
4450     # verify creation mode
4451     if self.op.mode not in (constants.INSTANCE_CREATE,
4452                             constants.INSTANCE_IMPORT):
4453       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4454                                  self.op.mode)
4455
4456     # disk template and mirror node verification
4457     if self.op.disk_template not in constants.DISK_TEMPLATES:
4458       raise errors.OpPrereqError("Invalid disk template name")
4459
4460     if self.op.hypervisor is None:
4461       self.op.hypervisor = self.cfg.GetHypervisorType()
4462
4463     cluster = self.cfg.GetClusterInfo()
4464     enabled_hvs = cluster.enabled_hypervisors
4465     if self.op.hypervisor not in enabled_hvs:
4466       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4467                                  " cluster (%s)" % (self.op.hypervisor,
4468                                   ",".join(enabled_hvs)))
4469
4470     # check hypervisor parameter syntax (locally)
4471     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4472     filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
4473                                   self.op.hvparams)
4474     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4475     hv_type.CheckParameterSyntax(filled_hvp)
4476
4477     # fill and remember the beparams dict
4478     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4479     self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
4480                                     self.op.beparams)
4481
4482     #### instance parameters check
4483
4484     # instance name verification
4485     hostname1 = utils.HostInfo(self.op.instance_name)
4486     self.op.instance_name = instance_name = hostname1.name
4487
4488     # this is just a preventive check, but someone might still add this
4489     # instance in the meantime, and creation will fail at lock-add time
4490     if instance_name in self.cfg.GetInstanceList():
4491       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4492                                  instance_name)
4493
4494     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4495
4496     # NIC buildup
4497     self.nics = []
4498     for idx, nic in enumerate(self.op.nics):
4499       nic_mode_req = nic.get("mode", None)
4500       nic_mode = nic_mode_req
4501       if nic_mode is None:
4502         nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
4503
4504       # in routed mode, for the first nic, the default ip is 'auto'
4505       if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
4506         default_ip_mode = constants.VALUE_AUTO
4507       else:
4508         default_ip_mode = constants.VALUE_NONE
4509
4510       # ip validity checks
4511       ip = nic.get("ip", default_ip_mode)
4512       if ip is None or ip.lower() == constants.VALUE_NONE:
4513         nic_ip = None
4514       elif ip.lower() == constants.VALUE_AUTO:
4515         nic_ip = hostname1.ip
4516       else:
4517         if not utils.IsValidIP(ip):
4518           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4519                                      " like a valid IP" % ip)
4520         nic_ip = ip
4521
4522       # TODO: check the ip for uniqueness !!
4523       if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
4524         raise errors.OpPrereqError("Routed nic mode requires an ip address")
4525
4526       # MAC address verification
4527       mac = nic.get("mac", constants.VALUE_AUTO)
4528       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4529         if not utils.IsValidMac(mac.lower()):
4530           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4531                                      mac)
4532       # bridge verification
4533       bridge = nic.get("bridge", None)
4534       link = nic.get("link", None)
4535       if bridge and link:
4536         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
4537       elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
4538         raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
4539       elif bridge:
4540         link = bridge
4541
4542       nicparams = {}
4543       if nic_mode_req:
4544         nicparams[constants.NIC_MODE] = nic_mode_req
4545       if link:
4546         nicparams[constants.NIC_LINK] = link
4547
4548       check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4549                                       nicparams)
4550       objects.NIC.CheckParameterSyntax(check_params)
4551       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
4552
4553     # disk checks/pre-build
4554     self.disks = []
4555     for disk in self.op.disks:
4556       mode = disk.get("mode", constants.DISK_RDWR)
4557       if mode not in constants.DISK_ACCESS_SET:
4558         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4559                                    mode)
4560       size = disk.get("size", None)
4561       if size is None:
4562         raise errors.OpPrereqError("Missing disk size")
4563       try:
4564         size = int(size)
4565       except ValueError:
4566         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4567       self.disks.append({"size": size, "mode": mode})
4568
4569     # used in CheckPrereq for ip ping check
4570     self.check_ip = hostname1.ip
4571
4572     # file storage checks
4573     if (self.op.file_driver and
4574         not self.op.file_driver in constants.FILE_DRIVER):
4575       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4576                                  self.op.file_driver)
4577
4578     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4579       raise errors.OpPrereqError("File storage directory path not absolute")
4580
4581     ### Node/iallocator related checks
4582     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4583       raise errors.OpPrereqError("One and only one of iallocator and primary"
4584                                  " node must be given")
4585
4586     if self.op.iallocator:
4587       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4588     else:
4589       self.op.pnode = self._ExpandNode(self.op.pnode)
4590       nodelist = [self.op.pnode]
4591       if self.op.snode is not None:
4592         self.op.snode = self._ExpandNode(self.op.snode)
4593         nodelist.append(self.op.snode)
4594       self.needed_locks[locking.LEVEL_NODE] = nodelist
4595
4596     # in case of import lock the source node too
4597     if self.op.mode == constants.INSTANCE_IMPORT:
4598       src_node = getattr(self.op, "src_node", None)
4599       src_path = getattr(self.op, "src_path", None)
4600
4601       if src_path is None:
4602         self.op.src_path = src_path = self.op.instance_name
4603
4604       if src_node is None:
4605         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4606         self.op.src_node = None
4607         if os.path.isabs(src_path):
4608           raise errors.OpPrereqError("Importing an instance from an absolute"
4609                                      " path requires a source node option.")
4610       else:
4611         self.op.src_node = src_node = self._ExpandNode(src_node)
4612         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4613           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4614         if not os.path.isabs(src_path):
4615           self.op.src_path = src_path = \
4616             os.path.join(constants.EXPORT_DIR, src_path)
4617
4618     else: # INSTANCE_CREATE
4619       if getattr(self.op, "os_type", None) is None:
4620         raise errors.OpPrereqError("No guest OS specified")
4621
4622   def _RunAllocator(self):
4623     """Run the allocator based on input opcode.
4624
4625     """
4626     nics = [n.ToDict() for n in self.nics]
4627     ial = IAllocator(self,
4628                      mode=constants.IALLOCATOR_MODE_ALLOC,
4629                      name=self.op.instance_name,
4630                      disk_template=self.op.disk_template,
4631                      tags=[],
4632                      os=self.op.os_type,
4633                      vcpus=self.be_full[constants.BE_VCPUS],
4634                      mem_size=self.be_full[constants.BE_MEMORY],
4635                      disks=self.disks,
4636                      nics=nics,
4637                      hypervisor=self.op.hypervisor,
4638                      )
4639
4640     ial.Run(self.op.iallocator)
4641
4642     if not ial.success:
4643       raise errors.OpPrereqError("Can't compute nodes using"
4644                                  " iallocator '%s': %s" % (self.op.iallocator,
4645                                                            ial.info))
4646     if len(ial.nodes) != ial.required_nodes:
4647       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4648                                  " of nodes (%s), required %s" %
4649                                  (self.op.iallocator, len(ial.nodes),
4650                                   ial.required_nodes))
4651     self.op.pnode = ial.nodes[0]
4652     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4653                  self.op.instance_name, self.op.iallocator,
4654                  ", ".join(ial.nodes))
4655     if ial.required_nodes == 2:
4656       self.op.snode = ial.nodes[1]
4657
4658   def BuildHooksEnv(self):
4659     """Build hooks env.
4660
4661     This runs on master, primary and secondary nodes of the instance.
4662
4663     """
4664     env = {
4665       "ADD_MODE": self.op.mode,
4666       }
4667     if self.op.mode == constants.INSTANCE_IMPORT:
4668       env["SRC_NODE"] = self.op.src_node
4669       env["SRC_PATH"] = self.op.src_path
4670       env["SRC_IMAGES"] = self.src_images
4671
4672     env.update(_BuildInstanceHookEnv(
4673       name=self.op.instance_name,
4674       primary_node=self.op.pnode,
4675       secondary_nodes=self.secondaries,
4676       status=self.op.start,
4677       os_type=self.op.os_type,
4678       memory=self.be_full[constants.BE_MEMORY],
4679       vcpus=self.be_full[constants.BE_VCPUS],
4680       nics=_PreBuildNICHooksList(self, self.nics),
4681       disk_template=self.op.disk_template,
4682       disks=[(d["size"], d["mode"]) for d in self.disks],
4683     ))
4684
4685     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4686           self.secondaries)
4687     return env, nl, nl
4688
4689
4690   def CheckPrereq(self):
4691     """Check prerequisites.
4692
4693     """
4694     if (not self.cfg.GetVGName() and
4695         self.op.disk_template not in constants.DTS_NOT_LVM):
4696       raise errors.OpPrereqError("Cluster does not support lvm-based"
4697                                  " instances")
4698
4699     if self.op.mode == constants.INSTANCE_IMPORT:
4700       src_node = self.op.src_node
4701       src_path = self.op.src_path
4702
4703       if src_node is None:
4704         locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
4705         exp_list = self.rpc.call_export_list(locked_nodes)
4706         found = False
4707         for node in exp_list:
4708           if exp_list[node].RemoteFailMsg():
4709             continue
4710           if src_path in exp_list[node].payload:
4711             found = True
4712             self.op.src_node = src_node = node
4713             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4714                                                        src_path)
4715             break
4716         if not found:
4717           raise errors.OpPrereqError("No export found for relative path %s" %
4718                                       src_path)
4719
4720       _CheckNodeOnline(self, src_node)
4721       result = self.rpc.call_export_info(src_node, src_path)
4722       msg = result.RemoteFailMsg()
4723       if msg:
4724         raise errors.OpPrereqError("No export or invalid export found in"
4725                                    " dir %s: %s" % (src_path, msg))
4726
4727       export_info = objects.SerializableConfigParser.Loads(str(result.payload))
4728       if not export_info.has_section(constants.INISECT_EXP):
4729         raise errors.ProgrammerError("Corrupted export config")
4730
4731       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4732       if (int(ei_version) != constants.EXPORT_VERSION):
4733         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4734                                    (ei_version, constants.EXPORT_VERSION))
4735
4736       # Check that the new instance doesn't have less disks than the export
4737       instance_disks = len(self.disks)
4738       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4739       if instance_disks < export_disks:
4740         raise errors.OpPrereqError("Not enough disks to import."
4741                                    " (instance: %d, export: %d)" %
4742                                    (instance_disks, export_disks))
4743
4744       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4745       disk_images = []
4746       for idx in range(export_disks):
4747         option = 'disk%d_dump' % idx
4748         if export_info.has_option(constants.INISECT_INS, option):
4749           # FIXME: are the old os-es, disk sizes, etc. useful?
4750           export_name = export_info.get(constants.INISECT_INS, option)
4751           image = os.path.join(src_path, export_name)
4752           disk_images.append(image)
4753         else:
4754           disk_images.append(False)
4755
4756       self.src_images = disk_images
4757
4758       old_name = export_info.get(constants.INISECT_INS, 'name')
4759       # FIXME: int() here could throw a ValueError on broken exports
4760       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4761       if self.op.instance_name == old_name:
4762         for idx, nic in enumerate(self.nics):
4763           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4764             nic_mac_ini = 'nic%d_mac' % idx
4765             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4766
4767     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4768     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4769     if self.op.start and not self.op.ip_check:
4770       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4771                                  " adding an instance in start mode")
4772
4773     if self.op.ip_check:
4774       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4775         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4776                                    (self.check_ip, self.op.instance_name))
4777
4778     #### mac address generation
4779     # By generating here the mac address both the allocator and the hooks get
4780     # the real final mac address rather than the 'auto' or 'generate' value.
4781     # There is a race condition between the generation and the instance object
4782     # creation, which means that we know the mac is valid now, but we're not
4783     # sure it will be when we actually add the instance. If things go bad
4784     # adding the instance will abort because of a duplicate mac, and the
4785     # creation job will fail.
4786     for nic in self.nics:
4787       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4788         nic.mac = self.cfg.GenerateMAC()
4789
4790     #### allocator run
4791
4792     if self.op.iallocator is not None:
4793       self._RunAllocator()
4794
4795     #### node related checks
4796
4797     # check primary node
4798     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4799     assert self.pnode is not None, \
4800       "Cannot retrieve locked node %s" % self.op.pnode
4801     if pnode.offline:
4802       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4803                                  pnode.name)
4804     if pnode.drained:
4805       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4806                                  pnode.name)
4807
4808     self.secondaries = []
4809
4810     # mirror node verification
4811     if self.op.disk_template in constants.DTS_NET_MIRROR:
4812       if self.op.snode is None:
4813         raise errors.OpPrereqError("The networked disk templates need"
4814                                    " a mirror node")
4815       if self.op.snode == pnode.name:
4816         raise errors.OpPrereqError("The secondary node cannot be"
4817                                    " the primary node.")
4818       _CheckNodeOnline(self, self.op.snode)
4819       _CheckNodeNotDrained(self, self.op.snode)
4820       self.secondaries.append(self.op.snode)
4821
4822     nodenames = [pnode.name] + self.secondaries
4823
4824     req_size = _ComputeDiskSize(self.op.disk_template,
4825                                 self.disks)
4826
4827     # Check lv size requirements
4828     if req_size is not None:
4829       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4830                                          self.op.hypervisor)
4831       for node in nodenames:
4832         info = nodeinfo[node]
4833         msg = info.RemoteFailMsg()
4834         if msg:
4835           raise errors.OpPrereqError("Cannot get current information"
4836                                      " from node %s: %s" % (node, msg))
4837         info = info.payload
4838         vg_free = info.get('vg_free', None)
4839         if not isinstance(vg_free, int):
4840           raise errors.OpPrereqError("Can't compute free disk space on"
4841                                      " node %s" % node)
4842         if req_size > vg_free:
4843           raise errors.OpPrereqError("Not enough disk space on target node %s."
4844                                      " %d MB available, %d MB required" %
4845                                      (node, vg_free, req_size))
4846
4847     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4848
4849     # os verification
4850     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4851     result.Raise()
4852     if not isinstance(result.data, objects.OS):
4853       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4854                                  " primary node"  % self.op.os_type)
4855
4856     _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
4857
4858     # memory check on primary node
4859     if self.op.start:
4860       _CheckNodeFreeMemory(self, self.pnode.name,
4861                            "creating instance %s" % self.op.instance_name,
4862                            self.be_full[constants.BE_MEMORY],
4863                            self.op.hypervisor)
4864
4865   def Exec(self, feedback_fn):
4866     """Create and add the instance to the cluster.
4867
4868     """
4869     instance = self.op.instance_name
4870     pnode_name = self.pnode.name
4871
4872     ht_kind = self.op.hypervisor
4873     if ht_kind in constants.HTS_REQ_PORT:
4874       network_port = self.cfg.AllocatePort()
4875     else:
4876       network_port = None
4877
4878     ##if self.op.vnc_bind_address is None:
4879     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4880
4881     # this is needed because os.path.join does not accept None arguments
4882     if self.op.file_storage_dir is None:
4883       string_file_storage_dir = ""
4884     else:
4885       string_file_storage_dir = self.op.file_storage_dir
4886
4887     # build the full file storage dir path
4888     file_storage_dir = os.path.normpath(os.path.join(
4889                                         self.cfg.GetFileStorageDir(),
4890                                         string_file_storage_dir, instance))
4891
4892
4893     disks = _GenerateDiskTemplate(self,
4894                                   self.op.disk_template,
4895                                   instance, pnode_name,
4896                                   self.secondaries,
4897                                   self.disks,
4898                                   file_storage_dir,
4899                                   self.op.file_driver,
4900                                   0)
4901
4902     iobj = objects.Instance(name=instance, os=self.op.os_type,
4903                             primary_node=pnode_name,
4904                             nics=self.nics, disks=disks,
4905                             disk_template=self.op.disk_template,
4906                             admin_up=False,
4907                             network_port=network_port,
4908                             beparams=self.op.beparams,
4909                             hvparams=self.op.hvparams,
4910                             hypervisor=self.op.hypervisor,
4911                             )
4912
4913     feedback_fn("* creating instance disks...")
4914     try:
4915       _CreateDisks(self, iobj)
4916     except errors.OpExecError:
4917       self.LogWarning("Device creation failed, reverting...")
4918       try:
4919         _RemoveDisks(self, iobj)
4920       finally:
4921         self.cfg.ReleaseDRBDMinors(instance)
4922         raise
4923
4924     feedback_fn("adding instance %s to cluster config" % instance)
4925
4926     self.cfg.AddInstance(iobj)
4927     # Declare that we don't want to remove the instance lock anymore, as we've
4928     # added the instance to the config
4929     del self.remove_locks[locking.LEVEL_INSTANCE]
4930     # Unlock all the nodes
4931     if self.op.mode == constants.INSTANCE_IMPORT:
4932       nodes_keep = [self.op.src_node]
4933       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4934                        if node != self.op.src_node]
4935       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4936       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4937     else:
4938       self.context.glm.release(locking.LEVEL_NODE)
4939       del self.acquired_locks[locking.LEVEL_NODE]
4940
4941     if self.op.wait_for_sync:
4942       disk_abort = not _WaitForSync(self, iobj)
4943     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4944       # make sure the disks are not degraded (still sync-ing is ok)
4945       time.sleep(15)
4946       feedback_fn("* checking mirrors status")
4947       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4948     else:
4949       disk_abort = False
4950
4951     if disk_abort:
4952       _RemoveDisks(self, iobj)
4953       self.cfg.RemoveInstance(iobj.name)
4954       # Make sure the instance lock gets removed
4955       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4956       raise errors.OpExecError("There are some degraded disks for"
4957                                " this instance")
4958
4959     feedback_fn("creating os for instance %s on node %s" %
4960                 (instance, pnode_name))
4961
4962     if iobj.disk_template != constants.DT_DISKLESS:
4963       if self.op.mode == constants.INSTANCE_CREATE:
4964         feedback_fn("* running the instance OS create scripts...")
4965         result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
4966         msg = result.RemoteFailMsg()
4967         if msg:
4968           raise errors.OpExecError("Could not add os for instance %s"
4969                                    " on node %s: %s" %
4970                                    (instance, pnode_name, msg))
4971
4972       elif self.op.mode == constants.INSTANCE_IMPORT:
4973         feedback_fn("* running the instance OS import scripts...")
4974         src_node = self.op.src_node
4975         src_images = self.src_images
4976         cluster_name = self.cfg.GetClusterName()
4977         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4978                                                          src_node, src_images,
4979                                                          cluster_name)
4980         msg = import_result.RemoteFailMsg()
4981         if msg:
4982           self.LogWarning("Error while importing the disk images for instance"
4983                           " %s on node %s: %s" % (instance, pnode_name, msg))
4984       else:
4985         # also checked in the prereq part
4986         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4987                                      % self.op.mode)
4988
4989     if self.op.start:
4990       iobj.admin_up = True
4991       self.cfg.Update(iobj)
4992       logging.info("Starting instance %s on node %s", instance, pnode_name)
4993       feedback_fn("* starting instance...")
4994       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4995       msg = result.RemoteFailMsg()
4996       if msg:
4997         raise errors.OpExecError("Could not start instance: %s" % msg)
4998
4999
5000 class LUConnectConsole(NoHooksLU):
5001   """Connect to an instance's console.
5002
5003   This is somewhat special in that it returns the command line that
5004   you need to run on the master node in order to connect to the
5005   console.
5006
5007   """
5008   _OP_REQP = ["instance_name"]
5009   REQ_BGL = False
5010
5011   def ExpandNames(self):
5012     self._ExpandAndLockInstance()
5013
5014   def CheckPrereq(self):
5015     """Check prerequisites.
5016
5017     This checks that the instance is in the cluster.
5018
5019     """
5020     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5021     assert self.instance is not None, \
5022       "Cannot retrieve locked instance %s" % self.op.instance_name
5023     _CheckNodeOnline(self, self.instance.primary_node)
5024
5025   def Exec(self, feedback_fn):
5026     """Connect to the console of an instance
5027
5028     """
5029     instance = self.instance
5030     node = instance.primary_node
5031
5032     node_insts = self.rpc.call_instance_list([node],
5033                                              [instance.hypervisor])[node]
5034     msg = node_insts.RemoteFailMsg()
5035     if msg:
5036       raise errors.OpExecError("Can't get node information from %s: %s" %
5037                                (node, msg))
5038
5039     if instance.name not in node_insts.payload:
5040       raise errors.OpExecError("Instance %s is not running." % instance.name)
5041
5042     logging.debug("Connecting to console of %s on %s", instance.name, node)
5043
5044     hyper = hypervisor.GetHypervisor(instance.hypervisor)
5045     cluster = self.cfg.GetClusterInfo()
5046     # beparams and hvparams are passed separately, to avoid editing the
5047     # instance and then saving the defaults in the instance itself.
5048     hvparams = cluster.FillHV(instance)
5049     beparams = cluster.FillBE(instance)
5050     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5051
5052     # build ssh cmdline
5053     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
5054
5055
5056 class LUReplaceDisks(LogicalUnit):
5057   """Replace the disks of an instance.
5058
5059   """
5060   HPATH = "mirrors-replace"
5061   HTYPE = constants.HTYPE_INSTANCE
5062   _OP_REQP = ["instance_name", "mode", "disks"]
5063   REQ_BGL = False
5064
5065   def CheckArguments(self):
5066     if not hasattr(self.op, "remote_node"):
5067       self.op.remote_node = None
5068     if not hasattr(self.op, "iallocator"):
5069       self.op.iallocator = None
5070
5071     # check for valid parameter combination
5072     cnt = [self.op.remote_node, self.op.iallocator].count(None)
5073     if self.op.mode == constants.REPLACE_DISK_CHG:
5074       if cnt == 2:
5075         raise errors.OpPrereqError("When changing the secondary either an"
5076                                    " iallocator script must be used or the"
5077                                    " new node given")
5078       elif cnt == 0:
5079         raise errors.OpPrereqError("Give either the iallocator or the new"
5080                                    " secondary, not both")
5081     else: # not replacing the secondary
5082       if cnt != 2:
5083         raise errors.OpPrereqError("The iallocator and new node options can"
5084                                    " be used only when changing the"
5085                                    " secondary node")
5086
5087   def ExpandNames(self):
5088     self._ExpandAndLockInstance()
5089
5090     if self.op.iallocator is not None:
5091       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5092     elif self.op.remote_node is not None:
5093       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5094       if remote_node is None:
5095         raise errors.OpPrereqError("Node '%s' not known" %
5096                                    self.op.remote_node)
5097       self.op.remote_node = remote_node
5098       # Warning: do not remove the locking of the new secondary here
5099       # unless DRBD8.AddChildren is changed to work in parallel;
5100       # currently it doesn't since parallel invocations of
5101       # FindUnusedMinor will conflict
5102       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5103       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5104     else:
5105       self.needed_locks[locking.LEVEL_NODE] = []
5106       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5107
5108   def DeclareLocks(self, level):
5109     # If we're not already locking all nodes in the set we have to declare the
5110     # instance's primary/secondary nodes.
5111     if (level == locking.LEVEL_NODE and
5112         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5113       self._LockInstancesNodes()
5114
5115   def _RunAllocator(self):
5116     """Compute a new secondary node using an IAllocator.
5117
5118     """
5119     ial = IAllocator(self,
5120                      mode=constants.IALLOCATOR_MODE_RELOC,
5121                      name=self.op.instance_name,
5122                      relocate_from=[self.sec_node])
5123
5124     ial.Run(self.op.iallocator)
5125
5126     if not ial.success:
5127       raise errors.OpPrereqError("Can't compute nodes using"
5128                                  " iallocator '%s': %s" % (self.op.iallocator,
5129                                                            ial.info))
5130     if len(ial.nodes) != ial.required_nodes:
5131       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5132                                  " of nodes (%s), required %s" %
5133                                  (len(ial.nodes), ial.required_nodes))
5134     self.op.remote_node = ial.nodes[0]
5135     self.LogInfo("Selected new secondary for the instance: %s",
5136                  self.op.remote_node)
5137
5138   def BuildHooksEnv(self):
5139     """Build hooks env.
5140
5141     This runs on the master, the primary and all the secondaries.
5142
5143     """
5144     env = {
5145       "MODE": self.op.mode,
5146       "NEW_SECONDARY": self.op.remote_node,
5147       "OLD_SECONDARY": self.instance.secondary_nodes[0],
5148       }
5149     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5150     nl = [
5151       self.cfg.GetMasterNode(),
5152       self.instance.primary_node,
5153       ]
5154     if self.op.remote_node is not None:
5155       nl.append(self.op.remote_node)
5156     return env, nl, nl
5157
5158   def CheckPrereq(self):
5159     """Check prerequisites.
5160
5161     This checks that the instance is in the cluster.
5162
5163     """
5164     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5165     assert instance is not None, \
5166       "Cannot retrieve locked instance %s" % self.op.instance_name
5167     self.instance = instance
5168
5169     if instance.disk_template != constants.DT_DRBD8:
5170       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5171                                  " instances")
5172
5173     if len(instance.secondary_nodes) != 1:
5174       raise errors.OpPrereqError("The instance has a strange layout,"
5175                                  " expected one secondary but found %d" %
5176                                  len(instance.secondary_nodes))
5177
5178     self.sec_node = instance.secondary_nodes[0]
5179
5180     if self.op.iallocator is not None:
5181       self._RunAllocator()
5182
5183     remote_node = self.op.remote_node
5184     if remote_node is not None:
5185       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5186       assert self.remote_node_info is not None, \
5187         "Cannot retrieve locked node %s" % remote_node
5188     else:
5189       self.remote_node_info = None
5190     if remote_node == instance.primary_node:
5191       raise errors.OpPrereqError("The specified node is the primary node of"
5192                                  " the instance.")
5193     elif remote_node == self.sec_node:
5194       raise errors.OpPrereqError("The specified node is already the"
5195                                  " secondary node of the instance.")
5196
5197     if self.op.mode == constants.REPLACE_DISK_PRI:
5198       n1 = self.tgt_node = instance.primary_node
5199       n2 = self.oth_node = self.sec_node
5200     elif self.op.mode == constants.REPLACE_DISK_SEC:
5201       n1 = self.tgt_node = self.sec_node
5202       n2 = self.oth_node = instance.primary_node
5203     elif self.op.mode == constants.REPLACE_DISK_CHG:
5204       n1 = self.new_node = remote_node
5205       n2 = self.oth_node = instance.primary_node
5206       self.tgt_node = self.sec_node
5207       _CheckNodeNotDrained(self, remote_node)
5208     else:
5209       raise errors.ProgrammerError("Unhandled disk replace mode")
5210
5211     _CheckNodeOnline(self, n1)
5212     _CheckNodeOnline(self, n2)
5213
5214     if not self.op.disks:
5215       self.op.disks = range(len(instance.disks))
5216
5217     for disk_idx in self.op.disks:
5218       instance.FindDisk(disk_idx)
5219
5220   def _ExecD8DiskOnly(self, feedback_fn):
5221     """Replace a disk on the primary or secondary for dbrd8.
5222
5223     The algorithm for replace is quite complicated:
5224
5225       1. for each disk to be replaced:
5226
5227         1. create new LVs on the target node with unique names
5228         1. detach old LVs from the drbd device
5229         1. rename old LVs to name_replaced.<time_t>
5230         1. rename new LVs to old LVs
5231         1. attach the new LVs (with the old names now) to the drbd device
5232
5233       1. wait for sync across all devices
5234
5235       1. for each modified disk:
5236
5237         1. remove old LVs (which have the name name_replaces.<time_t>)
5238
5239     Failures are not very well handled.
5240
5241     """
5242     steps_total = 6
5243     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5244     instance = self.instance
5245     iv_names = {}
5246     vgname = self.cfg.GetVGName()
5247     # start of work
5248     cfg = self.cfg
5249     tgt_node = self.tgt_node
5250     oth_node = self.oth_node
5251
5252     # Step: check device activation
5253     self.proc.LogStep(1, steps_total, "check device existence")
5254     info("checking volume groups")
5255     my_vg = cfg.GetVGName()
5256     results = self.rpc.call_vg_list([oth_node, tgt_node])
5257     if not results:
5258       raise errors.OpExecError("Can't list volume groups on the nodes")
5259     for node in oth_node, tgt_node:
5260       res = results[node]
5261       msg = res.RemoteFailMsg()
5262       if msg:
5263         raise errors.OpExecError("Error checking node %s: %s" % (node, msg))
5264       if my_vg not in res.payload:
5265         raise errors.OpExecError("Volume group '%s' not found on %s" %
5266                                  (my_vg, node))
5267     for idx, dev in enumerate(instance.disks):
5268       if idx not in self.op.disks:
5269         continue
5270       for node in tgt_node, oth_node:
5271         info("checking disk/%d on %s" % (idx, node))
5272         cfg.SetDiskID(dev, node)
5273         result = self.rpc.call_blockdev_find(node, dev)
5274         msg = result.RemoteFailMsg()
5275         if not msg and not result.payload:
5276           msg = "disk not found"
5277         if msg:
5278           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5279                                    (idx, node, msg))
5280
5281     # Step: check other node consistency
5282     self.proc.LogStep(2, steps_total, "check peer consistency")
5283     for idx, dev in enumerate(instance.disks):
5284       if idx not in self.op.disks:
5285         continue
5286       info("checking disk/%d consistency on %s" % (idx, oth_node))
5287       if not _CheckDiskConsistency(self, dev, oth_node,
5288                                    oth_node==instance.primary_node):
5289         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5290                                  " to replace disks on this node (%s)" %
5291                                  (oth_node, tgt_node))
5292
5293     # Step: create new storage
5294     self.proc.LogStep(3, steps_total, "allocate new storage")
5295     for idx, dev in enumerate(instance.disks):
5296       if idx not in self.op.disks:
5297         continue
5298       size = dev.size
5299       cfg.SetDiskID(dev, tgt_node)
5300       lv_names = [".disk%d_%s" % (idx, suf)
5301                   for suf in ["data", "meta"]]
5302       names = _GenerateUniqueNames(self, lv_names)
5303       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5304                              logical_id=(vgname, names[0]))
5305       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5306                              logical_id=(vgname, names[1]))
5307       new_lvs = [lv_data, lv_meta]
5308       old_lvs = dev.children
5309       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5310       info("creating new local storage on %s for %s" %
5311            (tgt_node, dev.iv_name))
5312       # we pass force_create=True to force the LVM creation
5313       for new_lv in new_lvs:
5314         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5315                         _GetInstanceInfoText(instance), False)
5316
5317     # Step: for each lv, detach+rename*2+attach
5318     self.proc.LogStep(4, steps_total, "change drbd configuration")
5319     for dev, old_lvs, new_lvs in iv_names.itervalues():
5320       info("detaching %s drbd from local storage" % dev.iv_name)
5321       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5322       msg = result.RemoteFailMsg()
5323       if msg:
5324         raise errors.OpExecError("Can't detach drbd from local storage on node"
5325                                  " %s for device %s: %s" %
5326                                  (tgt_node, dev.iv_name, msg))
5327       #dev.children = []
5328       #cfg.Update(instance)
5329
5330       # ok, we created the new LVs, so now we know we have the needed
5331       # storage; as such, we proceed on the target node to rename
5332       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5333       # using the assumption that logical_id == physical_id (which in
5334       # turn is the unique_id on that node)
5335
5336       # FIXME(iustin): use a better name for the replaced LVs
5337       temp_suffix = int(time.time())
5338       ren_fn = lambda d, suff: (d.physical_id[0],
5339                                 d.physical_id[1] + "_replaced-%s" % suff)
5340       # build the rename list based on what LVs exist on the node
5341       rlist = []
5342       for to_ren in old_lvs:
5343         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5344         if not result.RemoteFailMsg() and result.payload:
5345           # device exists
5346           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5347
5348       info("renaming the old LVs on the target node")
5349       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5350       msg = result.RemoteFailMsg()
5351       if msg:
5352         raise errors.OpExecError("Can't rename old LVs on node %s: %s" %
5353                                  (tgt_node, msg))
5354       # now we rename the new LVs to the old LVs
5355       info("renaming the new LVs on the target node")
5356       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5357       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5358       msg = result.RemoteFailMsg()
5359       if msg:
5360         raise errors.OpExecError("Can't rename new LVs on node %s: %s" %
5361                                  (tgt_node, msg))
5362
5363       for old, new in zip(old_lvs, new_lvs):
5364         new.logical_id = old.logical_id
5365         cfg.SetDiskID(new, tgt_node)
5366
5367       for disk in old_lvs:
5368         disk.logical_id = ren_fn(disk, temp_suffix)
5369         cfg.SetDiskID(disk, tgt_node)
5370
5371       # now that the new lvs have the old name, we can add them to the device
5372       info("adding new mirror component on %s" % tgt_node)
5373       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5374       msg = result.RemoteFailMsg()
5375       if msg:
5376         for new_lv in new_lvs:
5377           msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5378           if msg:
5379             warning("Can't rollback device %s: %s", dev, msg,
5380                     hint="cleanup manually the unused logical volumes")
5381         raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
5382
5383       dev.children = new_lvs
5384       cfg.Update(instance)
5385
5386     # Step: wait for sync
5387
5388     # this can fail as the old devices are degraded and _WaitForSync
5389     # does a combined result over all disks, so we don't check its
5390     # return value
5391     self.proc.LogStep(5, steps_total, "sync devices")
5392     _WaitForSync(self, instance, unlock=True)
5393
5394     # so check manually all the devices
5395     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5396       cfg.SetDiskID(dev, instance.primary_node)
5397       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5398       msg = result.RemoteFailMsg()
5399       if not msg and not result.payload:
5400         msg = "disk not found"
5401       if msg:
5402         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5403                                  (name, msg))
5404       if result.payload[5]:
5405         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5406
5407     # Step: remove old storage
5408     self.proc.LogStep(6, steps_total, "removing old storage")
5409     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5410       info("remove logical volumes for %s" % name)
5411       for lv in old_lvs:
5412         cfg.SetDiskID(lv, tgt_node)
5413         msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5414         if msg:
5415           warning("Can't remove old LV: %s" % msg,
5416                   hint="manually remove unused LVs")
5417           continue
5418
5419   def _ExecD8Secondary(self, feedback_fn):
5420     """Replace the secondary node for drbd8.
5421
5422     The algorithm for replace is quite complicated:
5423       - for all disks of the instance:
5424         - create new LVs on the new node with same names
5425         - shutdown the drbd device on the old secondary
5426         - disconnect the drbd network on the primary
5427         - create the drbd device on the new secondary
5428         - network attach the drbd on the primary, using an artifice:
5429           the drbd code for Attach() will connect to the network if it
5430           finds a device which is connected to the good local disks but
5431           not network enabled
5432       - wait for sync across all devices
5433       - remove all disks from the old secondary
5434
5435     Failures are not very well handled.
5436
5437     """
5438     steps_total = 6
5439     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5440     instance = self.instance
5441     iv_names = {}
5442     # start of work
5443     cfg = self.cfg
5444     old_node = self.tgt_node
5445     new_node = self.new_node
5446     pri_node = instance.primary_node
5447     nodes_ip = {
5448       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5449       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5450       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5451       }
5452
5453     # Step: check device activation
5454     self.proc.LogStep(1, steps_total, "check device existence")
5455     info("checking volume groups")
5456     my_vg = cfg.GetVGName()
5457     results = self.rpc.call_vg_list([pri_node, new_node])
5458     for node in pri_node, new_node:
5459       res = results[node]
5460       msg = res.RemoteFailMsg()
5461       if msg:
5462         raise errors.OpExecError("Error checking node %s: %s" % (node, msg))
5463       if my_vg not in res.payload:
5464         raise errors.OpExecError("Volume group '%s' not found on %s" %
5465                                  (my_vg, node))
5466     for idx, dev in enumerate(instance.disks):
5467       if idx not in self.op.disks:
5468         continue
5469       info("checking disk/%d on %s" % (idx, pri_node))
5470       cfg.SetDiskID(dev, pri_node)
5471       result = self.rpc.call_blockdev_find(pri_node, dev)
5472       msg = result.RemoteFailMsg()
5473       if not msg and not result.payload:
5474         msg = "disk not found"
5475       if msg:
5476         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5477                                  (idx, pri_node, msg))
5478
5479     # Step: check other node consistency
5480     self.proc.LogStep(2, steps_total, "check peer consistency")
5481     for idx, dev in enumerate(instance.disks):
5482       if idx not in self.op.disks:
5483         continue
5484       info("checking disk/%d consistency on %s" % (idx, pri_node))
5485       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5486         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5487                                  " unsafe to replace the secondary" %
5488                                  pri_node)
5489
5490     # Step: create new storage
5491     self.proc.LogStep(3, steps_total, "allocate new storage")
5492     for idx, dev in enumerate(instance.disks):
5493       info("adding new local storage on %s for disk/%d" %
5494            (new_node, idx))
5495       # we pass force_create=True to force LVM creation
5496       for new_lv in dev.children:
5497         _CreateBlockDev(self, new_node, instance, new_lv, True,
5498                         _GetInstanceInfoText(instance), False)
5499
5500     # Step 4: dbrd minors and drbd setups changes
5501     # after this, we must manually remove the drbd minors on both the
5502     # error and the success paths
5503     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5504                                    instance.name)
5505     logging.debug("Allocated minors %s" % (minors,))
5506     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5507     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5508       size = dev.size
5509       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5510       # create new devices on new_node; note that we create two IDs:
5511       # one without port, so the drbd will be activated without
5512       # networking information on the new node at this stage, and one
5513       # with network, for the latter activation in step 4
5514       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5515       if pri_node == o_node1:
5516         p_minor = o_minor1
5517       else:
5518         p_minor = o_minor2
5519
5520       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5521       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5522
5523       iv_names[idx] = (dev, dev.children, new_net_id)
5524       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5525                     new_net_id)
5526       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5527                               logical_id=new_alone_id,
5528                               children=dev.children)
5529       try:
5530         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5531                               _GetInstanceInfoText(instance), False)
5532       except errors.GenericError:
5533         self.cfg.ReleaseDRBDMinors(instance.name)
5534         raise
5535
5536     for idx, dev in enumerate(instance.disks):
5537       # we have new devices, shutdown the drbd on the old secondary
5538       info("shutting down drbd for disk/%d on old node" % idx)
5539       cfg.SetDiskID(dev, old_node)
5540       msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5541       if msg:
5542         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5543                 (idx, msg),
5544                 hint="Please cleanup this device manually as soon as possible")
5545
5546     info("detaching primary drbds from the network (=> standalone)")
5547     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5548                                                instance.disks)[pri_node]
5549
5550     msg = result.RemoteFailMsg()
5551     if msg:
5552       # detaches didn't succeed (unlikely)
5553       self.cfg.ReleaseDRBDMinors(instance.name)
5554       raise errors.OpExecError("Can't detach the disks from the network on"
5555                                " old node: %s" % (msg,))
5556
5557     # if we managed to detach at least one, we update all the disks of
5558     # the instance to point to the new secondary
5559     info("updating instance configuration")
5560     for dev, _, new_logical_id in iv_names.itervalues():
5561       dev.logical_id = new_logical_id
5562       cfg.SetDiskID(dev, pri_node)
5563     cfg.Update(instance)
5564
5565     # and now perform the drbd attach
5566     info("attaching primary drbds to new secondary (standalone => connected)")
5567     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5568                                            instance.disks, instance.name,
5569                                            False)
5570     for to_node, to_result in result.items():
5571       msg = to_result.RemoteFailMsg()
5572       if msg:
5573         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5574                 hint="please do a gnt-instance info to see the"
5575                 " status of disks")
5576
5577     # this can fail as the old devices are degraded and _WaitForSync
5578     # does a combined result over all disks, so we don't check its
5579     # return value
5580     self.proc.LogStep(5, steps_total, "sync devices")
5581     _WaitForSync(self, instance, unlock=True)
5582
5583     # so check manually all the devices
5584     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5585       cfg.SetDiskID(dev, pri_node)
5586       result = self.rpc.call_blockdev_find(pri_node, dev)
5587       msg = result.RemoteFailMsg()
5588       if not msg and not result.payload:
5589         msg = "disk not found"
5590       if msg:
5591         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5592                                  (idx, msg))
5593       if result.payload[5]:
5594         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5595
5596     self.proc.LogStep(6, steps_total, "removing old storage")
5597     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5598       info("remove logical volumes for disk/%d" % idx)
5599       for lv in old_lvs:
5600         cfg.SetDiskID(lv, old_node)
5601         msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5602         if msg:
5603           warning("Can't remove LV on old secondary: %s", msg,
5604                   hint="Cleanup stale volumes by hand")
5605
5606   def Exec(self, feedback_fn):
5607     """Execute disk replacement.
5608
5609     This dispatches the disk replacement to the appropriate handler.
5610
5611     """
5612     instance = self.instance
5613
5614     # Activate the instance disks if we're replacing them on a down instance
5615     if not instance.admin_up:
5616       _StartInstanceDisks(self, instance, True)
5617
5618     if self.op.mode == constants.REPLACE_DISK_CHG:
5619       fn = self._ExecD8Secondary
5620     else:
5621       fn = self._ExecD8DiskOnly
5622
5623     ret = fn(feedback_fn)
5624
5625     # Deactivate the instance disks if we're replacing them on a down instance
5626     if not instance.admin_up:
5627       _SafeShutdownInstanceDisks(self, instance)
5628
5629     return ret
5630
5631
5632 class LUGrowDisk(LogicalUnit):
5633   """Grow a disk of an instance.
5634
5635   """
5636   HPATH = "disk-grow"
5637   HTYPE = constants.HTYPE_INSTANCE
5638   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5639   REQ_BGL = False
5640
5641   def ExpandNames(self):
5642     self._ExpandAndLockInstance()
5643     self.needed_locks[locking.LEVEL_NODE] = []
5644     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5645
5646   def DeclareLocks(self, level):
5647     if level == locking.LEVEL_NODE:
5648       self._LockInstancesNodes()
5649
5650   def BuildHooksEnv(self):
5651     """Build hooks env.
5652
5653     This runs on the master, the primary and all the secondaries.
5654
5655     """
5656     env = {
5657       "DISK": self.op.disk,
5658       "AMOUNT": self.op.amount,
5659       }
5660     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5661     nl = [
5662       self.cfg.GetMasterNode(),
5663       self.instance.primary_node,
5664       ]
5665     return env, nl, nl
5666
5667   def CheckPrereq(self):
5668     """Check prerequisites.
5669
5670     This checks that the instance is in the cluster.
5671
5672     """
5673     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5674     assert instance is not None, \
5675       "Cannot retrieve locked instance %s" % self.op.instance_name
5676     nodenames = list(instance.all_nodes)
5677     for node in nodenames:
5678       _CheckNodeOnline(self, node)
5679
5680
5681     self.instance = instance
5682
5683     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5684       raise errors.OpPrereqError("Instance's disk layout does not support"
5685                                  " growing.")
5686
5687     self.disk = instance.FindDisk(self.op.disk)
5688
5689     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5690                                        instance.hypervisor)
5691     for node in nodenames:
5692       info = nodeinfo[node]
5693       msg = info.RemoteFailMsg()
5694       if msg:
5695         raise errors.OpPrereqError("Cannot get current information"
5696                                    " from node %s:" % (node, msg))
5697       vg_free = info.payload.get('vg_free', None)
5698       if not isinstance(vg_free, int):
5699         raise errors.OpPrereqError("Can't compute free disk space on"
5700                                    " node %s" % node)
5701       if self.op.amount > vg_free:
5702         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5703                                    " %d MiB available, %d MiB required" %
5704                                    (node, vg_free, self.op.amount))
5705
5706   def Exec(self, feedback_fn):
5707     """Execute disk grow.
5708
5709     """
5710     instance = self.instance
5711     disk = self.disk
5712     for node in instance.all_nodes:
5713       self.cfg.SetDiskID(disk, node)
5714       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5715       msg = result.RemoteFailMsg()
5716       if msg:
5717         raise errors.OpExecError("Grow request failed to node %s: %s" %
5718                                  (node, msg))
5719     disk.RecordGrow(self.op.amount)
5720     self.cfg.Update(instance)
5721     if self.op.wait_for_sync:
5722       disk_abort = not _WaitForSync(self, instance)
5723       if disk_abort:
5724         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5725                              " status.\nPlease check the instance.")
5726
5727
5728 class LUQueryInstanceData(NoHooksLU):
5729   """Query runtime instance data.
5730
5731   """
5732   _OP_REQP = ["instances", "static"]
5733   REQ_BGL = False
5734
5735   def ExpandNames(self):
5736     self.needed_locks = {}
5737     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5738
5739     if not isinstance(self.op.instances, list):
5740       raise errors.OpPrereqError("Invalid argument type 'instances'")
5741
5742     if self.op.instances:
5743       self.wanted_names = []
5744       for name in self.op.instances:
5745         full_name = self.cfg.ExpandInstanceName(name)
5746         if full_name is None:
5747           raise errors.OpPrereqError("Instance '%s' not known" % name)
5748         self.wanted_names.append(full_name)
5749       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5750     else:
5751       self.wanted_names = None
5752       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5753
5754     self.needed_locks[locking.LEVEL_NODE] = []
5755     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5756
5757   def DeclareLocks(self, level):
5758     if level == locking.LEVEL_NODE:
5759       self._LockInstancesNodes()
5760
5761   def CheckPrereq(self):
5762     """Check prerequisites.
5763
5764     This only checks the optional instance list against the existing names.
5765
5766     """
5767     if self.wanted_names is None:
5768       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5769
5770     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5771                              in self.wanted_names]
5772     return
5773
5774   def _ComputeDiskStatus(self, instance, snode, dev):
5775     """Compute block device status.
5776
5777     """
5778     static = self.op.static
5779     if not static:
5780       self.cfg.SetDiskID(dev, instance.primary_node)
5781       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5782       if dev_pstatus.offline:
5783         dev_pstatus = None
5784       else:
5785         msg = dev_pstatus.RemoteFailMsg()
5786         if msg:
5787           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5788                                    (instance.name, msg))
5789         dev_pstatus = dev_pstatus.payload
5790     else:
5791       dev_pstatus = None
5792
5793     if dev.dev_type in constants.LDS_DRBD:
5794       # we change the snode then (otherwise we use the one passed in)
5795       if dev.logical_id[0] == instance.primary_node:
5796         snode = dev.logical_id[1]
5797       else:
5798         snode = dev.logical_id[0]
5799
5800     if snode and not static:
5801       self.cfg.SetDiskID(dev, snode)
5802       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5803       if dev_sstatus.offline:
5804         dev_sstatus = None
5805       else:
5806         msg = dev_sstatus.RemoteFailMsg()
5807         if msg:
5808           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5809                                    (instance.name, msg))
5810         dev_sstatus = dev_sstatus.payload
5811     else:
5812       dev_sstatus = None
5813
5814     if dev.children:
5815       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5816                       for child in dev.children]
5817     else:
5818       dev_children = []
5819
5820     data = {
5821       "iv_name": dev.iv_name,
5822       "dev_type": dev.dev_type,
5823       "logical_id": dev.logical_id,
5824       "physical_id": dev.physical_id,
5825       "pstatus": dev_pstatus,
5826       "sstatus": dev_sstatus,
5827       "children": dev_children,
5828       "mode": dev.mode,
5829       }
5830
5831     return data
5832
5833   def Exec(self, feedback_fn):
5834     """Gather and return data"""
5835     result = {}
5836
5837     cluster = self.cfg.GetClusterInfo()
5838
5839     for instance in self.wanted_instances:
5840       if not self.op.static:
5841         remote_info = self.rpc.call_instance_info(instance.primary_node,
5842                                                   instance.name,
5843                                                   instance.hypervisor)
5844         msg = remote_info.RemoteFailMsg()
5845         if msg:
5846           raise errors.OpExecError("Error checking node %s: %s" %
5847                                    (instance.primary_node, msg))
5848         remote_info = remote_info.payload
5849         if remote_info and "state" in remote_info:
5850           remote_state = "up"
5851         else:
5852           remote_state = "down"
5853       else:
5854         remote_state = None
5855       if instance.admin_up:
5856         config_state = "up"
5857       else:
5858         config_state = "down"
5859
5860       disks = [self._ComputeDiskStatus(instance, None, device)
5861                for device in instance.disks]
5862
5863       idict = {
5864         "name": instance.name,
5865         "config_state": config_state,
5866         "run_state": remote_state,
5867         "pnode": instance.primary_node,
5868         "snodes": instance.secondary_nodes,
5869         "os": instance.os,
5870         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5871         "disks": disks,
5872         "hypervisor": instance.hypervisor,
5873         "network_port": instance.network_port,
5874         "hv_instance": instance.hvparams,
5875         "hv_actual": cluster.FillHV(instance),
5876         "be_instance": instance.beparams,
5877         "be_actual": cluster.FillBE(instance),
5878         }
5879
5880       result[instance.name] = idict
5881
5882     return result
5883
5884
5885 class LUSetInstanceParams(LogicalUnit):
5886   """Modifies an instances's parameters.
5887
5888   """
5889   HPATH = "instance-modify"
5890   HTYPE = constants.HTYPE_INSTANCE
5891   _OP_REQP = ["instance_name"]
5892   REQ_BGL = False
5893
5894   def CheckArguments(self):
5895     if not hasattr(self.op, 'nics'):
5896       self.op.nics = []
5897     if not hasattr(self.op, 'disks'):
5898       self.op.disks = []
5899     if not hasattr(self.op, 'beparams'):
5900       self.op.beparams = {}
5901     if not hasattr(self.op, 'hvparams'):
5902       self.op.hvparams = {}
5903     self.op.force = getattr(self.op, "force", False)
5904     if not (self.op.nics or self.op.disks or
5905             self.op.hvparams or self.op.beparams):
5906       raise errors.OpPrereqError("No changes submitted")
5907
5908     # Disk validation
5909     disk_addremove = 0
5910     for disk_op, disk_dict in self.op.disks:
5911       if disk_op == constants.DDM_REMOVE:
5912         disk_addremove += 1
5913         continue
5914       elif disk_op == constants.DDM_ADD:
5915         disk_addremove += 1
5916       else:
5917         if not isinstance(disk_op, int):
5918           raise errors.OpPrereqError("Invalid disk index")
5919       if disk_op == constants.DDM_ADD:
5920         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5921         if mode not in constants.DISK_ACCESS_SET:
5922           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5923         size = disk_dict.get('size', None)
5924         if size is None:
5925           raise errors.OpPrereqError("Required disk parameter size missing")
5926         try:
5927           size = int(size)
5928         except ValueError, err:
5929           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5930                                      str(err))
5931         disk_dict['size'] = size
5932       else:
5933         # modification of disk
5934         if 'size' in disk_dict:
5935           raise errors.OpPrereqError("Disk size change not possible, use"
5936                                      " grow-disk")
5937
5938     if disk_addremove > 1:
5939       raise errors.OpPrereqError("Only one disk add or remove operation"
5940                                  " supported at a time")
5941
5942     # NIC validation
5943     nic_addremove = 0
5944     for nic_op, nic_dict in self.op.nics:
5945       if nic_op == constants.DDM_REMOVE:
5946         nic_addremove += 1
5947         continue
5948       elif nic_op == constants.DDM_ADD:
5949         nic_addremove += 1
5950       else:
5951         if not isinstance(nic_op, int):
5952           raise errors.OpPrereqError("Invalid nic index")
5953
5954       # nic_dict should be a dict
5955       nic_ip = nic_dict.get('ip', None)
5956       if nic_ip is not None:
5957         if nic_ip.lower() == constants.VALUE_NONE:
5958           nic_dict['ip'] = None
5959         else:
5960           if not utils.IsValidIP(nic_ip):
5961             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5962
5963       nic_bridge = nic_dict.get('bridge', None)
5964       nic_link = nic_dict.get('link', None)
5965       if nic_bridge and nic_link:
5966         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
5967       elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
5968         nic_dict['bridge'] = None
5969       elif nic_link and nic_link.lower() == constants.VALUE_NONE:
5970         nic_dict['link'] = None
5971
5972       if nic_op == constants.DDM_ADD:
5973         nic_mac = nic_dict.get('mac', None)
5974         if nic_mac is None:
5975           nic_dict['mac'] = constants.VALUE_AUTO
5976
5977       if 'mac' in nic_dict:
5978         nic_mac = nic_dict['mac']
5979         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5980           if not utils.IsValidMac(nic_mac):
5981             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5982         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5983           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5984                                      " modifying an existing nic")
5985
5986     if nic_addremove > 1:
5987       raise errors.OpPrereqError("Only one NIC add or remove operation"
5988                                  " supported at a time")
5989
5990   def ExpandNames(self):
5991     self._ExpandAndLockInstance()
5992     self.needed_locks[locking.LEVEL_NODE] = []
5993     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5994
5995   def DeclareLocks(self, level):
5996     if level == locking.LEVEL_NODE:
5997       self._LockInstancesNodes()
5998
5999   def BuildHooksEnv(self):
6000     """Build hooks env.
6001
6002     This runs on the master, primary and secondaries.
6003
6004     """
6005     args = dict()
6006     if constants.BE_MEMORY in self.be_new:
6007       args['memory'] = self.be_new[constants.BE_MEMORY]
6008     if constants.BE_VCPUS in self.be_new:
6009       args['vcpus'] = self.be_new[constants.BE_VCPUS]
6010     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
6011     # information at all.
6012     if self.op.nics:
6013       args['nics'] = []
6014       nic_override = dict(self.op.nics)
6015       c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
6016       for idx, nic in enumerate(self.instance.nics):
6017         if idx in nic_override:
6018           this_nic_override = nic_override[idx]
6019         else:
6020           this_nic_override = {}
6021         if 'ip' in this_nic_override:
6022           ip = this_nic_override['ip']
6023         else:
6024           ip = nic.ip
6025         if 'mac' in this_nic_override:
6026           mac = this_nic_override['mac']
6027         else:
6028           mac = nic.mac
6029         if idx in self.nic_pnew:
6030           nicparams = self.nic_pnew[idx]
6031         else:
6032           nicparams = objects.FillDict(c_nicparams, nic.nicparams)
6033         mode = nicparams[constants.NIC_MODE]
6034         link = nicparams[constants.NIC_LINK]
6035         args['nics'].append((ip, mac, mode, link))
6036       if constants.DDM_ADD in nic_override:
6037         ip = nic_override[constants.DDM_ADD].get('ip', None)
6038         mac = nic_override[constants.DDM_ADD]['mac']
6039         nicparams = self.nic_pnew[constants.DDM_ADD]
6040         mode = nicparams[constants.NIC_MODE]
6041         link = nicparams[constants.NIC_LINK]
6042         args['nics'].append((ip, mac, mode, link))
6043       elif constants.DDM_REMOVE in nic_override:
6044         del args['nics'][-1]
6045
6046     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
6047     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
6048     return env, nl, nl
6049
6050   def _GetUpdatedParams(self, old_params, update_dict,
6051                         default_values, parameter_types):
6052     """Return the new params dict for the given params.
6053
6054     @type old_params: dict
6055     @type old_params: old parameters
6056     @type update_dict: dict
6057     @type update_dict: dict containing new parameter values,
6058                        or constants.VALUE_DEFAULT to reset the
6059                        parameter to its default value
6060     @type default_values: dict
6061     @param default_values: default values for the filled parameters
6062     @type parameter_types: dict
6063     @param parameter_types: dict mapping target dict keys to types
6064                             in constants.ENFORCEABLE_TYPES
6065     @rtype: (dict, dict)
6066     @return: (new_parameters, filled_parameters)
6067
6068     """
6069     params_copy = copy.deepcopy(old_params)
6070     for key, val in update_dict.iteritems():
6071       if val == constants.VALUE_DEFAULT:
6072         try:
6073           del params_copy[key]
6074         except KeyError:
6075           pass
6076       else:
6077         params_copy[key] = val
6078     utils.ForceDictType(params_copy, parameter_types)
6079     params_filled = objects.FillDict(default_values, params_copy)
6080     return (params_copy, params_filled)
6081
6082   def CheckPrereq(self):
6083     """Check prerequisites.
6084
6085     This only checks the instance list against the existing names.
6086
6087     """
6088     force = self.force = self.op.force
6089
6090     # checking the new params on the primary/secondary nodes
6091
6092     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6093     cluster = self.cluster = self.cfg.GetClusterInfo()
6094     assert self.instance is not None, \
6095       "Cannot retrieve locked instance %s" % self.op.instance_name
6096     pnode = instance.primary_node
6097     nodelist = list(instance.all_nodes)
6098
6099     # hvparams processing
6100     if self.op.hvparams:
6101       i_hvdict, hv_new = self._GetUpdatedParams(
6102                              instance.hvparams, self.op.hvparams,
6103                              cluster.hvparams[instance.hypervisor],
6104                              constants.HVS_PARAMETER_TYPES)
6105       # local check
6106       hypervisor.GetHypervisor(
6107         instance.hypervisor).CheckParameterSyntax(hv_new)
6108       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6109       self.hv_new = hv_new # the new actual values
6110       self.hv_inst = i_hvdict # the new dict (without defaults)
6111     else:
6112       self.hv_new = self.hv_inst = {}
6113
6114     # beparams processing
6115     if self.op.beparams:
6116       i_bedict, be_new = self._GetUpdatedParams(
6117                              instance.beparams, self.op.beparams,
6118                              cluster.beparams[constants.PP_DEFAULT],
6119                              constants.BES_PARAMETER_TYPES)
6120       self.be_new = be_new # the new actual values
6121       self.be_inst = i_bedict # the new dict (without defaults)
6122     else:
6123       self.be_new = self.be_inst = {}
6124
6125     self.warn = []
6126
6127     if constants.BE_MEMORY in self.op.beparams and not self.force:
6128       mem_check_list = [pnode]
6129       if be_new[constants.BE_AUTO_BALANCE]:
6130         # either we changed auto_balance to yes or it was from before
6131         mem_check_list.extend(instance.secondary_nodes)
6132       instance_info = self.rpc.call_instance_info(pnode, instance.name,
6133                                                   instance.hypervisor)
6134       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6135                                          instance.hypervisor)
6136       pninfo = nodeinfo[pnode]
6137       msg = pninfo.RemoteFailMsg()
6138       if msg:
6139         # Assume the primary node is unreachable and go ahead
6140         self.warn.append("Can't get info from primary node %s: %s" %
6141                          (pnode,  msg))
6142       elif not isinstance(pninfo.payload.get('memory_free', None), int):
6143         self.warn.append("Node data from primary node %s doesn't contain"
6144                          " free memory information" % pnode)
6145       elif instance_info.RemoteFailMsg():
6146         self.warn.append("Can't get instance runtime information: %s" %
6147                         instance_info.RemoteFailMsg())
6148       else:
6149         if instance_info.payload:
6150           current_mem = int(instance_info.payload['memory'])
6151         else:
6152           # Assume instance not running
6153           # (there is a slight race condition here, but it's not very probable,
6154           # and we have no other way to check)
6155           current_mem = 0
6156         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6157                     pninfo.payload['memory_free'])
6158         if miss_mem > 0:
6159           raise errors.OpPrereqError("This change will prevent the instance"
6160                                      " from starting, due to %d MB of memory"
6161                                      " missing on its primary node" % miss_mem)
6162
6163       if be_new[constants.BE_AUTO_BALANCE]:
6164         for node, nres in nodeinfo.items():
6165           if node not in instance.secondary_nodes:
6166             continue
6167           msg = nres.RemoteFailMsg()
6168           if msg:
6169             self.warn.append("Can't get info from secondary node %s: %s" %
6170                              (node, msg))
6171           elif not isinstance(nres.payload.get('memory_free', None), int):
6172             self.warn.append("Secondary node %s didn't return free"
6173                              " memory information" % node)
6174           elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
6175             self.warn.append("Not enough memory to failover instance to"
6176                              " secondary node %s" % node)
6177
6178     # NIC processing
6179     self.nic_pnew = {}
6180     self.nic_pinst = {}
6181     for nic_op, nic_dict in self.op.nics:
6182       if nic_op == constants.DDM_REMOVE:
6183         if not instance.nics:
6184           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6185         continue
6186       if nic_op != constants.DDM_ADD:
6187         # an existing nic
6188         if nic_op < 0 or nic_op >= len(instance.nics):
6189           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6190                                      " are 0 to %d" %
6191                                      (nic_op, len(instance.nics)))
6192         old_nic_params = instance.nics[nic_op].nicparams
6193         old_nic_ip = instance.nics[nic_op].ip
6194       else:
6195         old_nic_params = {}
6196         old_nic_ip = None
6197
6198       update_params_dict = dict([(key, nic_dict[key])
6199                                  for key in constants.NICS_PARAMETERS
6200                                  if key in nic_dict])
6201
6202       if 'bridge' in nic_dict:
6203         update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
6204
6205       new_nic_params, new_filled_nic_params = \
6206           self._GetUpdatedParams(old_nic_params, update_params_dict,
6207                                  cluster.nicparams[constants.PP_DEFAULT],
6208                                  constants.NICS_PARAMETER_TYPES)
6209       objects.NIC.CheckParameterSyntax(new_filled_nic_params)
6210       self.nic_pinst[nic_op] = new_nic_params
6211       self.nic_pnew[nic_op] = new_filled_nic_params
6212       new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
6213
6214       if new_nic_mode == constants.NIC_MODE_BRIDGED:
6215         nic_bridge = new_filled_nic_params[constants.NIC_LINK]
6216         result = self.rpc.call_bridges_exist(pnode, [nic_bridge])
6217         msg = result.RemoteFailMsg()
6218         if msg:
6219           msg = "Error checking bridges on node %s: %s" % (pnode, msg)
6220           if self.force:
6221             self.warn.append(msg)
6222           else:
6223             raise errors.OpPrereqError(msg)
6224       if new_nic_mode == constants.NIC_MODE_ROUTED:
6225         if 'ip' in nic_dict:
6226           nic_ip = nic_dict['ip']
6227         else:
6228           nic_ip = old_nic_ip
6229         if nic_ip is None:
6230           raise errors.OpPrereqError('Cannot set the nic ip to None'
6231                                      ' on a routed nic')
6232       if 'mac' in nic_dict:
6233         nic_mac = nic_dict['mac']
6234         if nic_mac is None:
6235           raise errors.OpPrereqError('Cannot set the nic mac to None')
6236         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6237           # otherwise generate the mac
6238           nic_dict['mac'] = self.cfg.GenerateMAC()
6239         else:
6240           # or validate/reserve the current one
6241           if self.cfg.IsMacInUse(nic_mac):
6242             raise errors.OpPrereqError("MAC address %s already in use"
6243                                        " in cluster" % nic_mac)
6244
6245     # DISK processing
6246     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6247       raise errors.OpPrereqError("Disk operations not supported for"
6248                                  " diskless instances")
6249     for disk_op, disk_dict in self.op.disks:
6250       if disk_op == constants.DDM_REMOVE:
6251         if len(instance.disks) == 1:
6252           raise errors.OpPrereqError("Cannot remove the last disk of"
6253                                      " an instance")
6254         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6255         ins_l = ins_l[pnode]
6256         msg = ins_l.RemoteFailMsg()
6257         if msg:
6258           raise errors.OpPrereqError("Can't contact node %s: %s" %
6259                                      (pnode, msg))
6260         if instance.name in ins_l.payload:
6261           raise errors.OpPrereqError("Instance is running, can't remove"
6262                                      " disks.")
6263
6264       if (disk_op == constants.DDM_ADD and
6265           len(instance.nics) >= constants.MAX_DISKS):
6266         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6267                                    " add more" % constants.MAX_DISKS)
6268       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6269         # an existing disk
6270         if disk_op < 0 or disk_op >= len(instance.disks):
6271           raise errors.OpPrereqError("Invalid disk index %s, valid values"
6272                                      " are 0 to %d" %
6273                                      (disk_op, len(instance.disks)))
6274
6275     return
6276
6277   def Exec(self, feedback_fn):
6278     """Modifies an instance.
6279
6280     All parameters take effect only at the next restart of the instance.
6281
6282     """
6283     # Process here the warnings from CheckPrereq, as we don't have a
6284     # feedback_fn there.
6285     for warn in self.warn:
6286       feedback_fn("WARNING: %s" % warn)
6287
6288     result = []
6289     instance = self.instance
6290     cluster = self.cluster
6291     # disk changes
6292     for disk_op, disk_dict in self.op.disks:
6293       if disk_op == constants.DDM_REMOVE:
6294         # remove the last disk
6295         device = instance.disks.pop()
6296         device_idx = len(instance.disks)
6297         for node, disk in device.ComputeNodeTree(instance.primary_node):
6298           self.cfg.SetDiskID(disk, node)
6299           msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
6300           if msg:
6301             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6302                             " continuing anyway", device_idx, node, msg)
6303         result.append(("disk/%d" % device_idx, "remove"))
6304       elif disk_op == constants.DDM_ADD:
6305         # add a new disk
6306         if instance.disk_template == constants.DT_FILE:
6307           file_driver, file_path = instance.disks[0].logical_id
6308           file_path = os.path.dirname(file_path)
6309         else:
6310           file_driver = file_path = None
6311         disk_idx_base = len(instance.disks)
6312         new_disk = _GenerateDiskTemplate(self,
6313                                          instance.disk_template,
6314                                          instance.name, instance.primary_node,
6315                                          instance.secondary_nodes,
6316                                          [disk_dict],
6317                                          file_path,
6318                                          file_driver,
6319                                          disk_idx_base)[0]
6320         instance.disks.append(new_disk)
6321         info = _GetInstanceInfoText(instance)
6322
6323         logging.info("Creating volume %s for instance %s",
6324                      new_disk.iv_name, instance.name)
6325         # Note: this needs to be kept in sync with _CreateDisks
6326         #HARDCODE
6327         for node in instance.all_nodes:
6328           f_create = node == instance.primary_node
6329           try:
6330             _CreateBlockDev(self, node, instance, new_disk,
6331                             f_create, info, f_create)
6332           except errors.OpExecError, err:
6333             self.LogWarning("Failed to create volume %s (%s) on"
6334                             " node %s: %s",
6335                             new_disk.iv_name, new_disk, node, err)
6336         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6337                        (new_disk.size, new_disk.mode)))
6338       else:
6339         # change a given disk
6340         instance.disks[disk_op].mode = disk_dict['mode']
6341         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6342     # NIC changes
6343     for nic_op, nic_dict in self.op.nics:
6344       if nic_op == constants.DDM_REMOVE:
6345         # remove the last nic
6346         del instance.nics[-1]
6347         result.append(("nic.%d" % len(instance.nics), "remove"))
6348       elif nic_op == constants.DDM_ADD:
6349         # mac and bridge should be set, by now
6350         mac = nic_dict['mac']
6351         ip = nic_dict.get('ip', None)
6352         nicparams = self.nic_pinst[constants.DDM_ADD]
6353         new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
6354         instance.nics.append(new_nic)
6355         result.append(("nic.%d" % (len(instance.nics) - 1),
6356                        "add:mac=%s,ip=%s,mode=%s,link=%s" %
6357                        (new_nic.mac, new_nic.ip,
6358                         self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
6359                         self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
6360                        )))
6361       else:
6362         for key in 'mac', 'ip':
6363           if key in nic_dict:
6364             setattr(instance.nics[nic_op], key, nic_dict[key])
6365         if nic_op in self.nic_pnew:
6366           instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
6367         for key, val in nic_dict.iteritems():
6368           result.append(("nic.%s/%d" % (key, nic_op), val))
6369
6370     # hvparams changes
6371     if self.op.hvparams:
6372       instance.hvparams = self.hv_inst
6373       for key, val in self.op.hvparams.iteritems():
6374         result.append(("hv/%s" % key, val))
6375
6376     # beparams changes
6377     if self.op.beparams:
6378       instance.beparams = self.be_inst
6379       for key, val in self.op.beparams.iteritems():
6380         result.append(("be/%s" % key, val))
6381
6382     self.cfg.Update(instance)
6383
6384     return result
6385
6386
6387 class LUQueryExports(NoHooksLU):
6388   """Query the exports list
6389
6390   """
6391   _OP_REQP = ['nodes']
6392   REQ_BGL = False
6393
6394   def ExpandNames(self):
6395     self.needed_locks = {}
6396     self.share_locks[locking.LEVEL_NODE] = 1
6397     if not self.op.nodes:
6398       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6399     else:
6400       self.needed_locks[locking.LEVEL_NODE] = \
6401         _GetWantedNodes(self, self.op.nodes)
6402
6403   def CheckPrereq(self):
6404     """Check prerequisites.
6405
6406     """
6407     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6408
6409   def Exec(self, feedback_fn):
6410     """Compute the list of all the exported system images.
6411
6412     @rtype: dict
6413     @return: a dictionary with the structure node->(export-list)
6414         where export-list is a list of the instances exported on
6415         that node.
6416
6417     """
6418     rpcresult = self.rpc.call_export_list(self.nodes)
6419     result = {}
6420     for node in rpcresult:
6421       if rpcresult[node].RemoteFailMsg():
6422         result[node] = False
6423       else:
6424         result[node] = rpcresult[node].payload
6425
6426     return result
6427
6428
6429 class LUExportInstance(LogicalUnit):
6430   """Export an instance to an image in the cluster.
6431
6432   """
6433   HPATH = "instance-export"
6434   HTYPE = constants.HTYPE_INSTANCE
6435   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6436   REQ_BGL = False
6437
6438   def ExpandNames(self):
6439     self._ExpandAndLockInstance()
6440     # FIXME: lock only instance primary and destination node
6441     #
6442     # Sad but true, for now we have do lock all nodes, as we don't know where
6443     # the previous export might be, and and in this LU we search for it and
6444     # remove it from its current node. In the future we could fix this by:
6445     #  - making a tasklet to search (share-lock all), then create the new one,
6446     #    then one to remove, after
6447     #  - removing the removal operation altoghether
6448     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6449
6450   def DeclareLocks(self, level):
6451     """Last minute lock declaration."""
6452     # All nodes are locked anyway, so nothing to do here.
6453
6454   def BuildHooksEnv(self):
6455     """Build hooks env.
6456
6457     This will run on the master, primary node and target node.
6458
6459     """
6460     env = {
6461       "EXPORT_NODE": self.op.target_node,
6462       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6463       }
6464     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6465     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6466           self.op.target_node]
6467     return env, nl, nl
6468
6469   def CheckPrereq(self):
6470     """Check prerequisites.
6471
6472     This checks that the instance and node names are valid.
6473
6474     """
6475     instance_name = self.op.instance_name
6476     self.instance = self.cfg.GetInstanceInfo(instance_name)
6477     assert self.instance is not None, \
6478           "Cannot retrieve locked instance %s" % self.op.instance_name
6479     _CheckNodeOnline(self, self.instance.primary_node)
6480
6481     self.dst_node = self.cfg.GetNodeInfo(
6482       self.cfg.ExpandNodeName(self.op.target_node))
6483
6484     if self.dst_node is None:
6485       # This is wrong node name, not a non-locked node
6486       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6487     _CheckNodeOnline(self, self.dst_node.name)
6488     _CheckNodeNotDrained(self, self.dst_node.name)
6489
6490     # instance disk type verification
6491     for disk in self.instance.disks:
6492       if disk.dev_type == constants.LD_FILE:
6493         raise errors.OpPrereqError("Export not supported for instances with"
6494                                    " file-based disks")
6495
6496   def Exec(self, feedback_fn):
6497     """Export an instance to an image in the cluster.
6498
6499     """
6500     instance = self.instance
6501     dst_node = self.dst_node
6502     src_node = instance.primary_node
6503     if self.op.shutdown:
6504       # shutdown the instance, but not the disks
6505       result = self.rpc.call_instance_shutdown(src_node, instance)
6506       msg = result.RemoteFailMsg()
6507       if msg:
6508         raise errors.OpExecError("Could not shutdown instance %s on"
6509                                  " node %s: %s" %
6510                                  (instance.name, src_node, msg))
6511
6512     vgname = self.cfg.GetVGName()
6513
6514     snap_disks = []
6515
6516     # set the disks ID correctly since call_instance_start needs the
6517     # correct drbd minor to create the symlinks
6518     for disk in instance.disks:
6519       self.cfg.SetDiskID(disk, src_node)
6520
6521     try:
6522       for disk in instance.disks:
6523         # result.payload will be a snapshot of an lvm leaf of the one we passed
6524         result = self.rpc.call_blockdev_snapshot(src_node, disk)
6525         msg = result.RemoteFailMsg()
6526         if msg:
6527           self.LogWarning("Could not snapshot block device %s on node %s: %s",
6528                           disk.logical_id[1], src_node, msg)
6529           snap_disks.append(False)
6530         else:
6531           disk_id = (vgname, result.payload)
6532           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6533                                  logical_id=disk_id, physical_id=disk_id,
6534                                  iv_name=disk.iv_name)
6535           snap_disks.append(new_dev)
6536
6537     finally:
6538       if self.op.shutdown and instance.admin_up:
6539         result = self.rpc.call_instance_start(src_node, instance, None, None)
6540         msg = result.RemoteFailMsg()
6541         if msg:
6542           _ShutdownInstanceDisks(self, instance)
6543           raise errors.OpExecError("Could not start instance: %s" % msg)
6544
6545     # TODO: check for size
6546
6547     cluster_name = self.cfg.GetClusterName()
6548     for idx, dev in enumerate(snap_disks):
6549       if dev:
6550         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6551                                                instance, cluster_name, idx)
6552         msg = result.RemoteFailMsg()
6553         if msg:
6554           self.LogWarning("Could not export block device %s from node %s to"
6555                           " node %s: %s", dev.logical_id[1], src_node,
6556                           dst_node.name, msg)
6557         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6558         if msg:
6559           self.LogWarning("Could not remove snapshot block device %s from node"
6560                           " %s: %s", dev.logical_id[1], src_node, msg)
6561
6562     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6563     msg = result.RemoteFailMsg()
6564     if msg:
6565       self.LogWarning("Could not finalize export for instance %s"
6566                       " on node %s: %s", instance.name, dst_node.name, msg)
6567
6568     nodelist = self.cfg.GetNodeList()
6569     nodelist.remove(dst_node.name)
6570
6571     # on one-node clusters nodelist will be empty after the removal
6572     # if we proceed the backup would be removed because OpQueryExports
6573     # substitutes an empty list with the full cluster node list.
6574     iname = instance.name
6575     if nodelist:
6576       exportlist = self.rpc.call_export_list(nodelist)
6577       for node in exportlist:
6578         if exportlist[node].RemoteFailMsg():
6579           continue
6580         if iname in exportlist[node].payload:
6581           msg = self.rpc.call_export_remove(node, iname).RemoteFailMsg()
6582           if msg:
6583             self.LogWarning("Could not remove older export for instance %s"
6584                             " on node %s: %s", iname, node, msg)
6585
6586
6587 class LURemoveExport(NoHooksLU):
6588   """Remove exports related to the named instance.
6589
6590   """
6591   _OP_REQP = ["instance_name"]
6592   REQ_BGL = False
6593
6594   def ExpandNames(self):
6595     self.needed_locks = {}
6596     # We need all nodes to be locked in order for RemoveExport to work, but we
6597     # don't need to lock the instance itself, as nothing will happen to it (and
6598     # we can remove exports also for a removed instance)
6599     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6600
6601   def CheckPrereq(self):
6602     """Check prerequisites.
6603     """
6604     pass
6605
6606   def Exec(self, feedback_fn):
6607     """Remove any export.
6608
6609     """
6610     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6611     # If the instance was not found we'll try with the name that was passed in.
6612     # This will only work if it was an FQDN, though.
6613     fqdn_warn = False
6614     if not instance_name:
6615       fqdn_warn = True
6616       instance_name = self.op.instance_name
6617
6618     locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
6619     exportlist = self.rpc.call_export_list(locked_nodes)
6620     found = False
6621     for node in exportlist:
6622       msg = exportlist[node].RemoteFailMsg()
6623       if msg:
6624         self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
6625         continue
6626       if instance_name in exportlist[node].payload:
6627         found = True
6628         result = self.rpc.call_export_remove(node, instance_name)
6629         msg = result.RemoteFailMsg()
6630         if msg:
6631           logging.error("Could not remove export for instance %s"
6632                         " on node %s: %s", instance_name, node, msg)
6633
6634     if fqdn_warn and not found:
6635       feedback_fn("Export not found. If trying to remove an export belonging"
6636                   " to a deleted instance please use its Fully Qualified"
6637                   " Domain Name.")
6638
6639
6640 class TagsLU(NoHooksLU):
6641   """Generic tags LU.
6642
6643   This is an abstract class which is the parent of all the other tags LUs.
6644
6645   """
6646
6647   def ExpandNames(self):
6648     self.needed_locks = {}
6649     if self.op.kind == constants.TAG_NODE:
6650       name = self.cfg.ExpandNodeName(self.op.name)
6651       if name is None:
6652         raise errors.OpPrereqError("Invalid node name (%s)" %
6653                                    (self.op.name,))
6654       self.op.name = name
6655       self.needed_locks[locking.LEVEL_NODE] = name
6656     elif self.op.kind == constants.TAG_INSTANCE:
6657       name = self.cfg.ExpandInstanceName(self.op.name)
6658       if name is None:
6659         raise errors.OpPrereqError("Invalid instance name (%s)" %
6660                                    (self.op.name,))
6661       self.op.name = name
6662       self.needed_locks[locking.LEVEL_INSTANCE] = name
6663
6664   def CheckPrereq(self):
6665     """Check prerequisites.
6666
6667     """
6668     if self.op.kind == constants.TAG_CLUSTER:
6669       self.target = self.cfg.GetClusterInfo()
6670     elif self.op.kind == constants.TAG_NODE:
6671       self.target = self.cfg.GetNodeInfo(self.op.name)
6672     elif self.op.kind == constants.TAG_INSTANCE:
6673       self.target = self.cfg.GetInstanceInfo(self.op.name)
6674     else:
6675       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6676                                  str(self.op.kind))
6677
6678
6679 class LUGetTags(TagsLU):
6680   """Returns the tags of a given object.
6681
6682   """
6683   _OP_REQP = ["kind", "name"]
6684   REQ_BGL = False
6685
6686   def Exec(self, feedback_fn):
6687     """Returns the tag list.
6688
6689     """
6690     return list(self.target.GetTags())
6691
6692
6693 class LUSearchTags(NoHooksLU):
6694   """Searches the tags for a given pattern.
6695
6696   """
6697   _OP_REQP = ["pattern"]
6698   REQ_BGL = False
6699
6700   def ExpandNames(self):
6701     self.needed_locks = {}
6702
6703   def CheckPrereq(self):
6704     """Check prerequisites.
6705
6706     This checks the pattern passed for validity by compiling it.
6707
6708     """
6709     try:
6710       self.re = re.compile(self.op.pattern)
6711     except re.error, err:
6712       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6713                                  (self.op.pattern, err))
6714
6715   def Exec(self, feedback_fn):
6716     """Returns the tag list.
6717
6718     """
6719     cfg = self.cfg
6720     tgts = [("/cluster", cfg.GetClusterInfo())]
6721     ilist = cfg.GetAllInstancesInfo().values()
6722     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6723     nlist = cfg.GetAllNodesInfo().values()
6724     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6725     results = []
6726     for path, target in tgts:
6727       for tag in target.GetTags():
6728         if self.re.search(tag):
6729           results.append((path, tag))
6730     return results
6731
6732
6733 class LUAddTags(TagsLU):
6734   """Sets a tag on a given object.
6735
6736   """
6737   _OP_REQP = ["kind", "name", "tags"]
6738   REQ_BGL = False
6739
6740   def CheckPrereq(self):
6741     """Check prerequisites.
6742
6743     This checks the type and length of the tag name and value.
6744
6745     """
6746     TagsLU.CheckPrereq(self)
6747     for tag in self.op.tags:
6748       objects.TaggableObject.ValidateTag(tag)
6749
6750   def Exec(self, feedback_fn):
6751     """Sets the tag.
6752
6753     """
6754     try:
6755       for tag in self.op.tags:
6756         self.target.AddTag(tag)
6757     except errors.TagError, err:
6758       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6759     try:
6760       self.cfg.Update(self.target)
6761     except errors.ConfigurationError:
6762       raise errors.OpRetryError("There has been a modification to the"
6763                                 " config file and the operation has been"
6764                                 " aborted. Please retry.")
6765
6766
6767 class LUDelTags(TagsLU):
6768   """Delete a list of tags from a given object.
6769
6770   """
6771   _OP_REQP = ["kind", "name", "tags"]
6772   REQ_BGL = False
6773
6774   def CheckPrereq(self):
6775     """Check prerequisites.
6776
6777     This checks that we have the given tag.
6778
6779     """
6780     TagsLU.CheckPrereq(self)
6781     for tag in self.op.tags:
6782       objects.TaggableObject.ValidateTag(tag)
6783     del_tags = frozenset(self.op.tags)
6784     cur_tags = self.target.GetTags()
6785     if not del_tags <= cur_tags:
6786       diff_tags = del_tags - cur_tags
6787       diff_names = ["'%s'" % tag for tag in diff_tags]
6788       diff_names.sort()
6789       raise errors.OpPrereqError("Tag(s) %s not found" %
6790                                  (",".join(diff_names)))
6791
6792   def Exec(self, feedback_fn):
6793     """Remove the tag from the object.
6794
6795     """
6796     for tag in self.op.tags:
6797       self.target.RemoveTag(tag)
6798     try:
6799       self.cfg.Update(self.target)
6800     except errors.ConfigurationError:
6801       raise errors.OpRetryError("There has been a modification to the"
6802                                 " config file and the operation has been"
6803                                 " aborted. Please retry.")
6804
6805
6806 class LUTestDelay(NoHooksLU):
6807   """Sleep for a specified amount of time.
6808
6809   This LU sleeps on the master and/or nodes for a specified amount of
6810   time.
6811
6812   """
6813   _OP_REQP = ["duration", "on_master", "on_nodes"]
6814   REQ_BGL = False
6815
6816   def ExpandNames(self):
6817     """Expand names and set required locks.
6818
6819     This expands the node list, if any.
6820
6821     """
6822     self.needed_locks = {}
6823     if self.op.on_nodes:
6824       # _GetWantedNodes can be used here, but is not always appropriate to use
6825       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6826       # more information.
6827       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6828       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6829
6830   def CheckPrereq(self):
6831     """Check prerequisites.
6832
6833     """
6834
6835   def Exec(self, feedback_fn):
6836     """Do the actual sleep.
6837
6838     """
6839     if self.op.on_master:
6840       if not utils.TestDelay(self.op.duration):
6841         raise errors.OpExecError("Error during master delay test")
6842     if self.op.on_nodes:
6843       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6844       if not result:
6845         raise errors.OpExecError("Complete failure from rpc call")
6846       for node, node_result in result.items():
6847         node_result.Raise()
6848         if not node_result.data:
6849           raise errors.OpExecError("Failure during rpc call to node %s,"
6850                                    " result: %s" % (node, node_result.data))
6851
6852
6853 class IAllocator(object):
6854   """IAllocator framework.
6855
6856   An IAllocator instance has three sets of attributes:
6857     - cfg that is needed to query the cluster
6858     - input data (all members of the _KEYS class attribute are required)
6859     - four buffer attributes (in|out_data|text), that represent the
6860       input (to the external script) in text and data structure format,
6861       and the output from it, again in two formats
6862     - the result variables from the script (success, info, nodes) for
6863       easy usage
6864
6865   """
6866   _ALLO_KEYS = [
6867     "mem_size", "disks", "disk_template",
6868     "os", "tags", "nics", "vcpus", "hypervisor",
6869     ]
6870   _RELO_KEYS = [
6871     "relocate_from",
6872     ]
6873
6874   def __init__(self, lu, mode, name, **kwargs):
6875     self.lu = lu
6876     # init buffer variables
6877     self.in_text = self.out_text = self.in_data = self.out_data = None
6878     # init all input fields so that pylint is happy
6879     self.mode = mode
6880     self.name = name
6881     self.mem_size = self.disks = self.disk_template = None
6882     self.os = self.tags = self.nics = self.vcpus = None
6883     self.hypervisor = None
6884     self.relocate_from = None
6885     # computed fields
6886     self.required_nodes = None
6887     # init result fields
6888     self.success = self.info = self.nodes = None
6889     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6890       keyset = self._ALLO_KEYS
6891     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6892       keyset = self._RELO_KEYS
6893     else:
6894       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6895                                    " IAllocator" % self.mode)
6896     for key in kwargs:
6897       if key not in keyset:
6898         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6899                                      " IAllocator" % key)
6900       setattr(self, key, kwargs[key])
6901     for key in keyset:
6902       if key not in kwargs:
6903         raise errors.ProgrammerError("Missing input parameter '%s' to"
6904                                      " IAllocator" % key)
6905     self._BuildInputData()
6906
6907   def _ComputeClusterData(self):
6908     """Compute the generic allocator input data.
6909
6910     This is the data that is independent of the actual operation.
6911
6912     """
6913     cfg = self.lu.cfg
6914     cluster_info = cfg.GetClusterInfo()
6915     # cluster data
6916     data = {
6917       "version": constants.IALLOCATOR_VERSION,
6918       "cluster_name": cfg.GetClusterName(),
6919       "cluster_tags": list(cluster_info.GetTags()),
6920       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6921       # we don't have job IDs
6922       }
6923     iinfo = cfg.GetAllInstancesInfo().values()
6924     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6925
6926     # node data
6927     node_results = {}
6928     node_list = cfg.GetNodeList()
6929
6930     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6931       hypervisor_name = self.hypervisor
6932     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6933       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6934
6935     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6936                                            hypervisor_name)
6937     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6938                        cluster_info.enabled_hypervisors)
6939     for nname, nresult in node_data.items():
6940       # first fill in static (config-based) values
6941       ninfo = cfg.GetNodeInfo(nname)
6942       pnr = {
6943         "tags": list(ninfo.GetTags()),
6944         "primary_ip": ninfo.primary_ip,
6945         "secondary_ip": ninfo.secondary_ip,
6946         "offline": ninfo.offline,
6947         "drained": ninfo.drained,
6948         "master_candidate": ninfo.master_candidate,
6949         }
6950
6951       if not ninfo.offline:
6952         msg = nresult.RemoteFailMsg()
6953         if msg:
6954           raise errors.OpExecError("Can't get data for node %s: %s" %
6955                                    (nname, msg))
6956         msg = node_iinfo[nname].RemoteFailMsg()
6957         if msg:
6958           raise errors.OpExecError("Can't get node instance info"
6959                                    " from node %s: %s" % (nname, msg))
6960         remote_info = nresult.payload
6961         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6962                      'vg_size', 'vg_free', 'cpu_total']:
6963           if attr not in remote_info:
6964             raise errors.OpExecError("Node '%s' didn't return attribute"
6965                                      " '%s'" % (nname, attr))
6966           if not isinstance(remote_info[attr], int):
6967             raise errors.OpExecError("Node '%s' returned invalid value"
6968                                      " for '%s': %s" %
6969                                      (nname, attr, remote_info[attr]))
6970         # compute memory used by primary instances
6971         i_p_mem = i_p_up_mem = 0
6972         for iinfo, beinfo in i_list:
6973           if iinfo.primary_node == nname:
6974             i_p_mem += beinfo[constants.BE_MEMORY]
6975             if iinfo.name not in node_iinfo[nname].payload:
6976               i_used_mem = 0
6977             else:
6978               i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
6979             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6980             remote_info['memory_free'] -= max(0, i_mem_diff)
6981
6982             if iinfo.admin_up:
6983               i_p_up_mem += beinfo[constants.BE_MEMORY]
6984
6985         # compute memory used by instances
6986         pnr_dyn = {
6987           "total_memory": remote_info['memory_total'],
6988           "reserved_memory": remote_info['memory_dom0'],
6989           "free_memory": remote_info['memory_free'],
6990           "total_disk": remote_info['vg_size'],
6991           "free_disk": remote_info['vg_free'],
6992           "total_cpus": remote_info['cpu_total'],
6993           "i_pri_memory": i_p_mem,
6994           "i_pri_up_memory": i_p_up_mem,
6995           }
6996         pnr.update(pnr_dyn)
6997
6998       node_results[nname] = pnr
6999     data["nodes"] = node_results
7000
7001     # instance data
7002     instance_data = {}
7003     for iinfo, beinfo in i_list:
7004       nic_data = []
7005       for nic in iinfo.nics:
7006         filled_params = objects.FillDict(
7007             cluster_info.nicparams[constants.PP_DEFAULT],
7008             nic.nicparams)
7009         nic_dict = {"mac": nic.mac,
7010                     "ip": nic.ip,
7011                     "mode": filled_params[constants.NIC_MODE],
7012                     "link": filled_params[constants.NIC_LINK],
7013                    }
7014         if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
7015           nic_dict["bridge"] = filled_params[constants.NIC_LINK]
7016         nic_data.append(nic_dict)
7017       pir = {
7018         "tags": list(iinfo.GetTags()),
7019         "admin_up": iinfo.admin_up,
7020         "vcpus": beinfo[constants.BE_VCPUS],
7021         "memory": beinfo[constants.BE_MEMORY],
7022         "os": iinfo.os,
7023         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
7024         "nics": nic_data,
7025         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
7026         "disk_template": iinfo.disk_template,
7027         "hypervisor": iinfo.hypervisor,
7028         }
7029       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
7030                                                  pir["disks"])
7031       instance_data[iinfo.name] = pir
7032
7033     data["instances"] = instance_data
7034
7035     self.in_data = data
7036
7037   def _AddNewInstance(self):
7038     """Add new instance data to allocator structure.
7039
7040     This in combination with _AllocatorGetClusterData will create the
7041     correct structure needed as input for the allocator.
7042
7043     The checks for the completeness of the opcode must have already been
7044     done.
7045
7046     """
7047     data = self.in_data
7048
7049     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
7050
7051     if self.disk_template in constants.DTS_NET_MIRROR:
7052       self.required_nodes = 2
7053     else:
7054       self.required_nodes = 1
7055     request = {
7056       "type": "allocate",
7057       "name": self.name,
7058       "disk_template": self.disk_template,
7059       "tags": self.tags,
7060       "os": self.os,
7061       "vcpus": self.vcpus,
7062       "memory": self.mem_size,
7063       "disks": self.disks,
7064       "disk_space_total": disk_space,
7065       "nics": self.nics,
7066       "required_nodes": self.required_nodes,
7067       }
7068     data["request"] = request
7069
7070   def _AddRelocateInstance(self):
7071     """Add relocate instance data to allocator structure.
7072
7073     This in combination with _IAllocatorGetClusterData will create the
7074     correct structure needed as input for the allocator.
7075
7076     The checks for the completeness of the opcode must have already been
7077     done.
7078
7079     """
7080     instance = self.lu.cfg.GetInstanceInfo(self.name)
7081     if instance is None:
7082       raise errors.ProgrammerError("Unknown instance '%s' passed to"
7083                                    " IAllocator" % self.name)
7084
7085     if instance.disk_template not in constants.DTS_NET_MIRROR:
7086       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
7087
7088     if len(instance.secondary_nodes) != 1:
7089       raise errors.OpPrereqError("Instance has not exactly one secondary node")
7090
7091     self.required_nodes = 1
7092     disk_sizes = [{'size': disk.size} for disk in instance.disks]
7093     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
7094
7095     request = {
7096       "type": "relocate",
7097       "name": self.name,
7098       "disk_space_total": disk_space,
7099       "required_nodes": self.required_nodes,
7100       "relocate_from": self.relocate_from,
7101       }
7102     self.in_data["request"] = request
7103
7104   def _BuildInputData(self):
7105     """Build input data structures.
7106
7107     """
7108     self._ComputeClusterData()
7109
7110     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7111       self._AddNewInstance()
7112     else:
7113       self._AddRelocateInstance()
7114
7115     self.in_text = serializer.Dump(self.in_data)
7116
7117   def Run(self, name, validate=True, call_fn=None):
7118     """Run an instance allocator and return the results.
7119
7120     """
7121     if call_fn is None:
7122       call_fn = self.lu.rpc.call_iallocator_runner
7123     data = self.in_text
7124
7125     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
7126     msg = result.RemoteFailMsg()
7127     if msg:
7128       raise errors.OpExecError("Failure while running the iallocator"
7129                                " script: %s" % msg)
7130
7131     self.out_text = result.payload
7132     if validate:
7133       self._ValidateResult()
7134
7135   def _ValidateResult(self):
7136     """Process the allocator results.
7137
7138     This will process and if successful save the result in
7139     self.out_data and the other parameters.
7140
7141     """
7142     try:
7143       rdict = serializer.Load(self.out_text)
7144     except Exception, err:
7145       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
7146
7147     if not isinstance(rdict, dict):
7148       raise errors.OpExecError("Can't parse iallocator results: not a dict")
7149
7150     for key in "success", "info", "nodes":
7151       if key not in rdict:
7152         raise errors.OpExecError("Can't parse iallocator results:"
7153                                  " missing key '%s'" % key)
7154       setattr(self, key, rdict[key])
7155
7156     if not isinstance(rdict["nodes"], list):
7157       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
7158                                " is not a list")
7159     self.out_data = rdict
7160
7161
7162 class LUTestAllocator(NoHooksLU):
7163   """Run allocator tests.
7164
7165   This LU runs the allocator tests
7166
7167   """
7168   _OP_REQP = ["direction", "mode", "name"]
7169
7170   def CheckPrereq(self):
7171     """Check prerequisites.
7172
7173     This checks the opcode parameters depending on the director and mode test.
7174
7175     """
7176     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7177       for attr in ["name", "mem_size", "disks", "disk_template",
7178                    "os", "tags", "nics", "vcpus"]:
7179         if not hasattr(self.op, attr):
7180           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7181                                      attr)
7182       iname = self.cfg.ExpandInstanceName(self.op.name)
7183       if iname is not None:
7184         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7185                                    iname)
7186       if not isinstance(self.op.nics, list):
7187         raise errors.OpPrereqError("Invalid parameter 'nics'")
7188       for row in self.op.nics:
7189         if (not isinstance(row, dict) or
7190             "mac" not in row or
7191             "ip" not in row or
7192             "bridge" not in row):
7193           raise errors.OpPrereqError("Invalid contents of the"
7194                                      " 'nics' parameter")
7195       if not isinstance(self.op.disks, list):
7196         raise errors.OpPrereqError("Invalid parameter 'disks'")
7197       for row in self.op.disks:
7198         if (not isinstance(row, dict) or
7199             "size" not in row or
7200             not isinstance(row["size"], int) or
7201             "mode" not in row or
7202             row["mode"] not in ['r', 'w']):
7203           raise errors.OpPrereqError("Invalid contents of the"
7204                                      " 'disks' parameter")
7205       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7206         self.op.hypervisor = self.cfg.GetHypervisorType()
7207     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7208       if not hasattr(self.op, "name"):
7209         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7210       fname = self.cfg.ExpandInstanceName(self.op.name)
7211       if fname is None:
7212         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7213                                    self.op.name)
7214       self.op.name = fname
7215       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7216     else:
7217       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7218                                  self.op.mode)
7219
7220     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7221       if not hasattr(self.op, "allocator") or self.op.allocator is None:
7222         raise errors.OpPrereqError("Missing allocator name")
7223     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7224       raise errors.OpPrereqError("Wrong allocator test '%s'" %
7225                                  self.op.direction)
7226
7227   def Exec(self, feedback_fn):
7228     """Run the allocator test.
7229
7230     """
7231     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7232       ial = IAllocator(self,
7233                        mode=self.op.mode,
7234                        name=self.op.name,
7235                        mem_size=self.op.mem_size,
7236                        disks=self.op.disks,
7237                        disk_template=self.op.disk_template,
7238                        os=self.op.os,
7239                        tags=self.op.tags,
7240                        nics=self.op.nics,
7241                        vcpus=self.op.vcpus,
7242                        hypervisor=self.op.hypervisor,
7243                        )
7244     else:
7245       ial = IAllocator(self,
7246                        mode=self.op.mode,
7247                        name=self.op.name,
7248                        relocate_from=list(self.relocate_from),
7249                        )
7250
7251     if self.op.direction == constants.IALLOCATOR_DIR_IN:
7252       result = ial.in_text
7253     else:
7254       ial.Run(self.op.allocator, validate=False)
7255       result = ial.out_text
7256     return result