Convert instance_list rpc to new style result
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import tempfile
30 import re
31 import platform
32 import logging
33 import copy
34 import random
35
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45 from ganeti import ssconf
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59
60   Note that all commands require root permissions.
61
62   """
63   HPATH = None
64   HTYPE = None
65   _OP_REQP = []
66   REQ_BGL = True
67
68   def __init__(self, processor, op, context, rpc):
69     """Constructor for LogicalUnit.
70
71     This needs to be overriden in derived classes in order to check op
72     validity.
73
74     """
75     self.proc = processor
76     self.op = op
77     self.cfg = context.cfg
78     self.context = context
79     self.rpc = rpc
80     # Dicts used to declare locking needs to mcpu
81     self.needed_locks = None
82     self.acquired_locks = {}
83     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
84     self.add_locks = {}
85     self.remove_locks = {}
86     # Used to force good behavior when calling helper functions
87     self.recalculate_locks = {}
88     self.__ssh = None
89     # logging
90     self.LogWarning = processor.LogWarning
91     self.LogInfo = processor.LogInfo
92
93     for attr_name in self._OP_REQP:
94       attr_val = getattr(op, attr_name, None)
95       if attr_val is None:
96         raise errors.OpPrereqError("Required parameter '%s' missing" %
97                                    attr_name)
98     self.CheckArguments()
99
100   def __GetSSH(self):
101     """Returns the SshRunner object
102
103     """
104     if not self.__ssh:
105       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
106     return self.__ssh
107
108   ssh = property(fget=__GetSSH)
109
110   def CheckArguments(self):
111     """Check syntactic validity for the opcode arguments.
112
113     This method is for doing a simple syntactic check and ensure
114     validity of opcode parameters, without any cluster-related
115     checks. While the same can be accomplished in ExpandNames and/or
116     CheckPrereq, doing these separate is better because:
117
118       - ExpandNames is left as as purely a lock-related function
119       - CheckPrereq is run after we have aquired locks (and possible
120         waited for them)
121
122     The function is allowed to change the self.op attribute so that
123     later methods can no longer worry about missing parameters.
124
125     """
126     pass
127
128   def ExpandNames(self):
129     """Expand names for this LU.
130
131     This method is called before starting to execute the opcode, and it should
132     update all the parameters of the opcode to their canonical form (e.g. a
133     short node name must be fully expanded after this method has successfully
134     completed). This way locking, hooks, logging, ecc. can work correctly.
135
136     LUs which implement this method must also populate the self.needed_locks
137     member, as a dict with lock levels as keys, and a list of needed lock names
138     as values. Rules:
139
140       - use an empty dict if you don't need any lock
141       - if you don't need any lock at a particular level omit that level
142       - don't put anything for the BGL level
143       - if you want all locks at a level use locking.ALL_SET as a value
144
145     If you need to share locks (rather than acquire them exclusively) at one
146     level you can modify self.share_locks, setting a true value (usually 1) for
147     that level. By default locks are not shared.
148
149     Examples::
150
151       # Acquire all nodes and one instance
152       self.needed_locks = {
153         locking.LEVEL_NODE: locking.ALL_SET,
154         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
155       }
156       # Acquire just two nodes
157       self.needed_locks = {
158         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
159       }
160       # Acquire no locks
161       self.needed_locks = {} # No, you can't leave it to the default value None
162
163     """
164     # The implementation of this method is mandatory only if the new LU is
165     # concurrent, so that old LUs don't need to be changed all at the same
166     # time.
167     if self.REQ_BGL:
168       self.needed_locks = {} # Exclusive LUs don't need locks.
169     else:
170       raise NotImplementedError
171
172   def DeclareLocks(self, level):
173     """Declare LU locking needs for a level
174
175     While most LUs can just declare their locking needs at ExpandNames time,
176     sometimes there's the need to calculate some locks after having acquired
177     the ones before. This function is called just before acquiring locks at a
178     particular level, but after acquiring the ones at lower levels, and permits
179     such calculations. It can be used to modify self.needed_locks, and by
180     default it does nothing.
181
182     This function is only called if you have something already set in
183     self.needed_locks for the level.
184
185     @param level: Locking level which is going to be locked
186     @type level: member of ganeti.locking.LEVELS
187
188     """
189
190   def CheckPrereq(self):
191     """Check prerequisites for this LU.
192
193     This method should check that the prerequisites for the execution
194     of this LU are fulfilled. It can do internode communication, but
195     it should be idempotent - no cluster or system changes are
196     allowed.
197
198     The method should raise errors.OpPrereqError in case something is
199     not fulfilled. Its return value is ignored.
200
201     This method should also update all the parameters of the opcode to
202     their canonical form if it hasn't been done by ExpandNames before.
203
204     """
205     raise NotImplementedError
206
207   def Exec(self, feedback_fn):
208     """Execute the LU.
209
210     This method should implement the actual work. It should raise
211     errors.OpExecError for failures that are somewhat dealt with in
212     code, or expected.
213
214     """
215     raise NotImplementedError
216
217   def BuildHooksEnv(self):
218     """Build hooks environment for this LU.
219
220     This method should return a three-node tuple consisting of: a dict
221     containing the environment that will be used for running the
222     specific hook for this LU, a list of node names on which the hook
223     should run before the execution, and a list of node names on which
224     the hook should run after the execution.
225
226     The keys of the dict must not have 'GANETI_' prefixed as this will
227     be handled in the hooks runner. Also note additional keys will be
228     added by the hooks runner. If the LU doesn't define any
229     environment, an empty dict (and not None) should be returned.
230
231     No nodes should be returned as an empty list (and not None).
232
233     Note that if the HPATH for a LU class is None, this function will
234     not be called.
235
236     """
237     raise NotImplementedError
238
239   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240     """Notify the LU about the results of its hooks.
241
242     This method is called every time a hooks phase is executed, and notifies
243     the Logical Unit about the hooks' result. The LU can then use it to alter
244     its result based on the hooks.  By default the method does nothing and the
245     previous result is passed back unchanged but any LU can define it if it
246     wants to use the local cluster hook-scripts somehow.
247
248     @param phase: one of L{constants.HOOKS_PHASE_POST} or
249         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250     @param hook_results: the results of the multi-node hooks rpc call
251     @param feedback_fn: function used send feedback back to the caller
252     @param lu_result: the previous Exec result this LU had, or None
253         in the PRE phase
254     @return: the new Exec result, based on the previous result
255         and hook results
256
257     """
258     return lu_result
259
260   def _ExpandAndLockInstance(self):
261     """Helper function to expand and lock an instance.
262
263     Many LUs that work on an instance take its name in self.op.instance_name
264     and need to expand it and then declare the expanded name for locking. This
265     function does it, and then updates self.op.instance_name to the expanded
266     name. It also initializes needed_locks as a dict, if this hasn't been done
267     before.
268
269     """
270     if self.needed_locks is None:
271       self.needed_locks = {}
272     else:
273       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274         "_ExpandAndLockInstance called with instance-level locks set"
275     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276     if expanded_name is None:
277       raise errors.OpPrereqError("Instance '%s' not known" %
278                                   self.op.instance_name)
279     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280     self.op.instance_name = expanded_name
281
282   def _LockInstancesNodes(self, primary_only=False):
283     """Helper function to declare instances' nodes for locking.
284
285     This function should be called after locking one or more instances to lock
286     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287     with all primary or secondary nodes for instances already locked and
288     present in self.needed_locks[locking.LEVEL_INSTANCE].
289
290     It should be called from DeclareLocks, and for safety only works if
291     self.recalculate_locks[locking.LEVEL_NODE] is set.
292
293     In the future it may grow parameters to just lock some instance's nodes, or
294     to just lock primaries or secondary nodes, if needed.
295
296     If should be called in DeclareLocks in a way similar to::
297
298       if level == locking.LEVEL_NODE:
299         self._LockInstancesNodes()
300
301     @type primary_only: boolean
302     @param primary_only: only lock primary nodes of locked instances
303
304     """
305     assert locking.LEVEL_NODE in self.recalculate_locks, \
306       "_LockInstancesNodes helper function called with no nodes to recalculate"
307
308     # TODO: check if we're really been called with the instance locks held
309
310     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311     # future we might want to have different behaviors depending on the value
312     # of self.recalculate_locks[locking.LEVEL_NODE]
313     wanted_nodes = []
314     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315       instance = self.context.cfg.GetInstanceInfo(instance_name)
316       wanted_nodes.append(instance.primary_node)
317       if not primary_only:
318         wanted_nodes.extend(instance.secondary_nodes)
319
320     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
324
325     del self.recalculate_locks[locking.LEVEL_NODE]
326
327
328 class NoHooksLU(LogicalUnit):
329   """Simple LU which runs no hooks.
330
331   This LU is intended as a parent for other LogicalUnits which will
332   run no hooks, in order to reduce duplicate code.
333
334   """
335   HPATH = None
336   HTYPE = None
337
338
339 def _GetWantedNodes(lu, nodes):
340   """Returns list of checked and expanded node names.
341
342   @type lu: L{LogicalUnit}
343   @param lu: the logical unit on whose behalf we execute
344   @type nodes: list
345   @param nodes: list of node names or None for all nodes
346   @rtype: list
347   @return: the list of nodes, sorted
348   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
349
350   """
351   if not isinstance(nodes, list):
352     raise errors.OpPrereqError("Invalid argument type 'nodes'")
353
354   if not nodes:
355     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356       " non-empty list of nodes whose name is to be expanded.")
357
358   wanted = []
359   for name in nodes:
360     node = lu.cfg.ExpandNodeName(name)
361     if node is None:
362       raise errors.OpPrereqError("No such node name '%s'" % name)
363     wanted.append(node)
364
365   return utils.NiceSort(wanted)
366
367
368 def _GetWantedInstances(lu, instances):
369   """Returns list of checked and expanded instance names.
370
371   @type lu: L{LogicalUnit}
372   @param lu: the logical unit on whose behalf we execute
373   @type instances: list
374   @param instances: list of instance names or None for all instances
375   @rtype: list
376   @return: the list of instances, sorted
377   @raise errors.OpPrereqError: if the instances parameter is wrong type
378   @raise errors.OpPrereqError: if any of the passed instances is not found
379
380   """
381   if not isinstance(instances, list):
382     raise errors.OpPrereqError("Invalid argument type 'instances'")
383
384   if instances:
385     wanted = []
386
387     for name in instances:
388       instance = lu.cfg.ExpandInstanceName(name)
389       if instance is None:
390         raise errors.OpPrereqError("No such instance name '%s'" % name)
391       wanted.append(instance)
392
393   else:
394     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
395   return wanted
396
397
398 def _CheckOutputFields(static, dynamic, selected):
399   """Checks whether all selected fields are valid.
400
401   @type static: L{utils.FieldSet}
402   @param static: static fields set
403   @type dynamic: L{utils.FieldSet}
404   @param dynamic: dynamic fields set
405
406   """
407   f = utils.FieldSet()
408   f.Extend(static)
409   f.Extend(dynamic)
410
411   delta = f.NonMatching(selected)
412   if delta:
413     raise errors.OpPrereqError("Unknown output fields selected: %s"
414                                % ",".join(delta))
415
416
417 def _CheckBooleanOpField(op, name):
418   """Validates boolean opcode parameters.
419
420   This will ensure that an opcode parameter is either a boolean value,
421   or None (but that it always exists).
422
423   """
424   val = getattr(op, name, None)
425   if not (val is None or isinstance(val, bool)):
426     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
427                                (name, str(val)))
428   setattr(op, name, val)
429
430
431 def _CheckNodeOnline(lu, node):
432   """Ensure that a given node is online.
433
434   @param lu: the LU on behalf of which we make the check
435   @param node: the node to check
436   @raise errors.OpPrereqError: if the node is offline
437
438   """
439   if lu.cfg.GetNodeInfo(node).offline:
440     raise errors.OpPrereqError("Can't use offline node %s" % node)
441
442
443 def _CheckNodeNotDrained(lu, node):
444   """Ensure that a given node is not drained.
445
446   @param lu: the LU on behalf of which we make the check
447   @param node: the node to check
448   @raise errors.OpPrereqError: if the node is drained
449
450   """
451   if lu.cfg.GetNodeInfo(node).drained:
452     raise errors.OpPrereqError("Can't use drained node %s" % node)
453
454
455 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456                           memory, vcpus, nics, disk_template, disks):
457   """Builds instance related env variables for hooks
458
459   This builds the hook environment from individual variables.
460
461   @type name: string
462   @param name: the name of the instance
463   @type primary_node: string
464   @param primary_node: the name of the instance's primary node
465   @type secondary_nodes: list
466   @param secondary_nodes: list of secondary nodes as strings
467   @type os_type: string
468   @param os_type: the name of the instance's OS
469   @type status: boolean
470   @param status: the should_run status of the instance
471   @type memory: string
472   @param memory: the memory size of the instance
473   @type vcpus: string
474   @param vcpus: the count of VCPUs the instance has
475   @type nics: list
476   @param nics: list of tuples (ip, bridge, mac) representing
477       the NICs the instance  has
478   @type disk_template: string
479   @param disk_template: the distk template of the instance
480   @type disks: list
481   @param disks: the list of (size, mode) pairs
482   @rtype: dict
483   @return: the hook environment for this instance
484
485   """
486   if status:
487     str_status = "up"
488   else:
489     str_status = "down"
490   env = {
491     "OP_TARGET": name,
492     "INSTANCE_NAME": name,
493     "INSTANCE_PRIMARY": primary_node,
494     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
495     "INSTANCE_OS_TYPE": os_type,
496     "INSTANCE_STATUS": str_status,
497     "INSTANCE_MEMORY": memory,
498     "INSTANCE_VCPUS": vcpus,
499     "INSTANCE_DISK_TEMPLATE": disk_template,
500   }
501
502   if nics:
503     nic_count = len(nics)
504     for idx, (ip, mac, mode, link) in enumerate(nics):
505       if ip is None:
506         ip = ""
507       env["INSTANCE_NIC%d_IP" % idx] = ip
508       env["INSTANCE_NIC%d_MAC" % idx] = mac
509       env["INSTANCE_NIC%d_MODE" % idx] = mode
510       env["INSTANCE_NIC%d_LINK" % idx] = link
511       if mode == constants.NIC_MODE_BRIDGED:
512         env["INSTANCE_NIC%d_BRIDGE" % idx] = link
513   else:
514     nic_count = 0
515
516   env["INSTANCE_NIC_COUNT"] = nic_count
517
518   if disks:
519     disk_count = len(disks)
520     for idx, (size, mode) in enumerate(disks):
521       env["INSTANCE_DISK%d_SIZE" % idx] = size
522       env["INSTANCE_DISK%d_MODE" % idx] = mode
523   else:
524     disk_count = 0
525
526   env["INSTANCE_DISK_COUNT"] = disk_count
527
528   return env
529
530 def _PreBuildNICHooksList(lu, nics):
531   """Build a list of nic information tuples.
532
533   This list is suitable to be passed to _BuildInstanceHookEnv.
534
535   @type lu:  L{LogicalUnit}
536   @param lu: the logical unit on whose behalf we execute
537   @type nics: list of L{objects.NIC}
538   @param nics: list of nics to convert to hooks tuples
539
540   """
541   hooks_nics = []
542   c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
543   for nic in nics:
544     ip = nic.ip
545     mac = nic.mac
546     filled_params = objects.FillDict(c_nicparams, nic.nicparams)
547     mode = filled_params[constants.NIC_MODE]
548     link = filled_params[constants.NIC_LINK]
549     hooks_nics.append((ip, mac, mode, link))
550   return hooks_nics
551
552 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
553   """Builds instance related env variables for hooks from an object.
554
555   @type lu: L{LogicalUnit}
556   @param lu: the logical unit on whose behalf we execute
557   @type instance: L{objects.Instance}
558   @param instance: the instance for which we should build the
559       environment
560   @type override: dict
561   @param override: dictionary with key/values that will override
562       our values
563   @rtype: dict
564   @return: the hook environment dictionary
565
566   """
567   bep = lu.cfg.GetClusterInfo().FillBE(instance)
568   args = {
569     'name': instance.name,
570     'primary_node': instance.primary_node,
571     'secondary_nodes': instance.secondary_nodes,
572     'os_type': instance.os,
573     'status': instance.admin_up,
574     'memory': bep[constants.BE_MEMORY],
575     'vcpus': bep[constants.BE_VCPUS],
576     'nics': _PreBuildNICHooksList(lu, instance.nics),
577     'disk_template': instance.disk_template,
578     'disks': [(disk.size, disk.mode) for disk in instance.disks],
579   }
580   if override:
581     args.update(override)
582   return _BuildInstanceHookEnv(**args)
583
584
585 def _AdjustCandidatePool(lu):
586   """Adjust the candidate pool after node operations.
587
588   """
589   mod_list = lu.cfg.MaintainCandidatePool()
590   if mod_list:
591     lu.LogInfo("Promoted nodes to master candidate role: %s",
592                ", ".join(node.name for node in mod_list))
593     for name in mod_list:
594       lu.context.ReaddNode(name)
595   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
596   if mc_now > mc_max:
597     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
598                (mc_now, mc_max))
599
600
601 def _CheckNicsBridgesExist(lu, target_nics, target_node,
602                                profile=constants.PP_DEFAULT):
603   """Check that the brigdes needed by a list of nics exist.
604
605   """
606   c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
607   paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
608                 for nic in target_nics]
609   brlist = [params[constants.NIC_LINK] for params in paramslist
610             if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
611   if brlist:
612     result = lu.rpc.call_bridges_exist(target_node, brlist)
613     msg = result.RemoteFailMsg()
614     if msg:
615       raise errors.OpPrereqError("Error checking bridges on destination node"
616                                  " '%s': %s" % (target_node, msg))
617
618
619 def _CheckInstanceBridgesExist(lu, instance, node=None):
620   """Check that the brigdes needed by an instance exist.
621
622   """
623   if node is None:
624     node=instance.primary_node
625   _CheckNicsBridgesExist(lu, instance.nics, node)
626
627
628 class LUDestroyCluster(NoHooksLU):
629   """Logical unit for destroying the cluster.
630
631   """
632   _OP_REQP = []
633
634   def CheckPrereq(self):
635     """Check prerequisites.
636
637     This checks whether the cluster is empty.
638
639     Any errors are signalled by raising errors.OpPrereqError.
640
641     """
642     master = self.cfg.GetMasterNode()
643
644     nodelist = self.cfg.GetNodeList()
645     if len(nodelist) != 1 or nodelist[0] != master:
646       raise errors.OpPrereqError("There are still %d node(s) in"
647                                  " this cluster." % (len(nodelist) - 1))
648     instancelist = self.cfg.GetInstanceList()
649     if instancelist:
650       raise errors.OpPrereqError("There are still %d instance(s) in"
651                                  " this cluster." % len(instancelist))
652
653   def Exec(self, feedback_fn):
654     """Destroys the cluster.
655
656     """
657     master = self.cfg.GetMasterNode()
658     result = self.rpc.call_node_stop_master(master, False)
659     result.Raise()
660     if not result.data:
661       raise errors.OpExecError("Could not disable the master role")
662     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
663     utils.CreateBackup(priv_key)
664     utils.CreateBackup(pub_key)
665     return master
666
667
668 class LUVerifyCluster(LogicalUnit):
669   """Verifies the cluster status.
670
671   """
672   HPATH = "cluster-verify"
673   HTYPE = constants.HTYPE_CLUSTER
674   _OP_REQP = ["skip_checks"]
675   REQ_BGL = False
676
677   def ExpandNames(self):
678     self.needed_locks = {
679       locking.LEVEL_NODE: locking.ALL_SET,
680       locking.LEVEL_INSTANCE: locking.ALL_SET,
681     }
682     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
683
684   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
685                   node_result, feedback_fn, master_files,
686                   drbd_map, vg_name):
687     """Run multiple tests against a node.
688
689     Test list:
690
691       - compares ganeti version
692       - checks vg existance and size > 20G
693       - checks config file checksum
694       - checks ssh to other nodes
695
696     @type nodeinfo: L{objects.Node}
697     @param nodeinfo: the node to check
698     @param file_list: required list of files
699     @param local_cksum: dictionary of local files and their checksums
700     @param node_result: the results from the node
701     @param feedback_fn: function used to accumulate results
702     @param master_files: list of files that only masters should have
703     @param drbd_map: the useddrbd minors for this node, in
704         form of minor: (instance, must_exist) which correspond to instances
705         and their running status
706     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
707
708     """
709     node = nodeinfo.name
710
711     # main result, node_result should be a non-empty dict
712     if not node_result or not isinstance(node_result, dict):
713       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
714       return True
715
716     # compares ganeti version
717     local_version = constants.PROTOCOL_VERSION
718     remote_version = node_result.get('version', None)
719     if not (remote_version and isinstance(remote_version, (list, tuple)) and
720             len(remote_version) == 2):
721       feedback_fn("  - ERROR: connection to %s failed" % (node))
722       return True
723
724     if local_version != remote_version[0]:
725       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
726                   " node %s %s" % (local_version, node, remote_version[0]))
727       return True
728
729     # node seems compatible, we can actually try to look into its results
730
731     bad = False
732
733     # full package version
734     if constants.RELEASE_VERSION != remote_version[1]:
735       feedback_fn("  - WARNING: software version mismatch: master %s,"
736                   " node %s %s" %
737                   (constants.RELEASE_VERSION, node, remote_version[1]))
738
739     # checks vg existence and size > 20G
740     if vg_name is not None:
741       vglist = node_result.get(constants.NV_VGLIST, None)
742       if not vglist:
743         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
744                         (node,))
745         bad = True
746       else:
747         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
748                                               constants.MIN_VG_SIZE)
749         if vgstatus:
750           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
751           bad = True
752
753     # checks config file checksum
754
755     remote_cksum = node_result.get(constants.NV_FILELIST, None)
756     if not isinstance(remote_cksum, dict):
757       bad = True
758       feedback_fn("  - ERROR: node hasn't returned file checksum data")
759     else:
760       for file_name in file_list:
761         node_is_mc = nodeinfo.master_candidate
762         must_have_file = file_name not in master_files
763         if file_name not in remote_cksum:
764           if node_is_mc or must_have_file:
765             bad = True
766             feedback_fn("  - ERROR: file '%s' missing" % file_name)
767         elif remote_cksum[file_name] != local_cksum[file_name]:
768           if node_is_mc or must_have_file:
769             bad = True
770             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
771           else:
772             # not candidate and this is not a must-have file
773             bad = True
774             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
775                         " '%s'" % file_name)
776         else:
777           # all good, except non-master/non-must have combination
778           if not node_is_mc and not must_have_file:
779             feedback_fn("  - ERROR: file '%s' should not exist on non master"
780                         " candidates" % file_name)
781
782     # checks ssh to any
783
784     if constants.NV_NODELIST not in node_result:
785       bad = True
786       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
787     else:
788       if node_result[constants.NV_NODELIST]:
789         bad = True
790         for node in node_result[constants.NV_NODELIST]:
791           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
792                           (node, node_result[constants.NV_NODELIST][node]))
793
794     if constants.NV_NODENETTEST not in node_result:
795       bad = True
796       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
797     else:
798       if node_result[constants.NV_NODENETTEST]:
799         bad = True
800         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
801         for node in nlist:
802           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
803                           (node, node_result[constants.NV_NODENETTEST][node]))
804
805     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
806     if isinstance(hyp_result, dict):
807       for hv_name, hv_result in hyp_result.iteritems():
808         if hv_result is not None:
809           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
810                       (hv_name, hv_result))
811
812     # check used drbd list
813     if vg_name is not None:
814       used_minors = node_result.get(constants.NV_DRBDLIST, [])
815       if not isinstance(used_minors, (tuple, list)):
816         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
817                     str(used_minors))
818       else:
819         for minor, (iname, must_exist) in drbd_map.items():
820           if minor not in used_minors and must_exist:
821             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
822                         " not active" % (minor, iname))
823             bad = True
824         for minor in used_minors:
825           if minor not in drbd_map:
826             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
827                         minor)
828             bad = True
829
830     return bad
831
832   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
833                       node_instance, feedback_fn, n_offline):
834     """Verify an instance.
835
836     This function checks to see if the required block devices are
837     available on the instance's node.
838
839     """
840     bad = False
841
842     node_current = instanceconfig.primary_node
843
844     node_vol_should = {}
845     instanceconfig.MapLVsByNode(node_vol_should)
846
847     for node in node_vol_should:
848       if node in n_offline:
849         # ignore missing volumes on offline nodes
850         continue
851       for volume in node_vol_should[node]:
852         if node not in node_vol_is or volume not in node_vol_is[node]:
853           feedback_fn("  - ERROR: volume %s missing on node %s" %
854                           (volume, node))
855           bad = True
856
857     if instanceconfig.admin_up:
858       if ((node_current not in node_instance or
859           not instance in node_instance[node_current]) and
860           node_current not in n_offline):
861         feedback_fn("  - ERROR: instance %s not running on node %s" %
862                         (instance, node_current))
863         bad = True
864
865     for node in node_instance:
866       if (not node == node_current):
867         if instance in node_instance[node]:
868           feedback_fn("  - ERROR: instance %s should not run on node %s" %
869                           (instance, node))
870           bad = True
871
872     return bad
873
874   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
875     """Verify if there are any unknown volumes in the cluster.
876
877     The .os, .swap and backup volumes are ignored. All other volumes are
878     reported as unknown.
879
880     """
881     bad = False
882
883     for node in node_vol_is:
884       for volume in node_vol_is[node]:
885         if node not in node_vol_should or volume not in node_vol_should[node]:
886           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
887                       (volume, node))
888           bad = True
889     return bad
890
891   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
892     """Verify the list of running instances.
893
894     This checks what instances are running but unknown to the cluster.
895
896     """
897     bad = False
898     for node in node_instance:
899       for runninginstance in node_instance[node]:
900         if runninginstance not in instancelist:
901           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
902                           (runninginstance, node))
903           bad = True
904     return bad
905
906   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
907     """Verify N+1 Memory Resilience.
908
909     Check that if one single node dies we can still start all the instances it
910     was primary for.
911
912     """
913     bad = False
914
915     for node, nodeinfo in node_info.iteritems():
916       # This code checks that every node which is now listed as secondary has
917       # enough memory to host all instances it is supposed to should a single
918       # other node in the cluster fail.
919       # FIXME: not ready for failover to an arbitrary node
920       # FIXME: does not support file-backed instances
921       # WARNING: we currently take into account down instances as well as up
922       # ones, considering that even if they're down someone might want to start
923       # them even in the event of a node failure.
924       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
925         needed_mem = 0
926         for instance in instances:
927           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
928           if bep[constants.BE_AUTO_BALANCE]:
929             needed_mem += bep[constants.BE_MEMORY]
930         if nodeinfo['mfree'] < needed_mem:
931           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
932                       " failovers should node %s fail" % (node, prinode))
933           bad = True
934     return bad
935
936   def CheckPrereq(self):
937     """Check prerequisites.
938
939     Transform the list of checks we're going to skip into a set and check that
940     all its members are valid.
941
942     """
943     self.skip_set = frozenset(self.op.skip_checks)
944     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
945       raise errors.OpPrereqError("Invalid checks to be skipped specified")
946
947   def BuildHooksEnv(self):
948     """Build hooks env.
949
950     Cluster-Verify hooks just rone in the post phase and their failure makes
951     the output be logged in the verify output and the verification to fail.
952
953     """
954     all_nodes = self.cfg.GetNodeList()
955     env = {
956       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
957       }
958     for node in self.cfg.GetAllNodesInfo().values():
959       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
960
961     return env, [], all_nodes
962
963   def Exec(self, feedback_fn):
964     """Verify integrity of cluster, performing various test on nodes.
965
966     """
967     bad = False
968     feedback_fn("* Verifying global settings")
969     for msg in self.cfg.VerifyConfig():
970       feedback_fn("  - ERROR: %s" % msg)
971
972     vg_name = self.cfg.GetVGName()
973     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
974     nodelist = utils.NiceSort(self.cfg.GetNodeList())
975     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
976     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
977     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
978                         for iname in instancelist)
979     i_non_redundant = [] # Non redundant instances
980     i_non_a_balanced = [] # Non auto-balanced instances
981     n_offline = [] # List of offline nodes
982     n_drained = [] # List of nodes being drained
983     node_volume = {}
984     node_instance = {}
985     node_info = {}
986     instance_cfg = {}
987
988     # FIXME: verify OS list
989     # do local checksums
990     master_files = [constants.CLUSTER_CONF_FILE]
991
992     file_names = ssconf.SimpleStore().GetFileList()
993     file_names.append(constants.SSL_CERT_FILE)
994     file_names.append(constants.RAPI_CERT_FILE)
995     file_names.extend(master_files)
996
997     local_checksums = utils.FingerprintFiles(file_names)
998
999     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1000     node_verify_param = {
1001       constants.NV_FILELIST: file_names,
1002       constants.NV_NODELIST: [node.name for node in nodeinfo
1003                               if not node.offline],
1004       constants.NV_HYPERVISOR: hypervisors,
1005       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1006                                   node.secondary_ip) for node in nodeinfo
1007                                  if not node.offline],
1008       constants.NV_INSTANCELIST: hypervisors,
1009       constants.NV_VERSION: None,
1010       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1011       }
1012     if vg_name is not None:
1013       node_verify_param[constants.NV_VGLIST] = None
1014       node_verify_param[constants.NV_LVLIST] = vg_name
1015       node_verify_param[constants.NV_DRBDLIST] = None
1016     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1017                                            self.cfg.GetClusterName())
1018
1019     cluster = self.cfg.GetClusterInfo()
1020     master_node = self.cfg.GetMasterNode()
1021     all_drbd_map = self.cfg.ComputeDRBDMap()
1022
1023     for node_i in nodeinfo:
1024       node = node_i.name
1025       nresult = all_nvinfo[node].data
1026
1027       if node_i.offline:
1028         feedback_fn("* Skipping offline node %s" % (node,))
1029         n_offline.append(node)
1030         continue
1031
1032       if node == master_node:
1033         ntype = "master"
1034       elif node_i.master_candidate:
1035         ntype = "master candidate"
1036       elif node_i.drained:
1037         ntype = "drained"
1038         n_drained.append(node)
1039       else:
1040         ntype = "regular"
1041       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1042
1043       if all_nvinfo[node].failed or not isinstance(nresult, dict):
1044         feedback_fn("  - ERROR: connection to %s failed" % (node,))
1045         bad = True
1046         continue
1047
1048       node_drbd = {}
1049       for minor, instance in all_drbd_map[node].items():
1050         if instance not in instanceinfo:
1051           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1052                       instance)
1053           # ghost instance should not be running, but otherwise we
1054           # don't give double warnings (both ghost instance and
1055           # unallocated minor in use)
1056           node_drbd[minor] = (instance, False)
1057         else:
1058           instance = instanceinfo[instance]
1059           node_drbd[minor] = (instance.name, instance.admin_up)
1060       result = self._VerifyNode(node_i, file_names, local_checksums,
1061                                 nresult, feedback_fn, master_files,
1062                                 node_drbd, vg_name)
1063       bad = bad or result
1064
1065       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1066       if vg_name is None:
1067         node_volume[node] = {}
1068       elif isinstance(lvdata, basestring):
1069         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1070                     (node, utils.SafeEncode(lvdata)))
1071         bad = True
1072         node_volume[node] = {}
1073       elif not isinstance(lvdata, dict):
1074         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1075         bad = True
1076         continue
1077       else:
1078         node_volume[node] = lvdata
1079
1080       # node_instance
1081       idata = nresult.get(constants.NV_INSTANCELIST, None)
1082       if not isinstance(idata, list):
1083         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1084                     (node,))
1085         bad = True
1086         continue
1087
1088       node_instance[node] = idata
1089
1090       # node_info
1091       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1092       if not isinstance(nodeinfo, dict):
1093         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1094         bad = True
1095         continue
1096
1097       try:
1098         node_info[node] = {
1099           "mfree": int(nodeinfo['memory_free']),
1100           "pinst": [],
1101           "sinst": [],
1102           # dictionary holding all instances this node is secondary for,
1103           # grouped by their primary node. Each key is a cluster node, and each
1104           # value is a list of instances which have the key as primary and the
1105           # current node as secondary.  this is handy to calculate N+1 memory
1106           # availability if you can only failover from a primary to its
1107           # secondary.
1108           "sinst-by-pnode": {},
1109         }
1110         # FIXME: devise a free space model for file based instances as well
1111         if vg_name is not None:
1112           if (constants.NV_VGLIST not in nresult or
1113               vg_name not in nresult[constants.NV_VGLIST]):
1114             feedback_fn("  - ERROR: node %s didn't return data for the"
1115                         " volume group '%s' - it is either missing or broken" %
1116                         (node, vg_name))
1117             bad = True
1118             continue
1119           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1120       except (ValueError, KeyError):
1121         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1122                     " from node %s" % (node,))
1123         bad = True
1124         continue
1125
1126     node_vol_should = {}
1127
1128     for instance in instancelist:
1129       feedback_fn("* Verifying instance %s" % instance)
1130       inst_config = instanceinfo[instance]
1131       result =  self._VerifyInstance(instance, inst_config, node_volume,
1132                                      node_instance, feedback_fn, n_offline)
1133       bad = bad or result
1134       inst_nodes_offline = []
1135
1136       inst_config.MapLVsByNode(node_vol_should)
1137
1138       instance_cfg[instance] = inst_config
1139
1140       pnode = inst_config.primary_node
1141       if pnode in node_info:
1142         node_info[pnode]['pinst'].append(instance)
1143       elif pnode not in n_offline:
1144         feedback_fn("  - ERROR: instance %s, connection to primary node"
1145                     " %s failed" % (instance, pnode))
1146         bad = True
1147
1148       if pnode in n_offline:
1149         inst_nodes_offline.append(pnode)
1150
1151       # If the instance is non-redundant we cannot survive losing its primary
1152       # node, so we are not N+1 compliant. On the other hand we have no disk
1153       # templates with more than one secondary so that situation is not well
1154       # supported either.
1155       # FIXME: does not support file-backed instances
1156       if len(inst_config.secondary_nodes) == 0:
1157         i_non_redundant.append(instance)
1158       elif len(inst_config.secondary_nodes) > 1:
1159         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1160                     % instance)
1161
1162       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1163         i_non_a_balanced.append(instance)
1164
1165       for snode in inst_config.secondary_nodes:
1166         if snode in node_info:
1167           node_info[snode]['sinst'].append(instance)
1168           if pnode not in node_info[snode]['sinst-by-pnode']:
1169             node_info[snode]['sinst-by-pnode'][pnode] = []
1170           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1171         elif snode not in n_offline:
1172           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1173                       " %s failed" % (instance, snode))
1174           bad = True
1175         if snode in n_offline:
1176           inst_nodes_offline.append(snode)
1177
1178       if inst_nodes_offline:
1179         # warn that the instance lives on offline nodes, and set bad=True
1180         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1181                     ", ".join(inst_nodes_offline))
1182         bad = True
1183
1184     feedback_fn("* Verifying orphan volumes")
1185     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1186                                        feedback_fn)
1187     bad = bad or result
1188
1189     feedback_fn("* Verifying remaining instances")
1190     result = self._VerifyOrphanInstances(instancelist, node_instance,
1191                                          feedback_fn)
1192     bad = bad or result
1193
1194     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1195       feedback_fn("* Verifying N+1 Memory redundancy")
1196       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1197       bad = bad or result
1198
1199     feedback_fn("* Other Notes")
1200     if i_non_redundant:
1201       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1202                   % len(i_non_redundant))
1203
1204     if i_non_a_balanced:
1205       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1206                   % len(i_non_a_balanced))
1207
1208     if n_offline:
1209       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1210
1211     if n_drained:
1212       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1213
1214     return not bad
1215
1216   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1217     """Analize the post-hooks' result
1218
1219     This method analyses the hook result, handles it, and sends some
1220     nicely-formatted feedback back to the user.
1221
1222     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1223         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1224     @param hooks_results: the results of the multi-node hooks rpc call
1225     @param feedback_fn: function used send feedback back to the caller
1226     @param lu_result: previous Exec result
1227     @return: the new Exec result, based on the previous result
1228         and hook results
1229
1230     """
1231     # We only really run POST phase hooks, and are only interested in
1232     # their results
1233     if phase == constants.HOOKS_PHASE_POST:
1234       # Used to change hooks' output to proper indentation
1235       indent_re = re.compile('^', re.M)
1236       feedback_fn("* Hooks Results")
1237       if not hooks_results:
1238         feedback_fn("  - ERROR: general communication failure")
1239         lu_result = 1
1240       else:
1241         for node_name in hooks_results:
1242           show_node_header = True
1243           res = hooks_results[node_name]
1244           if res.failed or res.data is False or not isinstance(res.data, list):
1245             if res.offline:
1246               # no need to warn or set fail return value
1247               continue
1248             feedback_fn("    Communication failure in hooks execution")
1249             lu_result = 1
1250             continue
1251           for script, hkr, output in res.data:
1252             if hkr == constants.HKR_FAIL:
1253               # The node header is only shown once, if there are
1254               # failing hooks on that node
1255               if show_node_header:
1256                 feedback_fn("  Node %s:" % node_name)
1257                 show_node_header = False
1258               feedback_fn("    ERROR: Script %s failed, output:" % script)
1259               output = indent_re.sub('      ', output)
1260               feedback_fn("%s" % output)
1261               lu_result = 1
1262
1263       return lu_result
1264
1265
1266 class LUVerifyDisks(NoHooksLU):
1267   """Verifies the cluster disks status.
1268
1269   """
1270   _OP_REQP = []
1271   REQ_BGL = False
1272
1273   def ExpandNames(self):
1274     self.needed_locks = {
1275       locking.LEVEL_NODE: locking.ALL_SET,
1276       locking.LEVEL_INSTANCE: locking.ALL_SET,
1277     }
1278     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1279
1280   def CheckPrereq(self):
1281     """Check prerequisites.
1282
1283     This has no prerequisites.
1284
1285     """
1286     pass
1287
1288   def Exec(self, feedback_fn):
1289     """Verify integrity of cluster disks.
1290
1291     @rtype: tuple of three items
1292     @return: a tuple of (dict of node-to-node_error, list of instances
1293         which need activate-disks, dict of instance: (node, volume) for
1294         missing volumes
1295
1296     """
1297     result = res_nodes, res_instances, res_missing = {}, [], {}
1298
1299     vg_name = self.cfg.GetVGName()
1300     nodes = utils.NiceSort(self.cfg.GetNodeList())
1301     instances = [self.cfg.GetInstanceInfo(name)
1302                  for name in self.cfg.GetInstanceList()]
1303
1304     nv_dict = {}
1305     for inst in instances:
1306       inst_lvs = {}
1307       if (not inst.admin_up or
1308           inst.disk_template not in constants.DTS_NET_MIRROR):
1309         continue
1310       inst.MapLVsByNode(inst_lvs)
1311       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1312       for node, vol_list in inst_lvs.iteritems():
1313         for vol in vol_list:
1314           nv_dict[(node, vol)] = inst
1315
1316     if not nv_dict:
1317       return result
1318
1319     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1320
1321     to_act = set()
1322     for node in nodes:
1323       # node_volume
1324       node_res = node_lvs[node]
1325       if node_res.offline:
1326         continue
1327       msg = node_res.RemoteFailMsg()
1328       if msg:
1329         logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1330         res_nodes[node] = msg
1331         continue
1332
1333       lvs = node_res.payload
1334       for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1335         inst = nv_dict.pop((node, lv_name), None)
1336         if (not lv_online and inst is not None
1337             and inst.name not in res_instances):
1338           res_instances.append(inst.name)
1339
1340     # any leftover items in nv_dict are missing LVs, let's arrange the
1341     # data better
1342     for key, inst in nv_dict.iteritems():
1343       if inst.name not in res_missing:
1344         res_missing[inst.name] = []
1345       res_missing[inst.name].append(key)
1346
1347     return result
1348
1349
1350 class LURenameCluster(LogicalUnit):
1351   """Rename the cluster.
1352
1353   """
1354   HPATH = "cluster-rename"
1355   HTYPE = constants.HTYPE_CLUSTER
1356   _OP_REQP = ["name"]
1357
1358   def BuildHooksEnv(self):
1359     """Build hooks env.
1360
1361     """
1362     env = {
1363       "OP_TARGET": self.cfg.GetClusterName(),
1364       "NEW_NAME": self.op.name,
1365       }
1366     mn = self.cfg.GetMasterNode()
1367     return env, [mn], [mn]
1368
1369   def CheckPrereq(self):
1370     """Verify that the passed name is a valid one.
1371
1372     """
1373     hostname = utils.HostInfo(self.op.name)
1374
1375     new_name = hostname.name
1376     self.ip = new_ip = hostname.ip
1377     old_name = self.cfg.GetClusterName()
1378     old_ip = self.cfg.GetMasterIP()
1379     if new_name == old_name and new_ip == old_ip:
1380       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1381                                  " cluster has changed")
1382     if new_ip != old_ip:
1383       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1384         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1385                                    " reachable on the network. Aborting." %
1386                                    new_ip)
1387
1388     self.op.name = new_name
1389
1390   def Exec(self, feedback_fn):
1391     """Rename the cluster.
1392
1393     """
1394     clustername = self.op.name
1395     ip = self.ip
1396
1397     # shutdown the master IP
1398     master = self.cfg.GetMasterNode()
1399     result = self.rpc.call_node_stop_master(master, False)
1400     if result.failed or not result.data:
1401       raise errors.OpExecError("Could not disable the master role")
1402
1403     try:
1404       cluster = self.cfg.GetClusterInfo()
1405       cluster.cluster_name = clustername
1406       cluster.master_ip = ip
1407       self.cfg.Update(cluster)
1408
1409       # update the known hosts file
1410       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1411       node_list = self.cfg.GetNodeList()
1412       try:
1413         node_list.remove(master)
1414       except ValueError:
1415         pass
1416       result = self.rpc.call_upload_file(node_list,
1417                                          constants.SSH_KNOWN_HOSTS_FILE)
1418       for to_node, to_result in result.iteritems():
1419          msg = to_result.RemoteFailMsg()
1420          if msg:
1421            msg = ("Copy of file %s to node %s failed: %s" %
1422                    (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1423            self.proc.LogWarning(msg)
1424
1425     finally:
1426       result = self.rpc.call_node_start_master(master, False)
1427       if result.failed or not result.data:
1428         self.LogWarning("Could not re-enable the master role on"
1429                         " the master, please restart manually.")
1430
1431
1432 def _RecursiveCheckIfLVMBased(disk):
1433   """Check if the given disk or its children are lvm-based.
1434
1435   @type disk: L{objects.Disk}
1436   @param disk: the disk to check
1437   @rtype: booleean
1438   @return: boolean indicating whether a LD_LV dev_type was found or not
1439
1440   """
1441   if disk.children:
1442     for chdisk in disk.children:
1443       if _RecursiveCheckIfLVMBased(chdisk):
1444         return True
1445   return disk.dev_type == constants.LD_LV
1446
1447
1448 class LUSetClusterParams(LogicalUnit):
1449   """Change the parameters of the cluster.
1450
1451   """
1452   HPATH = "cluster-modify"
1453   HTYPE = constants.HTYPE_CLUSTER
1454   _OP_REQP = []
1455   REQ_BGL = False
1456
1457   def CheckArguments(self):
1458     """Check parameters
1459
1460     """
1461     if not hasattr(self.op, "candidate_pool_size"):
1462       self.op.candidate_pool_size = None
1463     if self.op.candidate_pool_size is not None:
1464       try:
1465         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1466       except (ValueError, TypeError), err:
1467         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1468                                    str(err))
1469       if self.op.candidate_pool_size < 1:
1470         raise errors.OpPrereqError("At least one master candidate needed")
1471
1472   def ExpandNames(self):
1473     # FIXME: in the future maybe other cluster params won't require checking on
1474     # all nodes to be modified.
1475     self.needed_locks = {
1476       locking.LEVEL_NODE: locking.ALL_SET,
1477     }
1478     self.share_locks[locking.LEVEL_NODE] = 1
1479
1480   def BuildHooksEnv(self):
1481     """Build hooks env.
1482
1483     """
1484     env = {
1485       "OP_TARGET": self.cfg.GetClusterName(),
1486       "NEW_VG_NAME": self.op.vg_name,
1487       }
1488     mn = self.cfg.GetMasterNode()
1489     return env, [mn], [mn]
1490
1491   def CheckPrereq(self):
1492     """Check prerequisites.
1493
1494     This checks whether the given params don't conflict and
1495     if the given volume group is valid.
1496
1497     """
1498     if self.op.vg_name is not None and not self.op.vg_name:
1499       instances = self.cfg.GetAllInstancesInfo().values()
1500       for inst in instances:
1501         for disk in inst.disks:
1502           if _RecursiveCheckIfLVMBased(disk):
1503             raise errors.OpPrereqError("Cannot disable lvm storage while"
1504                                        " lvm-based instances exist")
1505
1506     node_list = self.acquired_locks[locking.LEVEL_NODE]
1507
1508     # if vg_name not None, checks given volume group on all nodes
1509     if self.op.vg_name:
1510       vglist = self.rpc.call_vg_list(node_list)
1511       for node in node_list:
1512         msg = vglist[node].RemoteFailMsg()
1513         if msg:
1514           # ignoring down node
1515           self.LogWarning("Error while gathering data on node %s"
1516                           " (ignoring node): %s", node, msg)
1517           continue
1518         vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1519                                               self.op.vg_name,
1520                                               constants.MIN_VG_SIZE)
1521         if vgstatus:
1522           raise errors.OpPrereqError("Error on node '%s': %s" %
1523                                      (node, vgstatus))
1524
1525     self.cluster = cluster = self.cfg.GetClusterInfo()
1526     # validate params changes
1527     if self.op.beparams:
1528       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1529       self.new_beparams = objects.FillDict(
1530         cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1531
1532     if self.op.nicparams:
1533       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1534       self.new_nicparams = objects.FillDict(
1535         cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1536       objects.NIC.CheckParameterSyntax(self.new_nicparams)
1537
1538     # hypervisor list/parameters
1539     self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1540     if self.op.hvparams:
1541       if not isinstance(self.op.hvparams, dict):
1542         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1543       for hv_name, hv_dict in self.op.hvparams.items():
1544         if hv_name not in self.new_hvparams:
1545           self.new_hvparams[hv_name] = hv_dict
1546         else:
1547           self.new_hvparams[hv_name].update(hv_dict)
1548
1549     if self.op.enabled_hypervisors is not None:
1550       self.hv_list = self.op.enabled_hypervisors
1551     else:
1552       self.hv_list = cluster.enabled_hypervisors
1553
1554     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1555       # either the enabled list has changed, or the parameters have, validate
1556       for hv_name, hv_params in self.new_hvparams.items():
1557         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1558             (self.op.enabled_hypervisors and
1559              hv_name in self.op.enabled_hypervisors)):
1560           # either this is a new hypervisor, or its parameters have changed
1561           hv_class = hypervisor.GetHypervisor(hv_name)
1562           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1563           hv_class.CheckParameterSyntax(hv_params)
1564           _CheckHVParams(self, node_list, hv_name, hv_params)
1565
1566   def Exec(self, feedback_fn):
1567     """Change the parameters of the cluster.
1568
1569     """
1570     if self.op.vg_name is not None:
1571       new_volume = self.op.vg_name
1572       if not new_volume:
1573         new_volume = None
1574       if new_volume != self.cfg.GetVGName():
1575         self.cfg.SetVGName(new_volume)
1576       else:
1577         feedback_fn("Cluster LVM configuration already in desired"
1578                     " state, not changing")
1579     if self.op.hvparams:
1580       self.cluster.hvparams = self.new_hvparams
1581     if self.op.enabled_hypervisors is not None:
1582       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1583     if self.op.beparams:
1584       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1585     if self.op.nicparams:
1586       self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1587
1588     if self.op.candidate_pool_size is not None:
1589       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1590
1591     self.cfg.Update(self.cluster)
1592
1593     # we want to update nodes after the cluster so that if any errors
1594     # happen, we have recorded and saved the cluster info
1595     if self.op.candidate_pool_size is not None:
1596       _AdjustCandidatePool(self)
1597
1598
1599 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1600   """Distribute additional files which are part of the cluster configuration.
1601
1602   ConfigWriter takes care of distributing the config and ssconf files, but
1603   there are more files which should be distributed to all nodes. This function
1604   makes sure those are copied.
1605
1606   @param lu: calling logical unit
1607   @param additional_nodes: list of nodes not in the config to distribute to
1608
1609   """
1610   # 1. Gather target nodes
1611   myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1612   dist_nodes = lu.cfg.GetNodeList()
1613   if additional_nodes is not None:
1614     dist_nodes.extend(additional_nodes)
1615   if myself.name in dist_nodes:
1616     dist_nodes.remove(myself.name)
1617   # 2. Gather files to distribute
1618   dist_files = set([constants.ETC_HOSTS,
1619                     constants.SSH_KNOWN_HOSTS_FILE,
1620                     constants.RAPI_CERT_FILE,
1621                     constants.RAPI_USERS_FILE,
1622                    ])
1623
1624   enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1625   for hv_name in enabled_hypervisors:
1626     hv_class = hypervisor.GetHypervisor(hv_name)
1627     dist_files.update(hv_class.GetAncillaryFiles())
1628
1629   # 3. Perform the files upload
1630   for fname in dist_files:
1631     if os.path.exists(fname):
1632       result = lu.rpc.call_upload_file(dist_nodes, fname)
1633       for to_node, to_result in result.items():
1634          msg = to_result.RemoteFailMsg()
1635          if msg:
1636            msg = ("Copy of file %s to node %s failed: %s" %
1637                    (fname, to_node, msg))
1638            lu.proc.LogWarning(msg)
1639
1640
1641 class LURedistributeConfig(NoHooksLU):
1642   """Force the redistribution of cluster configuration.
1643
1644   This is a very simple LU.
1645
1646   """
1647   _OP_REQP = []
1648   REQ_BGL = False
1649
1650   def ExpandNames(self):
1651     self.needed_locks = {
1652       locking.LEVEL_NODE: locking.ALL_SET,
1653     }
1654     self.share_locks[locking.LEVEL_NODE] = 1
1655
1656   def CheckPrereq(self):
1657     """Check prerequisites.
1658
1659     """
1660
1661   def Exec(self, feedback_fn):
1662     """Redistribute the configuration.
1663
1664     """
1665     self.cfg.Update(self.cfg.GetClusterInfo())
1666     _RedistributeAncillaryFiles(self)
1667
1668
1669 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1670   """Sleep and poll for an instance's disk to sync.
1671
1672   """
1673   if not instance.disks:
1674     return True
1675
1676   if not oneshot:
1677     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1678
1679   node = instance.primary_node
1680
1681   for dev in instance.disks:
1682     lu.cfg.SetDiskID(dev, node)
1683
1684   retries = 0
1685   while True:
1686     max_time = 0
1687     done = True
1688     cumul_degraded = False
1689     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1690     msg = rstats.RemoteFailMsg()
1691     if msg:
1692       lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1693       retries += 1
1694       if retries >= 10:
1695         raise errors.RemoteError("Can't contact node %s for mirror data,"
1696                                  " aborting." % node)
1697       time.sleep(6)
1698       continue
1699     rstats = rstats.payload
1700     retries = 0
1701     for i, mstat in enumerate(rstats):
1702       if mstat is None:
1703         lu.LogWarning("Can't compute data for node %s/%s",
1704                            node, instance.disks[i].iv_name)
1705         continue
1706       # we ignore the ldisk parameter
1707       perc_done, est_time, is_degraded, _ = mstat
1708       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1709       if perc_done is not None:
1710         done = False
1711         if est_time is not None:
1712           rem_time = "%d estimated seconds remaining" % est_time
1713           max_time = est_time
1714         else:
1715           rem_time = "no time estimate"
1716         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1717                         (instance.disks[i].iv_name, perc_done, rem_time))
1718     if done or oneshot:
1719       break
1720
1721     time.sleep(min(60, max_time))
1722
1723   if done:
1724     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1725   return not cumul_degraded
1726
1727
1728 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1729   """Check that mirrors are not degraded.
1730
1731   The ldisk parameter, if True, will change the test from the
1732   is_degraded attribute (which represents overall non-ok status for
1733   the device(s)) to the ldisk (representing the local storage status).
1734
1735   """
1736   lu.cfg.SetDiskID(dev, node)
1737   if ldisk:
1738     idx = 6
1739   else:
1740     idx = 5
1741
1742   result = True
1743   if on_primary or dev.AssembleOnSecondary():
1744     rstats = lu.rpc.call_blockdev_find(node, dev)
1745     msg = rstats.RemoteFailMsg()
1746     if msg:
1747       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1748       result = False
1749     elif not rstats.payload:
1750       lu.LogWarning("Can't find disk on node %s", node)
1751       result = False
1752     else:
1753       result = result and (not rstats.payload[idx])
1754   if dev.children:
1755     for child in dev.children:
1756       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1757
1758   return result
1759
1760
1761 class LUDiagnoseOS(NoHooksLU):
1762   """Logical unit for OS diagnose/query.
1763
1764   """
1765   _OP_REQP = ["output_fields", "names"]
1766   REQ_BGL = False
1767   _FIELDS_STATIC = utils.FieldSet()
1768   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1769
1770   def ExpandNames(self):
1771     if self.op.names:
1772       raise errors.OpPrereqError("Selective OS query not supported")
1773
1774     _CheckOutputFields(static=self._FIELDS_STATIC,
1775                        dynamic=self._FIELDS_DYNAMIC,
1776                        selected=self.op.output_fields)
1777
1778     # Lock all nodes, in shared mode
1779     # Temporary removal of locks, should be reverted later
1780     # TODO: reintroduce locks when they are lighter-weight
1781     self.needed_locks = {}
1782     #self.share_locks[locking.LEVEL_NODE] = 1
1783     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1784
1785   def CheckPrereq(self):
1786     """Check prerequisites.
1787
1788     """
1789
1790   @staticmethod
1791   def _DiagnoseByOS(node_list, rlist):
1792     """Remaps a per-node return list into an a per-os per-node dictionary
1793
1794     @param node_list: a list with the names of all nodes
1795     @param rlist: a map with node names as keys and OS objects as values
1796
1797     @rtype: dict
1798     @return: a dictionary with osnames as keys and as value another map, with
1799         nodes as keys and list of OS objects as values, eg::
1800
1801           {"debian-etch": {"node1": [<object>,...],
1802                            "node2": [<object>,]}
1803           }
1804
1805     """
1806     all_os = {}
1807     # we build here the list of nodes that didn't fail the RPC (at RPC
1808     # level), so that nodes with a non-responding node daemon don't
1809     # make all OSes invalid
1810     good_nodes = [node_name for node_name in rlist
1811                   if not rlist[node_name].failed]
1812     for node_name, nr in rlist.iteritems():
1813       if nr.failed or not nr.data:
1814         continue
1815       for os_obj in nr.data:
1816         if os_obj.name not in all_os:
1817           # build a list of nodes for this os containing empty lists
1818           # for each node in node_list
1819           all_os[os_obj.name] = {}
1820           for nname in good_nodes:
1821             all_os[os_obj.name][nname] = []
1822         all_os[os_obj.name][node_name].append(os_obj)
1823     return all_os
1824
1825   def Exec(self, feedback_fn):
1826     """Compute the list of OSes.
1827
1828     """
1829     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1830     node_data = self.rpc.call_os_diagnose(valid_nodes)
1831     if node_data == False:
1832       raise errors.OpExecError("Can't gather the list of OSes")
1833     pol = self._DiagnoseByOS(valid_nodes, node_data)
1834     output = []
1835     for os_name, os_data in pol.iteritems():
1836       row = []
1837       for field in self.op.output_fields:
1838         if field == "name":
1839           val = os_name
1840         elif field == "valid":
1841           val = utils.all([osl and osl[0] for osl in os_data.values()])
1842         elif field == "node_status":
1843           val = {}
1844           for node_name, nos_list in os_data.iteritems():
1845             val[node_name] = [(v.status, v.path) for v in nos_list]
1846         else:
1847           raise errors.ParameterError(field)
1848         row.append(val)
1849       output.append(row)
1850
1851     return output
1852
1853
1854 class LURemoveNode(LogicalUnit):
1855   """Logical unit for removing a node.
1856
1857   """
1858   HPATH = "node-remove"
1859   HTYPE = constants.HTYPE_NODE
1860   _OP_REQP = ["node_name"]
1861
1862   def BuildHooksEnv(self):
1863     """Build hooks env.
1864
1865     This doesn't run on the target node in the pre phase as a failed
1866     node would then be impossible to remove.
1867
1868     """
1869     env = {
1870       "OP_TARGET": self.op.node_name,
1871       "NODE_NAME": self.op.node_name,
1872       }
1873     all_nodes = self.cfg.GetNodeList()
1874     all_nodes.remove(self.op.node_name)
1875     return env, all_nodes, all_nodes
1876
1877   def CheckPrereq(self):
1878     """Check prerequisites.
1879
1880     This checks:
1881      - the node exists in the configuration
1882      - it does not have primary or secondary instances
1883      - it's not the master
1884
1885     Any errors are signalled by raising errors.OpPrereqError.
1886
1887     """
1888     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1889     if node is None:
1890       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1891
1892     instance_list = self.cfg.GetInstanceList()
1893
1894     masternode = self.cfg.GetMasterNode()
1895     if node.name == masternode:
1896       raise errors.OpPrereqError("Node is the master node,"
1897                                  " you need to failover first.")
1898
1899     for instance_name in instance_list:
1900       instance = self.cfg.GetInstanceInfo(instance_name)
1901       if node.name in instance.all_nodes:
1902         raise errors.OpPrereqError("Instance %s is still running on the node,"
1903                                    " please remove first." % instance_name)
1904     self.op.node_name = node.name
1905     self.node = node
1906
1907   def Exec(self, feedback_fn):
1908     """Removes the node from the cluster.
1909
1910     """
1911     node = self.node
1912     logging.info("Stopping the node daemon and removing configs from node %s",
1913                  node.name)
1914
1915     self.context.RemoveNode(node.name)
1916
1917     self.rpc.call_node_leave_cluster(node.name)
1918
1919     # Promote nodes to master candidate as needed
1920     _AdjustCandidatePool(self)
1921
1922
1923 class LUQueryNodes(NoHooksLU):
1924   """Logical unit for querying nodes.
1925
1926   """
1927   _OP_REQP = ["output_fields", "names", "use_locking"]
1928   REQ_BGL = False
1929   _FIELDS_DYNAMIC = utils.FieldSet(
1930     "dtotal", "dfree",
1931     "mtotal", "mnode", "mfree",
1932     "bootid",
1933     "ctotal", "cnodes", "csockets",
1934     )
1935
1936   _FIELDS_STATIC = utils.FieldSet(
1937     "name", "pinst_cnt", "sinst_cnt",
1938     "pinst_list", "sinst_list",
1939     "pip", "sip", "tags",
1940     "serial_no",
1941     "master_candidate",
1942     "master",
1943     "offline",
1944     "drained",
1945     )
1946
1947   def ExpandNames(self):
1948     _CheckOutputFields(static=self._FIELDS_STATIC,
1949                        dynamic=self._FIELDS_DYNAMIC,
1950                        selected=self.op.output_fields)
1951
1952     self.needed_locks = {}
1953     self.share_locks[locking.LEVEL_NODE] = 1
1954
1955     if self.op.names:
1956       self.wanted = _GetWantedNodes(self, self.op.names)
1957     else:
1958       self.wanted = locking.ALL_SET
1959
1960     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1961     self.do_locking = self.do_node_query and self.op.use_locking
1962     if self.do_locking:
1963       # if we don't request only static fields, we need to lock the nodes
1964       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1965
1966
1967   def CheckPrereq(self):
1968     """Check prerequisites.
1969
1970     """
1971     # The validation of the node list is done in the _GetWantedNodes,
1972     # if non empty, and if empty, there's no validation to do
1973     pass
1974
1975   def Exec(self, feedback_fn):
1976     """Computes the list of nodes and their attributes.
1977
1978     """
1979     all_info = self.cfg.GetAllNodesInfo()
1980     if self.do_locking:
1981       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1982     elif self.wanted != locking.ALL_SET:
1983       nodenames = self.wanted
1984       missing = set(nodenames).difference(all_info.keys())
1985       if missing:
1986         raise errors.OpExecError(
1987           "Some nodes were removed before retrieving their data: %s" % missing)
1988     else:
1989       nodenames = all_info.keys()
1990
1991     nodenames = utils.NiceSort(nodenames)
1992     nodelist = [all_info[name] for name in nodenames]
1993
1994     # begin data gathering
1995
1996     if self.do_node_query:
1997       live_data = {}
1998       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1999                                           self.cfg.GetHypervisorType())
2000       for name in nodenames:
2001         nodeinfo = node_data[name]
2002         if not nodeinfo.failed and nodeinfo.data:
2003           nodeinfo = nodeinfo.data
2004           fn = utils.TryConvert
2005           live_data[name] = {
2006             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2007             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2008             "mfree": fn(int, nodeinfo.get('memory_free', None)),
2009             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2010             "dfree": fn(int, nodeinfo.get('vg_free', None)),
2011             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2012             "bootid": nodeinfo.get('bootid', None),
2013             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2014             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2015             }
2016         else:
2017           live_data[name] = {}
2018     else:
2019       live_data = dict.fromkeys(nodenames, {})
2020
2021     node_to_primary = dict([(name, set()) for name in nodenames])
2022     node_to_secondary = dict([(name, set()) for name in nodenames])
2023
2024     inst_fields = frozenset(("pinst_cnt", "pinst_list",
2025                              "sinst_cnt", "sinst_list"))
2026     if inst_fields & frozenset(self.op.output_fields):
2027       instancelist = self.cfg.GetInstanceList()
2028
2029       for instance_name in instancelist:
2030         inst = self.cfg.GetInstanceInfo(instance_name)
2031         if inst.primary_node in node_to_primary:
2032           node_to_primary[inst.primary_node].add(inst.name)
2033         for secnode in inst.secondary_nodes:
2034           if secnode in node_to_secondary:
2035             node_to_secondary[secnode].add(inst.name)
2036
2037     master_node = self.cfg.GetMasterNode()
2038
2039     # end data gathering
2040
2041     output = []
2042     for node in nodelist:
2043       node_output = []
2044       for field in self.op.output_fields:
2045         if field == "name":
2046           val = node.name
2047         elif field == "pinst_list":
2048           val = list(node_to_primary[node.name])
2049         elif field == "sinst_list":
2050           val = list(node_to_secondary[node.name])
2051         elif field == "pinst_cnt":
2052           val = len(node_to_primary[node.name])
2053         elif field == "sinst_cnt":
2054           val = len(node_to_secondary[node.name])
2055         elif field == "pip":
2056           val = node.primary_ip
2057         elif field == "sip":
2058           val = node.secondary_ip
2059         elif field == "tags":
2060           val = list(node.GetTags())
2061         elif field == "serial_no":
2062           val = node.serial_no
2063         elif field == "master_candidate":
2064           val = node.master_candidate
2065         elif field == "master":
2066           val = node.name == master_node
2067         elif field == "offline":
2068           val = node.offline
2069         elif field == "drained":
2070           val = node.drained
2071         elif self._FIELDS_DYNAMIC.Matches(field):
2072           val = live_data[node.name].get(field, None)
2073         else:
2074           raise errors.ParameterError(field)
2075         node_output.append(val)
2076       output.append(node_output)
2077
2078     return output
2079
2080
2081 class LUQueryNodeVolumes(NoHooksLU):
2082   """Logical unit for getting volumes on node(s).
2083
2084   """
2085   _OP_REQP = ["nodes", "output_fields"]
2086   REQ_BGL = False
2087   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2088   _FIELDS_STATIC = utils.FieldSet("node")
2089
2090   def ExpandNames(self):
2091     _CheckOutputFields(static=self._FIELDS_STATIC,
2092                        dynamic=self._FIELDS_DYNAMIC,
2093                        selected=self.op.output_fields)
2094
2095     self.needed_locks = {}
2096     self.share_locks[locking.LEVEL_NODE] = 1
2097     if not self.op.nodes:
2098       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2099     else:
2100       self.needed_locks[locking.LEVEL_NODE] = \
2101         _GetWantedNodes(self, self.op.nodes)
2102
2103   def CheckPrereq(self):
2104     """Check prerequisites.
2105
2106     This checks that the fields required are valid output fields.
2107
2108     """
2109     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2110
2111   def Exec(self, feedback_fn):
2112     """Computes the list of nodes and their attributes.
2113
2114     """
2115     nodenames = self.nodes
2116     volumes = self.rpc.call_node_volumes(nodenames)
2117
2118     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2119              in self.cfg.GetInstanceList()]
2120
2121     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2122
2123     output = []
2124     for node in nodenames:
2125       if node not in volumes or volumes[node].failed or not volumes[node].data:
2126         continue
2127
2128       node_vols = volumes[node].data[:]
2129       node_vols.sort(key=lambda vol: vol['dev'])
2130
2131       for vol in node_vols:
2132         node_output = []
2133         for field in self.op.output_fields:
2134           if field == "node":
2135             val = node
2136           elif field == "phys":
2137             val = vol['dev']
2138           elif field == "vg":
2139             val = vol['vg']
2140           elif field == "name":
2141             val = vol['name']
2142           elif field == "size":
2143             val = int(float(vol['size']))
2144           elif field == "instance":
2145             for inst in ilist:
2146               if node not in lv_by_node[inst]:
2147                 continue
2148               if vol['name'] in lv_by_node[inst][node]:
2149                 val = inst.name
2150                 break
2151             else:
2152               val = '-'
2153           else:
2154             raise errors.ParameterError(field)
2155           node_output.append(str(val))
2156
2157         output.append(node_output)
2158
2159     return output
2160
2161
2162 class LUAddNode(LogicalUnit):
2163   """Logical unit for adding node to the cluster.
2164
2165   """
2166   HPATH = "node-add"
2167   HTYPE = constants.HTYPE_NODE
2168   _OP_REQP = ["node_name"]
2169
2170   def BuildHooksEnv(self):
2171     """Build hooks env.
2172
2173     This will run on all nodes before, and on all nodes + the new node after.
2174
2175     """
2176     env = {
2177       "OP_TARGET": self.op.node_name,
2178       "NODE_NAME": self.op.node_name,
2179       "NODE_PIP": self.op.primary_ip,
2180       "NODE_SIP": self.op.secondary_ip,
2181       }
2182     nodes_0 = self.cfg.GetNodeList()
2183     nodes_1 = nodes_0 + [self.op.node_name, ]
2184     return env, nodes_0, nodes_1
2185
2186   def CheckPrereq(self):
2187     """Check prerequisites.
2188
2189     This checks:
2190      - the new node is not already in the config
2191      - it is resolvable
2192      - its parameters (single/dual homed) matches the cluster
2193
2194     Any errors are signalled by raising errors.OpPrereqError.
2195
2196     """
2197     node_name = self.op.node_name
2198     cfg = self.cfg
2199
2200     dns_data = utils.HostInfo(node_name)
2201
2202     node = dns_data.name
2203     primary_ip = self.op.primary_ip = dns_data.ip
2204     secondary_ip = getattr(self.op, "secondary_ip", None)
2205     if secondary_ip is None:
2206       secondary_ip = primary_ip
2207     if not utils.IsValidIP(secondary_ip):
2208       raise errors.OpPrereqError("Invalid secondary IP given")
2209     self.op.secondary_ip = secondary_ip
2210
2211     node_list = cfg.GetNodeList()
2212     if not self.op.readd and node in node_list:
2213       raise errors.OpPrereqError("Node %s is already in the configuration" %
2214                                  node)
2215     elif self.op.readd and node not in node_list:
2216       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2217
2218     for existing_node_name in node_list:
2219       existing_node = cfg.GetNodeInfo(existing_node_name)
2220
2221       if self.op.readd and node == existing_node_name:
2222         if (existing_node.primary_ip != primary_ip or
2223             existing_node.secondary_ip != secondary_ip):
2224           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2225                                      " address configuration as before")
2226         continue
2227
2228       if (existing_node.primary_ip == primary_ip or
2229           existing_node.secondary_ip == primary_ip or
2230           existing_node.primary_ip == secondary_ip or
2231           existing_node.secondary_ip == secondary_ip):
2232         raise errors.OpPrereqError("New node ip address(es) conflict with"
2233                                    " existing node %s" % existing_node.name)
2234
2235     # check that the type of the node (single versus dual homed) is the
2236     # same as for the master
2237     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2238     master_singlehomed = myself.secondary_ip == myself.primary_ip
2239     newbie_singlehomed = secondary_ip == primary_ip
2240     if master_singlehomed != newbie_singlehomed:
2241       if master_singlehomed:
2242         raise errors.OpPrereqError("The master has no private ip but the"
2243                                    " new node has one")
2244       else:
2245         raise errors.OpPrereqError("The master has a private ip but the"
2246                                    " new node doesn't have one")
2247
2248     # checks reachablity
2249     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2250       raise errors.OpPrereqError("Node not reachable by ping")
2251
2252     if not newbie_singlehomed:
2253       # check reachability from my secondary ip to newbie's secondary ip
2254       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2255                            source=myself.secondary_ip):
2256         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2257                                    " based ping to noded port")
2258
2259     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2260     mc_now, _ = self.cfg.GetMasterCandidateStats()
2261     master_candidate = mc_now < cp_size
2262
2263     self.new_node = objects.Node(name=node,
2264                                  primary_ip=primary_ip,
2265                                  secondary_ip=secondary_ip,
2266                                  master_candidate=master_candidate,
2267                                  offline=False, drained=False)
2268
2269   def Exec(self, feedback_fn):
2270     """Adds the new node to the cluster.
2271
2272     """
2273     new_node = self.new_node
2274     node = new_node.name
2275
2276     # check connectivity
2277     result = self.rpc.call_version([node])[node]
2278     result.Raise()
2279     if result.data:
2280       if constants.PROTOCOL_VERSION == result.data:
2281         logging.info("Communication to node %s fine, sw version %s match",
2282                      node, result.data)
2283       else:
2284         raise errors.OpExecError("Version mismatch master version %s,"
2285                                  " node version %s" %
2286                                  (constants.PROTOCOL_VERSION, result.data))
2287     else:
2288       raise errors.OpExecError("Cannot get version from the new node")
2289
2290     # setup ssh on node
2291     logging.info("Copy ssh key to node %s", node)
2292     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2293     keyarray = []
2294     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2295                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2296                 priv_key, pub_key]
2297
2298     for i in keyfiles:
2299       f = open(i, 'r')
2300       try:
2301         keyarray.append(f.read())
2302       finally:
2303         f.close()
2304
2305     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2306                                     keyarray[2],
2307                                     keyarray[3], keyarray[4], keyarray[5])
2308
2309     msg = result.RemoteFailMsg()
2310     if msg:
2311       raise errors.OpExecError("Cannot transfer ssh keys to the"
2312                                " new node: %s" % msg)
2313
2314     # Add node to our /etc/hosts, and add key to known_hosts
2315     if self.cfg.GetClusterInfo().modify_etc_hosts:
2316       utils.AddHostToEtcHosts(new_node.name)
2317
2318     if new_node.secondary_ip != new_node.primary_ip:
2319       result = self.rpc.call_node_has_ip_address(new_node.name,
2320                                                  new_node.secondary_ip)
2321       if result.failed or not result.data:
2322         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2323                                  " you gave (%s). Please fix and re-run this"
2324                                  " command." % new_node.secondary_ip)
2325
2326     node_verify_list = [self.cfg.GetMasterNode()]
2327     node_verify_param = {
2328       'nodelist': [node],
2329       # TODO: do a node-net-test as well?
2330     }
2331
2332     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2333                                        self.cfg.GetClusterName())
2334     for verifier in node_verify_list:
2335       if result[verifier].failed or not result[verifier].data:
2336         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2337                                  " for remote verification" % verifier)
2338       if result[verifier].data['nodelist']:
2339         for failed in result[verifier].data['nodelist']:
2340           feedback_fn("ssh/hostname verification failed %s -> %s" %
2341                       (verifier, result[verifier].data['nodelist'][failed]))
2342         raise errors.OpExecError("ssh/hostname verification failed.")
2343
2344     if self.op.readd:
2345       _RedistributeAncillaryFiles(self)
2346       self.context.ReaddNode(new_node)
2347     else:
2348       _RedistributeAncillaryFiles(self, additional_nodes=[node])
2349       self.context.AddNode(new_node)
2350
2351
2352 class LUSetNodeParams(LogicalUnit):
2353   """Modifies the parameters of a node.
2354
2355   """
2356   HPATH = "node-modify"
2357   HTYPE = constants.HTYPE_NODE
2358   _OP_REQP = ["node_name"]
2359   REQ_BGL = False
2360
2361   def CheckArguments(self):
2362     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2363     if node_name is None:
2364       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2365     self.op.node_name = node_name
2366     _CheckBooleanOpField(self.op, 'master_candidate')
2367     _CheckBooleanOpField(self.op, 'offline')
2368     _CheckBooleanOpField(self.op, 'drained')
2369     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2370     if all_mods.count(None) == 3:
2371       raise errors.OpPrereqError("Please pass at least one modification")
2372     if all_mods.count(True) > 1:
2373       raise errors.OpPrereqError("Can't set the node into more than one"
2374                                  " state at the same time")
2375
2376   def ExpandNames(self):
2377     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2378
2379   def BuildHooksEnv(self):
2380     """Build hooks env.
2381
2382     This runs on the master node.
2383
2384     """
2385     env = {
2386       "OP_TARGET": self.op.node_name,
2387       "MASTER_CANDIDATE": str(self.op.master_candidate),
2388       "OFFLINE": str(self.op.offline),
2389       "DRAINED": str(self.op.drained),
2390       }
2391     nl = [self.cfg.GetMasterNode(),
2392           self.op.node_name]
2393     return env, nl, nl
2394
2395   def CheckPrereq(self):
2396     """Check prerequisites.
2397
2398     This only checks the instance list against the existing names.
2399
2400     """
2401     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2402
2403     if ((self.op.master_candidate == False or self.op.offline == True or
2404          self.op.drained == True) and node.master_candidate):
2405       # we will demote the node from master_candidate
2406       if self.op.node_name == self.cfg.GetMasterNode():
2407         raise errors.OpPrereqError("The master node has to be a"
2408                                    " master candidate, online and not drained")
2409       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2410       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2411       if num_candidates <= cp_size:
2412         msg = ("Not enough master candidates (desired"
2413                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2414         if self.op.force:
2415           self.LogWarning(msg)
2416         else:
2417           raise errors.OpPrereqError(msg)
2418
2419     if (self.op.master_candidate == True and
2420         ((node.offline and not self.op.offline == False) or
2421          (node.drained and not self.op.drained == False))):
2422       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2423                                  " to master_candidate" % node.name)
2424
2425     return
2426
2427   def Exec(self, feedback_fn):
2428     """Modifies a node.
2429
2430     """
2431     node = self.node
2432
2433     result = []
2434     changed_mc = False
2435
2436     if self.op.offline is not None:
2437       node.offline = self.op.offline
2438       result.append(("offline", str(self.op.offline)))
2439       if self.op.offline == True:
2440         if node.master_candidate:
2441           node.master_candidate = False
2442           changed_mc = True
2443           result.append(("master_candidate", "auto-demotion due to offline"))
2444         if node.drained:
2445           node.drained = False
2446           result.append(("drained", "clear drained status due to offline"))
2447
2448     if self.op.master_candidate is not None:
2449       node.master_candidate = self.op.master_candidate
2450       changed_mc = True
2451       result.append(("master_candidate", str(self.op.master_candidate)))
2452       if self.op.master_candidate == False:
2453         rrc = self.rpc.call_node_demote_from_mc(node.name)
2454         msg = rrc.RemoteFailMsg()
2455         if msg:
2456           self.LogWarning("Node failed to demote itself: %s" % msg)
2457
2458     if self.op.drained is not None:
2459       node.drained = self.op.drained
2460       result.append(("drained", str(self.op.drained)))
2461       if self.op.drained == True:
2462         if node.master_candidate:
2463           node.master_candidate = False
2464           changed_mc = True
2465           result.append(("master_candidate", "auto-demotion due to drain"))
2466         if node.offline:
2467           node.offline = False
2468           result.append(("offline", "clear offline status due to drain"))
2469
2470     # this will trigger configuration file update, if needed
2471     self.cfg.Update(node)
2472     # this will trigger job queue propagation or cleanup
2473     if changed_mc:
2474       self.context.ReaddNode(node)
2475
2476     return result
2477
2478
2479 class LUPowercycleNode(NoHooksLU):
2480   """Powercycles a node.
2481
2482   """
2483   _OP_REQP = ["node_name", "force"]
2484   REQ_BGL = False
2485
2486   def CheckArguments(self):
2487     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2488     if node_name is None:
2489       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2490     self.op.node_name = node_name
2491     if node_name == self.cfg.GetMasterNode() and not self.op.force:
2492       raise errors.OpPrereqError("The node is the master and the force"
2493                                  " parameter was not set")
2494
2495   def ExpandNames(self):
2496     """Locking for PowercycleNode.
2497
2498     This is a last-resource option and shouldn't block on other
2499     jobs. Therefore, we grab no locks.
2500
2501     """
2502     self.needed_locks = {}
2503
2504   def CheckPrereq(self):
2505     """Check prerequisites.
2506
2507     This LU has no prereqs.
2508
2509     """
2510     pass
2511
2512   def Exec(self, feedback_fn):
2513     """Reboots a node.
2514
2515     """
2516     result = self.rpc.call_node_powercycle(self.op.node_name,
2517                                            self.cfg.GetHypervisorType())
2518     msg = result.RemoteFailMsg()
2519     if msg:
2520       raise errors.OpExecError("Failed to schedule the reboot: %s" % msg)
2521     return result.payload
2522
2523
2524 class LUQueryClusterInfo(NoHooksLU):
2525   """Query cluster configuration.
2526
2527   """
2528   _OP_REQP = []
2529   REQ_BGL = False
2530
2531   def ExpandNames(self):
2532     self.needed_locks = {}
2533
2534   def CheckPrereq(self):
2535     """No prerequsites needed for this LU.
2536
2537     """
2538     pass
2539
2540   def Exec(self, feedback_fn):
2541     """Return cluster config.
2542
2543     """
2544     cluster = self.cfg.GetClusterInfo()
2545     result = {
2546       "software_version": constants.RELEASE_VERSION,
2547       "protocol_version": constants.PROTOCOL_VERSION,
2548       "config_version": constants.CONFIG_VERSION,
2549       "os_api_version": constants.OS_API_VERSION,
2550       "export_version": constants.EXPORT_VERSION,
2551       "architecture": (platform.architecture()[0], platform.machine()),
2552       "name": cluster.cluster_name,
2553       "master": cluster.master_node,
2554       "default_hypervisor": cluster.default_hypervisor,
2555       "enabled_hypervisors": cluster.enabled_hypervisors,
2556       "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2557                         for hypervisor in cluster.enabled_hypervisors]),
2558       "beparams": cluster.beparams,
2559       "nicparams": cluster.nicparams,
2560       "candidate_pool_size": cluster.candidate_pool_size,
2561       "master_netdev": cluster.master_netdev,
2562       "volume_group_name": cluster.volume_group_name,
2563       "file_storage_dir": cluster.file_storage_dir,
2564       }
2565
2566     return result
2567
2568
2569 class LUQueryConfigValues(NoHooksLU):
2570   """Return configuration values.
2571
2572   """
2573   _OP_REQP = []
2574   REQ_BGL = False
2575   _FIELDS_DYNAMIC = utils.FieldSet()
2576   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2577
2578   def ExpandNames(self):
2579     self.needed_locks = {}
2580
2581     _CheckOutputFields(static=self._FIELDS_STATIC,
2582                        dynamic=self._FIELDS_DYNAMIC,
2583                        selected=self.op.output_fields)
2584
2585   def CheckPrereq(self):
2586     """No prerequisites.
2587
2588     """
2589     pass
2590
2591   def Exec(self, feedback_fn):
2592     """Dump a representation of the cluster config to the standard output.
2593
2594     """
2595     values = []
2596     for field in self.op.output_fields:
2597       if field == "cluster_name":
2598         entry = self.cfg.GetClusterName()
2599       elif field == "master_node":
2600         entry = self.cfg.GetMasterNode()
2601       elif field == "drain_flag":
2602         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2603       else:
2604         raise errors.ParameterError(field)
2605       values.append(entry)
2606     return values
2607
2608
2609 class LUActivateInstanceDisks(NoHooksLU):
2610   """Bring up an instance's disks.
2611
2612   """
2613   _OP_REQP = ["instance_name"]
2614   REQ_BGL = False
2615
2616   def ExpandNames(self):
2617     self._ExpandAndLockInstance()
2618     self.needed_locks[locking.LEVEL_NODE] = []
2619     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2620
2621   def DeclareLocks(self, level):
2622     if level == locking.LEVEL_NODE:
2623       self._LockInstancesNodes()
2624
2625   def CheckPrereq(self):
2626     """Check prerequisites.
2627
2628     This checks that the instance is in the cluster.
2629
2630     """
2631     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2632     assert self.instance is not None, \
2633       "Cannot retrieve locked instance %s" % self.op.instance_name
2634     _CheckNodeOnline(self, self.instance.primary_node)
2635
2636   def Exec(self, feedback_fn):
2637     """Activate the disks.
2638
2639     """
2640     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2641     if not disks_ok:
2642       raise errors.OpExecError("Cannot activate block devices")
2643
2644     return disks_info
2645
2646
2647 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2648   """Prepare the block devices for an instance.
2649
2650   This sets up the block devices on all nodes.
2651
2652   @type lu: L{LogicalUnit}
2653   @param lu: the logical unit on whose behalf we execute
2654   @type instance: L{objects.Instance}
2655   @param instance: the instance for whose disks we assemble
2656   @type ignore_secondaries: boolean
2657   @param ignore_secondaries: if true, errors on secondary nodes
2658       won't result in an error return from the function
2659   @return: False if the operation failed, otherwise a list of
2660       (host, instance_visible_name, node_visible_name)
2661       with the mapping from node devices to instance devices
2662
2663   """
2664   device_info = []
2665   disks_ok = True
2666   iname = instance.name
2667   # With the two passes mechanism we try to reduce the window of
2668   # opportunity for the race condition of switching DRBD to primary
2669   # before handshaking occured, but we do not eliminate it
2670
2671   # The proper fix would be to wait (with some limits) until the
2672   # connection has been made and drbd transitions from WFConnection
2673   # into any other network-connected state (Connected, SyncTarget,
2674   # SyncSource, etc.)
2675
2676   # 1st pass, assemble on all nodes in secondary mode
2677   for inst_disk in instance.disks:
2678     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2679       lu.cfg.SetDiskID(node_disk, node)
2680       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2681       msg = result.RemoteFailMsg()
2682       if msg:
2683         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2684                            " (is_primary=False, pass=1): %s",
2685                            inst_disk.iv_name, node, msg)
2686         if not ignore_secondaries:
2687           disks_ok = False
2688
2689   # FIXME: race condition on drbd migration to primary
2690
2691   # 2nd pass, do only the primary node
2692   for inst_disk in instance.disks:
2693     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2694       if node != instance.primary_node:
2695         continue
2696       lu.cfg.SetDiskID(node_disk, node)
2697       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2698       msg = result.RemoteFailMsg()
2699       if msg:
2700         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2701                            " (is_primary=True, pass=2): %s",
2702                            inst_disk.iv_name, node, msg)
2703         disks_ok = False
2704     device_info.append((instance.primary_node, inst_disk.iv_name,
2705                         result.payload))
2706
2707   # leave the disks configured for the primary node
2708   # this is a workaround that would be fixed better by
2709   # improving the logical/physical id handling
2710   for disk in instance.disks:
2711     lu.cfg.SetDiskID(disk, instance.primary_node)
2712
2713   return disks_ok, device_info
2714
2715
2716 def _StartInstanceDisks(lu, instance, force):
2717   """Start the disks of an instance.
2718
2719   """
2720   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2721                                            ignore_secondaries=force)
2722   if not disks_ok:
2723     _ShutdownInstanceDisks(lu, instance)
2724     if force is not None and not force:
2725       lu.proc.LogWarning("", hint="If the message above refers to a"
2726                          " secondary node,"
2727                          " you can retry the operation using '--force'.")
2728     raise errors.OpExecError("Disk consistency error")
2729
2730
2731 class LUDeactivateInstanceDisks(NoHooksLU):
2732   """Shutdown an instance's disks.
2733
2734   """
2735   _OP_REQP = ["instance_name"]
2736   REQ_BGL = False
2737
2738   def ExpandNames(self):
2739     self._ExpandAndLockInstance()
2740     self.needed_locks[locking.LEVEL_NODE] = []
2741     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2742
2743   def DeclareLocks(self, level):
2744     if level == locking.LEVEL_NODE:
2745       self._LockInstancesNodes()
2746
2747   def CheckPrereq(self):
2748     """Check prerequisites.
2749
2750     This checks that the instance is in the cluster.
2751
2752     """
2753     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2754     assert self.instance is not None, \
2755       "Cannot retrieve locked instance %s" % self.op.instance_name
2756
2757   def Exec(self, feedback_fn):
2758     """Deactivate the disks
2759
2760     """
2761     instance = self.instance
2762     _SafeShutdownInstanceDisks(self, instance)
2763
2764
2765 def _SafeShutdownInstanceDisks(lu, instance):
2766   """Shutdown block devices of an instance.
2767
2768   This function checks if an instance is running, before calling
2769   _ShutdownInstanceDisks.
2770
2771   """
2772   pnode = instance.primary_node
2773   ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])
2774   ins_l = ins_l[pnode]
2775   msg = ins_l.RemoteFailMsg()
2776   if msg:
2777     raise errors.OpExecError("Can't contact node %s: %s" % (pnode, msg))
2778
2779   if instance.name in ins_l.payload:
2780     raise errors.OpExecError("Instance is running, can't shutdown"
2781                              " block devices.")
2782
2783   _ShutdownInstanceDisks(lu, instance)
2784
2785
2786 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2787   """Shutdown block devices of an instance.
2788
2789   This does the shutdown on all nodes of the instance.
2790
2791   If the ignore_primary is false, errors on the primary node are
2792   ignored.
2793
2794   """
2795   all_result = True
2796   for disk in instance.disks:
2797     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2798       lu.cfg.SetDiskID(top_disk, node)
2799       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2800       msg = result.RemoteFailMsg()
2801       if msg:
2802         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2803                       disk.iv_name, node, msg)
2804         if not ignore_primary or node != instance.primary_node:
2805           all_result = False
2806   return all_result
2807
2808
2809 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2810   """Checks if a node has enough free memory.
2811
2812   This function check if a given node has the needed amount of free
2813   memory. In case the node has less memory or we cannot get the
2814   information from the node, this function raise an OpPrereqError
2815   exception.
2816
2817   @type lu: C{LogicalUnit}
2818   @param lu: a logical unit from which we get configuration data
2819   @type node: C{str}
2820   @param node: the node to check
2821   @type reason: C{str}
2822   @param reason: string to use in the error message
2823   @type requested: C{int}
2824   @param requested: the amount of memory in MiB to check for
2825   @type hypervisor_name: C{str}
2826   @param hypervisor_name: the hypervisor to ask for memory stats
2827   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2828       we cannot check the node
2829
2830   """
2831   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2832   nodeinfo[node].Raise()
2833   free_mem = nodeinfo[node].data.get('memory_free')
2834   if not isinstance(free_mem, int):
2835     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2836                              " was '%s'" % (node, free_mem))
2837   if requested > free_mem:
2838     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2839                              " needed %s MiB, available %s MiB" %
2840                              (node, reason, requested, free_mem))
2841
2842
2843 class LUStartupInstance(LogicalUnit):
2844   """Starts an instance.
2845
2846   """
2847   HPATH = "instance-start"
2848   HTYPE = constants.HTYPE_INSTANCE
2849   _OP_REQP = ["instance_name", "force"]
2850   REQ_BGL = False
2851
2852   def ExpandNames(self):
2853     self._ExpandAndLockInstance()
2854
2855   def BuildHooksEnv(self):
2856     """Build hooks env.
2857
2858     This runs on master, primary and secondary nodes of the instance.
2859
2860     """
2861     env = {
2862       "FORCE": self.op.force,
2863       }
2864     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2865     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2866     return env, nl, nl
2867
2868   def CheckPrereq(self):
2869     """Check prerequisites.
2870
2871     This checks that the instance is in the cluster.
2872
2873     """
2874     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2875     assert self.instance is not None, \
2876       "Cannot retrieve locked instance %s" % self.op.instance_name
2877
2878     # extra beparams
2879     self.beparams = getattr(self.op, "beparams", {})
2880     if self.beparams:
2881       if not isinstance(self.beparams, dict):
2882         raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2883                                    " dict" % (type(self.beparams), ))
2884       # fill the beparams dict
2885       utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2886       self.op.beparams = self.beparams
2887
2888     # extra hvparams
2889     self.hvparams = getattr(self.op, "hvparams", {})
2890     if self.hvparams:
2891       if not isinstance(self.hvparams, dict):
2892         raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2893                                    " dict" % (type(self.hvparams), ))
2894
2895       # check hypervisor parameter syntax (locally)
2896       cluster = self.cfg.GetClusterInfo()
2897       utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2898       filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
2899                                     instance.hvparams)
2900       filled_hvp.update(self.hvparams)
2901       hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2902       hv_type.CheckParameterSyntax(filled_hvp)
2903       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2904       self.op.hvparams = self.hvparams
2905
2906     _CheckNodeOnline(self, instance.primary_node)
2907
2908     bep = self.cfg.GetClusterInfo().FillBE(instance)
2909     # check bridges existance
2910     _CheckInstanceBridgesExist(self, instance)
2911
2912     remote_info = self.rpc.call_instance_info(instance.primary_node,
2913                                               instance.name,
2914                                               instance.hypervisor)
2915     msg = remote_info.RemoteFailMsg()
2916     if msg:
2917       raise errors.OpPrereqError("Error checking node %s: %s" %
2918                                  (instance.primary_node, msg))
2919     if not remote_info.payload: # not running already
2920       _CheckNodeFreeMemory(self, instance.primary_node,
2921                            "starting instance %s" % instance.name,
2922                            bep[constants.BE_MEMORY], instance.hypervisor)
2923
2924   def Exec(self, feedback_fn):
2925     """Start the instance.
2926
2927     """
2928     instance = self.instance
2929     force = self.op.force
2930
2931     self.cfg.MarkInstanceUp(instance.name)
2932
2933     node_current = instance.primary_node
2934
2935     _StartInstanceDisks(self, instance, force)
2936
2937     result = self.rpc.call_instance_start(node_current, instance,
2938                                           self.hvparams, self.beparams)
2939     msg = result.RemoteFailMsg()
2940     if msg:
2941       _ShutdownInstanceDisks(self, instance)
2942       raise errors.OpExecError("Could not start instance: %s" % msg)
2943
2944
2945 class LURebootInstance(LogicalUnit):
2946   """Reboot an instance.
2947
2948   """
2949   HPATH = "instance-reboot"
2950   HTYPE = constants.HTYPE_INSTANCE
2951   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2952   REQ_BGL = False
2953
2954   def ExpandNames(self):
2955     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2956                                    constants.INSTANCE_REBOOT_HARD,
2957                                    constants.INSTANCE_REBOOT_FULL]:
2958       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2959                                   (constants.INSTANCE_REBOOT_SOFT,
2960                                    constants.INSTANCE_REBOOT_HARD,
2961                                    constants.INSTANCE_REBOOT_FULL))
2962     self._ExpandAndLockInstance()
2963
2964   def BuildHooksEnv(self):
2965     """Build hooks env.
2966
2967     This runs on master, primary and secondary nodes of the instance.
2968
2969     """
2970     env = {
2971       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2972       "REBOOT_TYPE": self.op.reboot_type,
2973       }
2974     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2975     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2976     return env, nl, nl
2977
2978   def CheckPrereq(self):
2979     """Check prerequisites.
2980
2981     This checks that the instance is in the cluster.
2982
2983     """
2984     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2985     assert self.instance is not None, \
2986       "Cannot retrieve locked instance %s" % self.op.instance_name
2987
2988     _CheckNodeOnline(self, instance.primary_node)
2989
2990     # check bridges existance
2991     _CheckInstanceBridgesExist(self, instance)
2992
2993   def Exec(self, feedback_fn):
2994     """Reboot the instance.
2995
2996     """
2997     instance = self.instance
2998     ignore_secondaries = self.op.ignore_secondaries
2999     reboot_type = self.op.reboot_type
3000
3001     node_current = instance.primary_node
3002
3003     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3004                        constants.INSTANCE_REBOOT_HARD]:
3005       for disk in instance.disks:
3006         self.cfg.SetDiskID(disk, node_current)
3007       result = self.rpc.call_instance_reboot(node_current, instance,
3008                                              reboot_type)
3009       msg = result.RemoteFailMsg()
3010       if msg:
3011         raise errors.OpExecError("Could not reboot instance: %s" % msg)
3012     else:
3013       result = self.rpc.call_instance_shutdown(node_current, instance)
3014       msg = result.RemoteFailMsg()
3015       if msg:
3016         raise errors.OpExecError("Could not shutdown instance for"
3017                                  " full reboot: %s" % msg)
3018       _ShutdownInstanceDisks(self, instance)
3019       _StartInstanceDisks(self, instance, ignore_secondaries)
3020       result = self.rpc.call_instance_start(node_current, instance, None, None)
3021       msg = result.RemoteFailMsg()
3022       if msg:
3023         _ShutdownInstanceDisks(self, instance)
3024         raise errors.OpExecError("Could not start instance for"
3025                                  " full reboot: %s" % msg)
3026
3027     self.cfg.MarkInstanceUp(instance.name)
3028
3029
3030 class LUShutdownInstance(LogicalUnit):
3031   """Shutdown an instance.
3032
3033   """
3034   HPATH = "instance-stop"
3035   HTYPE = constants.HTYPE_INSTANCE
3036   _OP_REQP = ["instance_name"]
3037   REQ_BGL = False
3038
3039   def ExpandNames(self):
3040     self._ExpandAndLockInstance()
3041
3042   def BuildHooksEnv(self):
3043     """Build hooks env.
3044
3045     This runs on master, primary and secondary nodes of the instance.
3046
3047     """
3048     env = _BuildInstanceHookEnvByObject(self, self.instance)
3049     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3050     return env, nl, nl
3051
3052   def CheckPrereq(self):
3053     """Check prerequisites.
3054
3055     This checks that the instance is in the cluster.
3056
3057     """
3058     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3059     assert self.instance is not None, \
3060       "Cannot retrieve locked instance %s" % self.op.instance_name
3061     _CheckNodeOnline(self, self.instance.primary_node)
3062
3063   def Exec(self, feedback_fn):
3064     """Shutdown the instance.
3065
3066     """
3067     instance = self.instance
3068     node_current = instance.primary_node
3069     self.cfg.MarkInstanceDown(instance.name)
3070     result = self.rpc.call_instance_shutdown(node_current, instance)
3071     msg = result.RemoteFailMsg()
3072     if msg:
3073       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3074
3075     _ShutdownInstanceDisks(self, instance)
3076
3077
3078 class LUReinstallInstance(LogicalUnit):
3079   """Reinstall an instance.
3080
3081   """
3082   HPATH = "instance-reinstall"
3083   HTYPE = constants.HTYPE_INSTANCE
3084   _OP_REQP = ["instance_name"]
3085   REQ_BGL = False
3086
3087   def ExpandNames(self):
3088     self._ExpandAndLockInstance()
3089
3090   def BuildHooksEnv(self):
3091     """Build hooks env.
3092
3093     This runs on master, primary and secondary nodes of the instance.
3094
3095     """
3096     env = _BuildInstanceHookEnvByObject(self, self.instance)
3097     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3098     return env, nl, nl
3099
3100   def CheckPrereq(self):
3101     """Check prerequisites.
3102
3103     This checks that the instance is in the cluster and is not running.
3104
3105     """
3106     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3107     assert instance is not None, \
3108       "Cannot retrieve locked instance %s" % self.op.instance_name
3109     _CheckNodeOnline(self, instance.primary_node)
3110
3111     if instance.disk_template == constants.DT_DISKLESS:
3112       raise errors.OpPrereqError("Instance '%s' has no disks" %
3113                                  self.op.instance_name)
3114     if instance.admin_up:
3115       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3116                                  self.op.instance_name)
3117     remote_info = self.rpc.call_instance_info(instance.primary_node,
3118                                               instance.name,
3119                                               instance.hypervisor)
3120     msg = remote_info.RemoteFailMsg()
3121     if msg:
3122       raise errors.OpPrereqError("Error checking node %s: %s" %
3123                                  (instance.primary_node, msg))
3124     if remote_info.payload:
3125       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3126                                  (self.op.instance_name,
3127                                   instance.primary_node))
3128
3129     self.op.os_type = getattr(self.op, "os_type", None)
3130     if self.op.os_type is not None:
3131       # OS verification
3132       pnode = self.cfg.GetNodeInfo(
3133         self.cfg.ExpandNodeName(instance.primary_node))
3134       if pnode is None:
3135         raise errors.OpPrereqError("Primary node '%s' is unknown" %
3136                                    self.op.pnode)
3137       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3138       result.Raise()
3139       if not isinstance(result.data, objects.OS):
3140         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3141                                    " primary node"  % self.op.os_type)
3142
3143     self.instance = instance
3144
3145   def Exec(self, feedback_fn):
3146     """Reinstall the instance.
3147
3148     """
3149     inst = self.instance
3150
3151     if self.op.os_type is not None:
3152       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3153       inst.os = self.op.os_type
3154       self.cfg.Update(inst)
3155
3156     _StartInstanceDisks(self, inst, None)
3157     try:
3158       feedback_fn("Running the instance OS create scripts...")
3159       result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3160       msg = result.RemoteFailMsg()
3161       if msg:
3162         raise errors.OpExecError("Could not install OS for instance %s"
3163                                  " on node %s: %s" %
3164                                  (inst.name, inst.primary_node, msg))
3165     finally:
3166       _ShutdownInstanceDisks(self, inst)
3167
3168
3169 class LURenameInstance(LogicalUnit):
3170   """Rename an instance.
3171
3172   """
3173   HPATH = "instance-rename"
3174   HTYPE = constants.HTYPE_INSTANCE
3175   _OP_REQP = ["instance_name", "new_name"]
3176
3177   def BuildHooksEnv(self):
3178     """Build hooks env.
3179
3180     This runs on master, primary and secondary nodes of the instance.
3181
3182     """
3183     env = _BuildInstanceHookEnvByObject(self, self.instance)
3184     env["INSTANCE_NEW_NAME"] = self.op.new_name
3185     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3186     return env, nl, nl
3187
3188   def CheckPrereq(self):
3189     """Check prerequisites.
3190
3191     This checks that the instance is in the cluster and is not running.
3192
3193     """
3194     instance = self.cfg.GetInstanceInfo(
3195       self.cfg.ExpandInstanceName(self.op.instance_name))
3196     if instance is None:
3197       raise errors.OpPrereqError("Instance '%s' not known" %
3198                                  self.op.instance_name)
3199     _CheckNodeOnline(self, instance.primary_node)
3200
3201     if instance.admin_up:
3202       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3203                                  self.op.instance_name)
3204     remote_info = self.rpc.call_instance_info(instance.primary_node,
3205                                               instance.name,
3206                                               instance.hypervisor)
3207     msg = remote_info.RemoteFailMsg()
3208     if msg:
3209       raise errors.OpPrereqError("Error checking node %s: %s" %
3210                                  (instance.primary_node, msg))
3211     if remote_info.payload:
3212       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3213                                  (self.op.instance_name,
3214                                   instance.primary_node))
3215     self.instance = instance
3216
3217     # new name verification
3218     name_info = utils.HostInfo(self.op.new_name)
3219
3220     self.op.new_name = new_name = name_info.name
3221     instance_list = self.cfg.GetInstanceList()
3222     if new_name in instance_list:
3223       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3224                                  new_name)
3225
3226     if not getattr(self.op, "ignore_ip", False):
3227       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3228         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3229                                    (name_info.ip, new_name))
3230
3231
3232   def Exec(self, feedback_fn):
3233     """Reinstall the instance.
3234
3235     """
3236     inst = self.instance
3237     old_name = inst.name
3238
3239     if inst.disk_template == constants.DT_FILE:
3240       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3241
3242     self.cfg.RenameInstance(inst.name, self.op.new_name)
3243     # Change the instance lock. This is definitely safe while we hold the BGL
3244     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3245     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3246
3247     # re-read the instance from the configuration after rename
3248     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3249
3250     if inst.disk_template == constants.DT_FILE:
3251       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3252       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3253                                                      old_file_storage_dir,
3254                                                      new_file_storage_dir)
3255       result.Raise()
3256       if not result.data:
3257         raise errors.OpExecError("Could not connect to node '%s' to rename"
3258                                  " directory '%s' to '%s' (but the instance"
3259                                  " has been renamed in Ganeti)" % (
3260                                  inst.primary_node, old_file_storage_dir,
3261                                  new_file_storage_dir))
3262
3263       if not result.data[0]:
3264         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3265                                  " (but the instance has been renamed in"
3266                                  " Ganeti)" % (old_file_storage_dir,
3267                                                new_file_storage_dir))
3268
3269     _StartInstanceDisks(self, inst, None)
3270     try:
3271       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3272                                                  old_name)
3273       msg = result.RemoteFailMsg()
3274       if msg:
3275         msg = ("Could not run OS rename script for instance %s on node %s"
3276                " (but the instance has been renamed in Ganeti): %s" %
3277                (inst.name, inst.primary_node, msg))
3278         self.proc.LogWarning(msg)
3279     finally:
3280       _ShutdownInstanceDisks(self, inst)
3281
3282
3283 class LURemoveInstance(LogicalUnit):
3284   """Remove an instance.
3285
3286   """
3287   HPATH = "instance-remove"
3288   HTYPE = constants.HTYPE_INSTANCE
3289   _OP_REQP = ["instance_name", "ignore_failures"]
3290   REQ_BGL = False
3291
3292   def ExpandNames(self):
3293     self._ExpandAndLockInstance()
3294     self.needed_locks[locking.LEVEL_NODE] = []
3295     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3296
3297   def DeclareLocks(self, level):
3298     if level == locking.LEVEL_NODE:
3299       self._LockInstancesNodes()
3300
3301   def BuildHooksEnv(self):
3302     """Build hooks env.
3303
3304     This runs on master, primary and secondary nodes of the instance.
3305
3306     """
3307     env = _BuildInstanceHookEnvByObject(self, self.instance)
3308     nl = [self.cfg.GetMasterNode()]
3309     return env, nl, nl
3310
3311   def CheckPrereq(self):
3312     """Check prerequisites.
3313
3314     This checks that the instance is in the cluster.
3315
3316     """
3317     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3318     assert self.instance is not None, \
3319       "Cannot retrieve locked instance %s" % self.op.instance_name
3320
3321   def Exec(self, feedback_fn):
3322     """Remove the instance.
3323
3324     """
3325     instance = self.instance
3326     logging.info("Shutting down instance %s on node %s",
3327                  instance.name, instance.primary_node)
3328
3329     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3330     msg = result.RemoteFailMsg()
3331     if msg:
3332       if self.op.ignore_failures:
3333         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3334       else:
3335         raise errors.OpExecError("Could not shutdown instance %s on"
3336                                  " node %s: %s" %
3337                                  (instance.name, instance.primary_node, msg))
3338
3339     logging.info("Removing block devices for instance %s", instance.name)
3340
3341     if not _RemoveDisks(self, instance):
3342       if self.op.ignore_failures:
3343         feedback_fn("Warning: can't remove instance's disks")
3344       else:
3345         raise errors.OpExecError("Can't remove instance's disks")
3346
3347     logging.info("Removing instance %s out of cluster config", instance.name)
3348
3349     self.cfg.RemoveInstance(instance.name)
3350     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3351
3352
3353 class LUQueryInstances(NoHooksLU):
3354   """Logical unit for querying instances.
3355
3356   """
3357   _OP_REQP = ["output_fields", "names", "use_locking"]
3358   REQ_BGL = False
3359   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3360                                     "admin_state",
3361                                     "disk_template", "ip", "mac", "bridge",
3362                                     "sda_size", "sdb_size", "vcpus", "tags",
3363                                     "network_port", "beparams",
3364                                     r"(disk)\.(size)/([0-9]+)",
3365                                     r"(disk)\.(sizes)", "disk_usage",
3366                                     r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3367                                     r"(nic)\.(macs|ips|bridges)",
3368                                     r"(disk|nic)\.(count)",
3369                                     "serial_no", "hypervisor", "hvparams",] +
3370                                   ["hv/%s" % name
3371                                    for name in constants.HVS_PARAMETERS] +
3372                                   ["be/%s" % name
3373                                    for name in constants.BES_PARAMETERS])
3374   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3375
3376
3377   def ExpandNames(self):
3378     _CheckOutputFields(static=self._FIELDS_STATIC,
3379                        dynamic=self._FIELDS_DYNAMIC,
3380                        selected=self.op.output_fields)
3381
3382     self.needed_locks = {}
3383     self.share_locks[locking.LEVEL_INSTANCE] = 1
3384     self.share_locks[locking.LEVEL_NODE] = 1
3385
3386     if self.op.names:
3387       self.wanted = _GetWantedInstances(self, self.op.names)
3388     else:
3389       self.wanted = locking.ALL_SET
3390
3391     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3392     self.do_locking = self.do_node_query and self.op.use_locking
3393     if self.do_locking:
3394       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3395       self.needed_locks[locking.LEVEL_NODE] = []
3396       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3397
3398   def DeclareLocks(self, level):
3399     if level == locking.LEVEL_NODE and self.do_locking:
3400       self._LockInstancesNodes()
3401
3402   def CheckPrereq(self):
3403     """Check prerequisites.
3404
3405     """
3406     pass
3407
3408   def Exec(self, feedback_fn):
3409     """Computes the list of nodes and their attributes.
3410
3411     """
3412     all_info = self.cfg.GetAllInstancesInfo()
3413     if self.wanted == locking.ALL_SET:
3414       # caller didn't specify instance names, so ordering is not important
3415       if self.do_locking:
3416         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3417       else:
3418         instance_names = all_info.keys()
3419       instance_names = utils.NiceSort(instance_names)
3420     else:
3421       # caller did specify names, so we must keep the ordering
3422       if self.do_locking:
3423         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3424       else:
3425         tgt_set = all_info.keys()
3426       missing = set(self.wanted).difference(tgt_set)
3427       if missing:
3428         raise errors.OpExecError("Some instances were removed before"
3429                                  " retrieving their data: %s" % missing)
3430       instance_names = self.wanted
3431
3432     instance_list = [all_info[iname] for iname in instance_names]
3433
3434     # begin data gathering
3435
3436     nodes = frozenset([inst.primary_node for inst in instance_list])
3437     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3438
3439     bad_nodes = []
3440     off_nodes = []
3441     if self.do_node_query:
3442       live_data = {}
3443       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3444       for name in nodes:
3445         result = node_data[name]
3446         if result.offline:
3447           # offline nodes will be in both lists
3448           off_nodes.append(name)
3449         if result.failed or result.RemoteFailMsg():
3450           bad_nodes.append(name)
3451         else:
3452           if result.payload:
3453             live_data.update(result.payload)
3454           # else no instance is alive
3455     else:
3456       live_data = dict([(name, {}) for name in instance_names])
3457
3458     # end data gathering
3459
3460     HVPREFIX = "hv/"
3461     BEPREFIX = "be/"
3462     output = []
3463     for instance in instance_list:
3464       iout = []
3465       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3466       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3467       for field in self.op.output_fields:
3468         st_match = self._FIELDS_STATIC.Matches(field)
3469         if field == "name":
3470           val = instance.name
3471         elif field == "os":
3472           val = instance.os
3473         elif field == "pnode":
3474           val = instance.primary_node
3475         elif field == "snodes":
3476           val = list(instance.secondary_nodes)
3477         elif field == "admin_state":
3478           val = instance.admin_up
3479         elif field == "oper_state":
3480           if instance.primary_node in bad_nodes:
3481             val = None
3482           else:
3483             val = bool(live_data.get(instance.name))
3484         elif field == "status":
3485           if instance.primary_node in off_nodes:
3486             val = "ERROR_nodeoffline"
3487           elif instance.primary_node in bad_nodes:
3488             val = "ERROR_nodedown"
3489           else:
3490             running = bool(live_data.get(instance.name))
3491             if running:
3492               if instance.admin_up:
3493                 val = "running"
3494               else:
3495                 val = "ERROR_up"
3496             else:
3497               if instance.admin_up:
3498                 val = "ERROR_down"
3499               else:
3500                 val = "ADMIN_down"
3501         elif field == "oper_ram":
3502           if instance.primary_node in bad_nodes:
3503             val = None
3504           elif instance.name in live_data:
3505             val = live_data[instance.name].get("memory", "?")
3506           else:
3507             val = "-"
3508         elif field == "disk_template":
3509           val = instance.disk_template
3510         elif field == "ip":
3511           val = instance.nics[0].ip
3512         elif field == "bridge":
3513           val = instance.nics[0].bridge
3514         elif field == "mac":
3515           val = instance.nics[0].mac
3516         elif field == "sda_size" or field == "sdb_size":
3517           idx = ord(field[2]) - ord('a')
3518           try:
3519             val = instance.FindDisk(idx).size
3520           except errors.OpPrereqError:
3521             val = None
3522         elif field == "disk_usage": # total disk usage per node
3523           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3524           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3525         elif field == "tags":
3526           val = list(instance.GetTags())
3527         elif field == "serial_no":
3528           val = instance.serial_no
3529         elif field == "network_port":
3530           val = instance.network_port
3531         elif field == "hypervisor":
3532           val = instance.hypervisor
3533         elif field == "hvparams":
3534           val = i_hv
3535         elif (field.startswith(HVPREFIX) and
3536               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3537           val = i_hv.get(field[len(HVPREFIX):], None)
3538         elif field == "beparams":
3539           val = i_be
3540         elif (field.startswith(BEPREFIX) and
3541               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3542           val = i_be.get(field[len(BEPREFIX):], None)
3543         elif st_match and st_match.groups():
3544           # matches a variable list
3545           st_groups = st_match.groups()
3546           if st_groups and st_groups[0] == "disk":
3547             if st_groups[1] == "count":
3548               val = len(instance.disks)
3549             elif st_groups[1] == "sizes":
3550               val = [disk.size for disk in instance.disks]
3551             elif st_groups[1] == "size":
3552               try:
3553                 val = instance.FindDisk(st_groups[2]).size
3554               except errors.OpPrereqError:
3555                 val = None
3556             else:
3557               assert False, "Unhandled disk parameter"
3558           elif st_groups[0] == "nic":
3559             if st_groups[1] == "count":
3560               val = len(instance.nics)
3561             elif st_groups[1] == "macs":
3562               val = [nic.mac for nic in instance.nics]
3563             elif st_groups[1] == "ips":
3564               val = [nic.ip for nic in instance.nics]
3565             elif st_groups[1] == "bridges":
3566               val = [nic.bridge for nic in instance.nics]
3567             else:
3568               # index-based item
3569               nic_idx = int(st_groups[2])
3570               if nic_idx >= len(instance.nics):
3571                 val = None
3572               else:
3573                 if st_groups[1] == "mac":
3574                   val = instance.nics[nic_idx].mac
3575                 elif st_groups[1] == "ip":
3576                   val = instance.nics[nic_idx].ip
3577                 elif st_groups[1] == "bridge":
3578                   val = instance.nics[nic_idx].bridge
3579                 else:
3580                   assert False, "Unhandled NIC parameter"
3581           else:
3582             assert False, "Unhandled variable parameter"
3583         else:
3584           raise errors.ParameterError(field)
3585         iout.append(val)
3586       output.append(iout)
3587
3588     return output
3589
3590
3591 class LUFailoverInstance(LogicalUnit):
3592   """Failover an instance.
3593
3594   """
3595   HPATH = "instance-failover"
3596   HTYPE = constants.HTYPE_INSTANCE
3597   _OP_REQP = ["instance_name", "ignore_consistency"]
3598   REQ_BGL = False
3599
3600   def ExpandNames(self):
3601     self._ExpandAndLockInstance()
3602     self.needed_locks[locking.LEVEL_NODE] = []
3603     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3604
3605   def DeclareLocks(self, level):
3606     if level == locking.LEVEL_NODE:
3607       self._LockInstancesNodes()
3608
3609   def BuildHooksEnv(self):
3610     """Build hooks env.
3611
3612     This runs on master, primary and secondary nodes of the instance.
3613
3614     """
3615     env = {
3616       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3617       }
3618     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3619     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3620     return env, nl, nl
3621
3622   def CheckPrereq(self):
3623     """Check prerequisites.
3624
3625     This checks that the instance is in the cluster.
3626
3627     """
3628     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3629     assert self.instance is not None, \
3630       "Cannot retrieve locked instance %s" % self.op.instance_name
3631
3632     bep = self.cfg.GetClusterInfo().FillBE(instance)
3633     if instance.disk_template not in constants.DTS_NET_MIRROR:
3634       raise errors.OpPrereqError("Instance's disk layout is not"
3635                                  " network mirrored, cannot failover.")
3636
3637     secondary_nodes = instance.secondary_nodes
3638     if not secondary_nodes:
3639       raise errors.ProgrammerError("no secondary node but using "
3640                                    "a mirrored disk template")
3641
3642     target_node = secondary_nodes[0]
3643     _CheckNodeOnline(self, target_node)
3644     _CheckNodeNotDrained(self, target_node)
3645     # check memory requirements on the secondary node
3646     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3647                          instance.name, bep[constants.BE_MEMORY],
3648                          instance.hypervisor)
3649     # check bridge existance
3650     _CheckInstanceBridgesExist(self, instance, node=target_node)
3651
3652   def Exec(self, feedback_fn):
3653     """Failover an instance.
3654
3655     The failover is done by shutting it down on its present node and
3656     starting it on the secondary.
3657
3658     """
3659     instance = self.instance
3660
3661     source_node = instance.primary_node
3662     target_node = instance.secondary_nodes[0]
3663
3664     feedback_fn("* checking disk consistency between source and target")
3665     for dev in instance.disks:
3666       # for drbd, these are drbd over lvm
3667       if not _CheckDiskConsistency(self, dev, target_node, False):
3668         if instance.admin_up and not self.op.ignore_consistency:
3669           raise errors.OpExecError("Disk %s is degraded on target node,"
3670                                    " aborting failover." % dev.iv_name)
3671
3672     feedback_fn("* shutting down instance on source node")
3673     logging.info("Shutting down instance %s on node %s",
3674                  instance.name, source_node)
3675
3676     result = self.rpc.call_instance_shutdown(source_node, instance)
3677     msg = result.RemoteFailMsg()
3678     if msg:
3679       if self.op.ignore_consistency:
3680         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3681                              " Proceeding anyway. Please make sure node"
3682                              " %s is down. Error details: %s",
3683                              instance.name, source_node, source_node, msg)
3684       else:
3685         raise errors.OpExecError("Could not shutdown instance %s on"
3686                                  " node %s: %s" %
3687                                  (instance.name, source_node, msg))
3688
3689     feedback_fn("* deactivating the instance's disks on source node")
3690     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3691       raise errors.OpExecError("Can't shut down the instance's disks.")
3692
3693     instance.primary_node = target_node
3694     # distribute new instance config to the other nodes
3695     self.cfg.Update(instance)
3696
3697     # Only start the instance if it's marked as up
3698     if instance.admin_up:
3699       feedback_fn("* activating the instance's disks on target node")
3700       logging.info("Starting instance %s on node %s",
3701                    instance.name, target_node)
3702
3703       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3704                                                ignore_secondaries=True)
3705       if not disks_ok:
3706         _ShutdownInstanceDisks(self, instance)
3707         raise errors.OpExecError("Can't activate the instance's disks")
3708
3709       feedback_fn("* starting the instance on the target node")
3710       result = self.rpc.call_instance_start(target_node, instance, None, None)
3711       msg = result.RemoteFailMsg()
3712       if msg:
3713         _ShutdownInstanceDisks(self, instance)
3714         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3715                                  (instance.name, target_node, msg))
3716
3717
3718 class LUMigrateInstance(LogicalUnit):
3719   """Migrate an instance.
3720
3721   This is migration without shutting down, compared to the failover,
3722   which is done with shutdown.
3723
3724   """
3725   HPATH = "instance-migrate"
3726   HTYPE = constants.HTYPE_INSTANCE
3727   _OP_REQP = ["instance_name", "live", "cleanup"]
3728
3729   REQ_BGL = False
3730
3731   def ExpandNames(self):
3732     self._ExpandAndLockInstance()
3733     self.needed_locks[locking.LEVEL_NODE] = []
3734     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3735
3736   def DeclareLocks(self, level):
3737     if level == locking.LEVEL_NODE:
3738       self._LockInstancesNodes()
3739
3740   def BuildHooksEnv(self):
3741     """Build hooks env.
3742
3743     This runs on master, primary and secondary nodes of the instance.
3744
3745     """
3746     env = _BuildInstanceHookEnvByObject(self, self.instance)
3747     env["MIGRATE_LIVE"] = self.op.live
3748     env["MIGRATE_CLEANUP"] = self.op.cleanup
3749     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3750     return env, nl, nl
3751
3752   def CheckPrereq(self):
3753     """Check prerequisites.
3754
3755     This checks that the instance is in the cluster.
3756
3757     """
3758     instance = self.cfg.GetInstanceInfo(
3759       self.cfg.ExpandInstanceName(self.op.instance_name))
3760     if instance is None:
3761       raise errors.OpPrereqError("Instance '%s' not known" %
3762                                  self.op.instance_name)
3763
3764     if instance.disk_template != constants.DT_DRBD8:
3765       raise errors.OpPrereqError("Instance's disk layout is not"
3766                                  " drbd8, cannot migrate.")
3767
3768     secondary_nodes = instance.secondary_nodes
3769     if not secondary_nodes:
3770       raise errors.ConfigurationError("No secondary node but using"
3771                                       " drbd8 disk template")
3772
3773     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3774
3775     target_node = secondary_nodes[0]
3776     # check memory requirements on the secondary node
3777     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3778                          instance.name, i_be[constants.BE_MEMORY],
3779                          instance.hypervisor)
3780
3781     # check bridge existance
3782     _CheckInstanceBridgesExist(self, instance, node=target_node)
3783
3784     if not self.op.cleanup:
3785       _CheckNodeNotDrained(self, target_node)
3786       result = self.rpc.call_instance_migratable(instance.primary_node,
3787                                                  instance)
3788       msg = result.RemoteFailMsg()
3789       if msg:
3790         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3791                                    msg)
3792
3793     self.instance = instance
3794
3795   def _WaitUntilSync(self):
3796     """Poll with custom rpc for disk sync.
3797
3798     This uses our own step-based rpc call.
3799
3800     """
3801     self.feedback_fn("* wait until resync is done")
3802     all_done = False
3803     while not all_done:
3804       all_done = True
3805       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3806                                             self.nodes_ip,
3807                                             self.instance.disks)
3808       min_percent = 100
3809       for node, nres in result.items():
3810         msg = nres.RemoteFailMsg()
3811         if msg:
3812           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3813                                    (node, msg))
3814         node_done, node_percent = nres.payload
3815         all_done = all_done and node_done
3816         if node_percent is not None:
3817           min_percent = min(min_percent, node_percent)
3818       if not all_done:
3819         if min_percent < 100:
3820           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3821         time.sleep(2)
3822
3823   def _EnsureSecondary(self, node):
3824     """Demote a node to secondary.
3825
3826     """
3827     self.feedback_fn("* switching node %s to secondary mode" % node)
3828
3829     for dev in self.instance.disks:
3830       self.cfg.SetDiskID(dev, node)
3831
3832     result = self.rpc.call_blockdev_close(node, self.instance.name,
3833                                           self.instance.disks)
3834     msg = result.RemoteFailMsg()
3835     if msg:
3836       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3837                                " error %s" % (node, msg))
3838
3839   def _GoStandalone(self):
3840     """Disconnect from the network.
3841
3842     """
3843     self.feedback_fn("* changing into standalone mode")
3844     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3845                                                self.instance.disks)
3846     for node, nres in result.items():
3847       msg = nres.RemoteFailMsg()
3848       if msg:
3849         raise errors.OpExecError("Cannot disconnect disks node %s,"
3850                                  " error %s" % (node, msg))
3851
3852   def _GoReconnect(self, multimaster):
3853     """Reconnect to the network.
3854
3855     """
3856     if multimaster:
3857       msg = "dual-master"
3858     else:
3859       msg = "single-master"
3860     self.feedback_fn("* changing disks into %s mode" % msg)
3861     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3862                                            self.instance.disks,
3863                                            self.instance.name, multimaster)
3864     for node, nres in result.items():
3865       msg = nres.RemoteFailMsg()
3866       if msg:
3867         raise errors.OpExecError("Cannot change disks config on node %s,"
3868                                  " error: %s" % (node, msg))
3869
3870   def _ExecCleanup(self):
3871     """Try to cleanup after a failed migration.
3872
3873     The cleanup is done by:
3874       - check that the instance is running only on one node
3875         (and update the config if needed)
3876       - change disks on its secondary node to secondary
3877       - wait until disks are fully synchronized
3878       - disconnect from the network
3879       - change disks into single-master mode
3880       - wait again until disks are fully synchronized
3881
3882     """
3883     instance = self.instance
3884     target_node = self.target_node
3885     source_node = self.source_node
3886
3887     # check running on only one node
3888     self.feedback_fn("* checking where the instance actually runs"
3889                      " (if this hangs, the hypervisor might be in"
3890                      " a bad state)")
3891     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3892     for node, result in ins_l.items():
3893       msg = result.RemoteFailMsg()
3894       if msg:
3895         raise errors.OpExecError("Can't contact node %s: %s" % (node, msg))
3896
3897     runningon_source = instance.name in ins_l[source_node].payload
3898     runningon_target = instance.name in ins_l[target_node].payload
3899
3900     if runningon_source and runningon_target:
3901       raise errors.OpExecError("Instance seems to be running on two nodes,"
3902                                " or the hypervisor is confused. You will have"
3903                                " to ensure manually that it runs only on one"
3904                                " and restart this operation.")
3905
3906     if not (runningon_source or runningon_target):
3907       raise errors.OpExecError("Instance does not seem to be running at all."
3908                                " In this case, it's safer to repair by"
3909                                " running 'gnt-instance stop' to ensure disk"
3910                                " shutdown, and then restarting it.")
3911
3912     if runningon_target:
3913       # the migration has actually succeeded, we need to update the config
3914       self.feedback_fn("* instance running on secondary node (%s),"
3915                        " updating config" % target_node)
3916       instance.primary_node = target_node
3917       self.cfg.Update(instance)
3918       demoted_node = source_node
3919     else:
3920       self.feedback_fn("* instance confirmed to be running on its"
3921                        " primary node (%s)" % source_node)
3922       demoted_node = target_node
3923
3924     self._EnsureSecondary(demoted_node)
3925     try:
3926       self._WaitUntilSync()
3927     except errors.OpExecError:
3928       # we ignore here errors, since if the device is standalone, it
3929       # won't be able to sync
3930       pass
3931     self._GoStandalone()
3932     self._GoReconnect(False)
3933     self._WaitUntilSync()
3934
3935     self.feedback_fn("* done")
3936
3937   def _RevertDiskStatus(self):
3938     """Try to revert the disk status after a failed migration.
3939
3940     """
3941     target_node = self.target_node
3942     try:
3943       self._EnsureSecondary(target_node)
3944       self._GoStandalone()
3945       self._GoReconnect(False)
3946       self._WaitUntilSync()
3947     except errors.OpExecError, err:
3948       self.LogWarning("Migration failed and I can't reconnect the"
3949                       " drives: error '%s'\n"
3950                       "Please look and recover the instance status" %
3951                       str(err))
3952
3953   def _AbortMigration(self):
3954     """Call the hypervisor code to abort a started migration.
3955
3956     """
3957     instance = self.instance
3958     target_node = self.target_node
3959     migration_info = self.migration_info
3960
3961     abort_result = self.rpc.call_finalize_migration(target_node,
3962                                                     instance,
3963                                                     migration_info,
3964                                                     False)
3965     abort_msg = abort_result.RemoteFailMsg()
3966     if abort_msg:
3967       logging.error("Aborting migration failed on target node %s: %s" %
3968                     (target_node, abort_msg))
3969       # Don't raise an exception here, as we stil have to try to revert the
3970       # disk status, even if this step failed.
3971
3972   def _ExecMigration(self):
3973     """Migrate an instance.
3974
3975     The migrate is done by:
3976       - change the disks into dual-master mode
3977       - wait until disks are fully synchronized again
3978       - migrate the instance
3979       - change disks on the new secondary node (the old primary) to secondary
3980       - wait until disks are fully synchronized
3981       - change disks into single-master mode
3982
3983     """
3984     instance = self.instance
3985     target_node = self.target_node
3986     source_node = self.source_node
3987
3988     self.feedback_fn("* checking disk consistency between source and target")
3989     for dev in instance.disks:
3990       if not _CheckDiskConsistency(self, dev, target_node, False):
3991         raise errors.OpExecError("Disk %s is degraded or not fully"
3992                                  " synchronized on target node,"
3993                                  " aborting migrate." % dev.iv_name)
3994
3995     # First get the migration information from the remote node
3996     result = self.rpc.call_migration_info(source_node, instance)
3997     msg = result.RemoteFailMsg()
3998     if msg:
3999       log_err = ("Failed fetching source migration information from %s: %s" %
4000                  (source_node, msg))
4001       logging.error(log_err)
4002       raise errors.OpExecError(log_err)
4003
4004     self.migration_info = migration_info = result.payload
4005
4006     # Then switch the disks to master/master mode
4007     self._EnsureSecondary(target_node)
4008     self._GoStandalone()
4009     self._GoReconnect(True)
4010     self._WaitUntilSync()
4011
4012     self.feedback_fn("* preparing %s to accept the instance" % target_node)
4013     result = self.rpc.call_accept_instance(target_node,
4014                                            instance,
4015                                            migration_info,
4016                                            self.nodes_ip[target_node])
4017
4018     msg = result.RemoteFailMsg()
4019     if msg:
4020       logging.error("Instance pre-migration failed, trying to revert"
4021                     " disk status: %s", msg)
4022       self._AbortMigration()
4023       self._RevertDiskStatus()
4024       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4025                                (instance.name, msg))
4026
4027     self.feedback_fn("* migrating instance to %s" % target_node)
4028     time.sleep(10)
4029     result = self.rpc.call_instance_migrate(source_node, instance,
4030                                             self.nodes_ip[target_node],
4031                                             self.op.live)
4032     msg = result.RemoteFailMsg()
4033     if msg:
4034       logging.error("Instance migration failed, trying to revert"
4035                     " disk status: %s", msg)
4036       self._AbortMigration()
4037       self._RevertDiskStatus()
4038       raise errors.OpExecError("Could not migrate instance %s: %s" %
4039                                (instance.name, msg))
4040     time.sleep(10)
4041
4042     instance.primary_node = target_node
4043     # distribute new instance config to the other nodes
4044     self.cfg.Update(instance)
4045
4046     result = self.rpc.call_finalize_migration(target_node,
4047                                               instance,
4048                                               migration_info,
4049                                               True)
4050     msg = result.RemoteFailMsg()
4051     if msg:
4052       logging.error("Instance migration succeeded, but finalization failed:"
4053                     " %s" % msg)
4054       raise errors.OpExecError("Could not finalize instance migration: %s" %
4055                                msg)
4056
4057     self._EnsureSecondary(source_node)
4058     self._WaitUntilSync()
4059     self._GoStandalone()
4060     self._GoReconnect(False)
4061     self._WaitUntilSync()
4062
4063     self.feedback_fn("* done")
4064
4065   def Exec(self, feedback_fn):
4066     """Perform the migration.
4067
4068     """
4069     self.feedback_fn = feedback_fn
4070
4071     self.source_node = self.instance.primary_node
4072     self.target_node = self.instance.secondary_nodes[0]
4073     self.all_nodes = [self.source_node, self.target_node]
4074     self.nodes_ip = {
4075       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4076       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4077       }
4078     if self.op.cleanup:
4079       return self._ExecCleanup()
4080     else:
4081       return self._ExecMigration()
4082
4083
4084 def _CreateBlockDev(lu, node, instance, device, force_create,
4085                     info, force_open):
4086   """Create a tree of block devices on a given node.
4087
4088   If this device type has to be created on secondaries, create it and
4089   all its children.
4090
4091   If not, just recurse to children keeping the same 'force' value.
4092
4093   @param lu: the lu on whose behalf we execute
4094   @param node: the node on which to create the device
4095   @type instance: L{objects.Instance}
4096   @param instance: the instance which owns the device
4097   @type device: L{objects.Disk}
4098   @param device: the device to create
4099   @type force_create: boolean
4100   @param force_create: whether to force creation of this device; this
4101       will be change to True whenever we find a device which has
4102       CreateOnSecondary() attribute
4103   @param info: the extra 'metadata' we should attach to the device
4104       (this will be represented as a LVM tag)
4105   @type force_open: boolean
4106   @param force_open: this parameter will be passes to the
4107       L{backend.BlockdevCreate} function where it specifies
4108       whether we run on primary or not, and it affects both
4109       the child assembly and the device own Open() execution
4110
4111   """
4112   if device.CreateOnSecondary():
4113     force_create = True
4114
4115   if device.children:
4116     for child in device.children:
4117       _CreateBlockDev(lu, node, instance, child, force_create,
4118                       info, force_open)
4119
4120   if not force_create:
4121     return
4122
4123   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4124
4125
4126 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4127   """Create a single block device on a given node.
4128
4129   This will not recurse over children of the device, so they must be
4130   created in advance.
4131
4132   @param lu: the lu on whose behalf we execute
4133   @param node: the node on which to create the device
4134   @type instance: L{objects.Instance}
4135   @param instance: the instance which owns the device
4136   @type device: L{objects.Disk}
4137   @param device: the device to create
4138   @param info: the extra 'metadata' we should attach to the device
4139       (this will be represented as a LVM tag)
4140   @type force_open: boolean
4141   @param force_open: this parameter will be passes to the
4142       L{backend.BlockdevCreate} function where it specifies
4143       whether we run on primary or not, and it affects both
4144       the child assembly and the device own Open() execution
4145
4146   """
4147   lu.cfg.SetDiskID(device, node)
4148   result = lu.rpc.call_blockdev_create(node, device, device.size,
4149                                        instance.name, force_open, info)
4150   msg = result.RemoteFailMsg()
4151   if msg:
4152     raise errors.OpExecError("Can't create block device %s on"
4153                              " node %s for instance %s: %s" %
4154                              (device, node, instance.name, msg))
4155   if device.physical_id is None:
4156     device.physical_id = result.payload
4157
4158
4159 def _GenerateUniqueNames(lu, exts):
4160   """Generate a suitable LV name.
4161
4162   This will generate a logical volume name for the given instance.
4163
4164   """
4165   results = []
4166   for val in exts:
4167     new_id = lu.cfg.GenerateUniqueID()
4168     results.append("%s%s" % (new_id, val))
4169   return results
4170
4171
4172 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4173                          p_minor, s_minor):
4174   """Generate a drbd8 device complete with its children.
4175
4176   """
4177   port = lu.cfg.AllocatePort()
4178   vgname = lu.cfg.GetVGName()
4179   shared_secret = lu.cfg.GenerateDRBDSecret()
4180   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4181                           logical_id=(vgname, names[0]))
4182   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4183                           logical_id=(vgname, names[1]))
4184   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4185                           logical_id=(primary, secondary, port,
4186                                       p_minor, s_minor,
4187                                       shared_secret),
4188                           children=[dev_data, dev_meta],
4189                           iv_name=iv_name)
4190   return drbd_dev
4191
4192
4193 def _GenerateDiskTemplate(lu, template_name,
4194                           instance_name, primary_node,
4195                           secondary_nodes, disk_info,
4196                           file_storage_dir, file_driver,
4197                           base_index):
4198   """Generate the entire disk layout for a given template type.
4199
4200   """
4201   #TODO: compute space requirements
4202
4203   vgname = lu.cfg.GetVGName()
4204   disk_count = len(disk_info)
4205   disks = []
4206   if template_name == constants.DT_DISKLESS:
4207     pass
4208   elif template_name == constants.DT_PLAIN:
4209     if len(secondary_nodes) != 0:
4210       raise errors.ProgrammerError("Wrong template configuration")
4211
4212     names = _GenerateUniqueNames(lu, [".disk%d" % i
4213                                       for i in range(disk_count)])
4214     for idx, disk in enumerate(disk_info):
4215       disk_index = idx + base_index
4216       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4217                               logical_id=(vgname, names[idx]),
4218                               iv_name="disk/%d" % disk_index,
4219                               mode=disk["mode"])
4220       disks.append(disk_dev)
4221   elif template_name == constants.DT_DRBD8:
4222     if len(secondary_nodes) != 1:
4223       raise errors.ProgrammerError("Wrong template configuration")
4224     remote_node = secondary_nodes[0]
4225     minors = lu.cfg.AllocateDRBDMinor(
4226       [primary_node, remote_node] * len(disk_info), instance_name)
4227
4228     names = []
4229     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4230                                                for i in range(disk_count)]):
4231       names.append(lv_prefix + "_data")
4232       names.append(lv_prefix + "_meta")
4233     for idx, disk in enumerate(disk_info):
4234       disk_index = idx + base_index
4235       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4236                                       disk["size"], names[idx*2:idx*2+2],
4237                                       "disk/%d" % disk_index,
4238                                       minors[idx*2], minors[idx*2+1])
4239       disk_dev.mode = disk["mode"]
4240       disks.append(disk_dev)
4241   elif template_name == constants.DT_FILE:
4242     if len(secondary_nodes) != 0:
4243       raise errors.ProgrammerError("Wrong template configuration")
4244
4245     for idx, disk in enumerate(disk_info):
4246       disk_index = idx + base_index
4247       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4248                               iv_name="disk/%d" % disk_index,
4249                               logical_id=(file_driver,
4250                                           "%s/disk%d" % (file_storage_dir,
4251                                                          disk_index)),
4252                               mode=disk["mode"])
4253       disks.append(disk_dev)
4254   else:
4255     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4256   return disks
4257
4258
4259 def _GetInstanceInfoText(instance):
4260   """Compute that text that should be added to the disk's metadata.
4261
4262   """
4263   return "originstname+%s" % instance.name
4264
4265
4266 def _CreateDisks(lu, instance):
4267   """Create all disks for an instance.
4268
4269   This abstracts away some work from AddInstance.
4270
4271   @type lu: L{LogicalUnit}
4272   @param lu: the logical unit on whose behalf we execute
4273   @type instance: L{objects.Instance}
4274   @param instance: the instance whose disks we should create
4275   @rtype: boolean
4276   @return: the success of the creation
4277
4278   """
4279   info = _GetInstanceInfoText(instance)
4280   pnode = instance.primary_node
4281
4282   if instance.disk_template == constants.DT_FILE:
4283     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4284     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4285
4286     if result.failed or not result.data:
4287       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4288
4289     if not result.data[0]:
4290       raise errors.OpExecError("Failed to create directory '%s'" %
4291                                file_storage_dir)
4292
4293   # Note: this needs to be kept in sync with adding of disks in
4294   # LUSetInstanceParams
4295   for device in instance.disks:
4296     logging.info("Creating volume %s for instance %s",
4297                  device.iv_name, instance.name)
4298     #HARDCODE
4299     for node in instance.all_nodes:
4300       f_create = node == pnode
4301       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4302
4303
4304 def _RemoveDisks(lu, instance):
4305   """Remove all disks for an instance.
4306
4307   This abstracts away some work from `AddInstance()` and
4308   `RemoveInstance()`. Note that in case some of the devices couldn't
4309   be removed, the removal will continue with the other ones (compare
4310   with `_CreateDisks()`).
4311
4312   @type lu: L{LogicalUnit}
4313   @param lu: the logical unit on whose behalf we execute
4314   @type instance: L{objects.Instance}
4315   @param instance: the instance whose disks we should remove
4316   @rtype: boolean
4317   @return: the success of the removal
4318
4319   """
4320   logging.info("Removing block devices for instance %s", instance.name)
4321
4322   all_result = True
4323   for device in instance.disks:
4324     for node, disk in device.ComputeNodeTree(instance.primary_node):
4325       lu.cfg.SetDiskID(disk, node)
4326       msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4327       if msg:
4328         lu.LogWarning("Could not remove block device %s on node %s,"
4329                       " continuing anyway: %s", device.iv_name, node, msg)
4330         all_result = False
4331
4332   if instance.disk_template == constants.DT_FILE:
4333     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4334     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4335                                                  file_storage_dir)
4336     if result.failed or not result.data:
4337       logging.error("Could not remove directory '%s'", file_storage_dir)
4338       all_result = False
4339
4340   return all_result
4341
4342
4343 def _ComputeDiskSize(disk_template, disks):
4344   """Compute disk size requirements in the volume group
4345
4346   """
4347   # Required free disk space as a function of disk and swap space
4348   req_size_dict = {
4349     constants.DT_DISKLESS: None,
4350     constants.DT_PLAIN: sum(d["size"] for d in disks),
4351     # 128 MB are added for drbd metadata for each disk
4352     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4353     constants.DT_FILE: None,
4354   }
4355
4356   if disk_template not in req_size_dict:
4357     raise errors.ProgrammerError("Disk template '%s' size requirement"
4358                                  " is unknown" %  disk_template)
4359
4360   return req_size_dict[disk_template]
4361
4362
4363 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4364   """Hypervisor parameter validation.
4365
4366   This function abstract the hypervisor parameter validation to be
4367   used in both instance create and instance modify.
4368
4369   @type lu: L{LogicalUnit}
4370   @param lu: the logical unit for which we check
4371   @type nodenames: list
4372   @param nodenames: the list of nodes on which we should check
4373   @type hvname: string
4374   @param hvname: the name of the hypervisor we should use
4375   @type hvparams: dict
4376   @param hvparams: the parameters which we need to check
4377   @raise errors.OpPrereqError: if the parameters are not valid
4378
4379   """
4380   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4381                                                   hvname,
4382                                                   hvparams)
4383   for node in nodenames:
4384     info = hvinfo[node]
4385     if info.offline:
4386       continue
4387     msg = info.RemoteFailMsg()
4388     if msg:
4389       raise errors.OpPrereqError("Hypervisor parameter validation"
4390                                  " failed on node %s: %s" % (node, msg))
4391
4392
4393 class LUCreateInstance(LogicalUnit):
4394   """Create an instance.
4395
4396   """
4397   HPATH = "instance-add"
4398   HTYPE = constants.HTYPE_INSTANCE
4399   _OP_REQP = ["instance_name", "disks", "disk_template",
4400               "mode", "start",
4401               "wait_for_sync", "ip_check", "nics",
4402               "hvparams", "beparams"]
4403   REQ_BGL = False
4404
4405   def _ExpandNode(self, node):
4406     """Expands and checks one node name.
4407
4408     """
4409     node_full = self.cfg.ExpandNodeName(node)
4410     if node_full is None:
4411       raise errors.OpPrereqError("Unknown node %s" % node)
4412     return node_full
4413
4414   def ExpandNames(self):
4415     """ExpandNames for CreateInstance.
4416
4417     Figure out the right locks for instance creation.
4418
4419     """
4420     self.needed_locks = {}
4421
4422     # set optional parameters to none if they don't exist
4423     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4424       if not hasattr(self.op, attr):
4425         setattr(self.op, attr, None)
4426
4427     # cheap checks, mostly valid constants given
4428
4429     # verify creation mode
4430     if self.op.mode not in (constants.INSTANCE_CREATE,
4431                             constants.INSTANCE_IMPORT):
4432       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4433                                  self.op.mode)
4434
4435     # disk template and mirror node verification
4436     if self.op.disk_template not in constants.DISK_TEMPLATES:
4437       raise errors.OpPrereqError("Invalid disk template name")
4438
4439     if self.op.hypervisor is None:
4440       self.op.hypervisor = self.cfg.GetHypervisorType()
4441
4442     cluster = self.cfg.GetClusterInfo()
4443     enabled_hvs = cluster.enabled_hypervisors
4444     if self.op.hypervisor not in enabled_hvs:
4445       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4446                                  " cluster (%s)" % (self.op.hypervisor,
4447                                   ",".join(enabled_hvs)))
4448
4449     # check hypervisor parameter syntax (locally)
4450     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4451     filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
4452                                   self.op.hvparams)
4453     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4454     hv_type.CheckParameterSyntax(filled_hvp)
4455
4456     # fill and remember the beparams dict
4457     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4458     self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
4459                                     self.op.beparams)
4460
4461     #### instance parameters check
4462
4463     # instance name verification
4464     hostname1 = utils.HostInfo(self.op.instance_name)
4465     self.op.instance_name = instance_name = hostname1.name
4466
4467     # this is just a preventive check, but someone might still add this
4468     # instance in the meantime, and creation will fail at lock-add time
4469     if instance_name in self.cfg.GetInstanceList():
4470       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4471                                  instance_name)
4472
4473     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4474
4475     # NIC buildup
4476     self.nics = []
4477     for idx, nic in enumerate(self.op.nics):
4478       nic_mode_req = nic.get("mode", None)
4479       nic_mode = nic_mode_req
4480       if nic_mode is None:
4481         nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
4482
4483       # in routed mode, for the first nic, the default ip is 'auto'
4484       if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
4485         default_ip_mode = constants.VALUE_AUTO
4486       else:
4487         default_ip_mode = constants.VALUE_NONE
4488
4489       # ip validity checks
4490       ip = nic.get("ip", default_ip_mode)
4491       if ip is None or ip.lower() == constants.VALUE_NONE:
4492         nic_ip = None
4493       elif ip.lower() == constants.VALUE_AUTO:
4494         nic_ip = hostname1.ip
4495       else:
4496         if not utils.IsValidIP(ip):
4497           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4498                                      " like a valid IP" % ip)
4499         nic_ip = ip
4500
4501       # TODO: check the ip for uniqueness !!
4502       if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
4503         raise errors.OpPrereqError("Routed nic mode requires an ip address")
4504
4505       # MAC address verification
4506       mac = nic.get("mac", constants.VALUE_AUTO)
4507       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4508         if not utils.IsValidMac(mac.lower()):
4509           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4510                                      mac)
4511       # bridge verification
4512       bridge = nic.get("bridge", None)
4513       link = nic.get("link", None)
4514       if bridge and link:
4515         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
4516       elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
4517         raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
4518       elif bridge:
4519         link = bridge
4520
4521       nicparams = {}
4522       if nic_mode_req:
4523         nicparams[constants.NIC_MODE] = nic_mode_req
4524       if link:
4525         nicparams[constants.NIC_LINK] = link
4526
4527       check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4528                                       nicparams)
4529       objects.NIC.CheckParameterSyntax(check_params)
4530       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
4531
4532     # disk checks/pre-build
4533     self.disks = []
4534     for disk in self.op.disks:
4535       mode = disk.get("mode", constants.DISK_RDWR)
4536       if mode not in constants.DISK_ACCESS_SET:
4537         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4538                                    mode)
4539       size = disk.get("size", None)
4540       if size is None:
4541         raise errors.OpPrereqError("Missing disk size")
4542       try:
4543         size = int(size)
4544       except ValueError:
4545         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4546       self.disks.append({"size": size, "mode": mode})
4547
4548     # used in CheckPrereq for ip ping check
4549     self.check_ip = hostname1.ip
4550
4551     # file storage checks
4552     if (self.op.file_driver and
4553         not self.op.file_driver in constants.FILE_DRIVER):
4554       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4555                                  self.op.file_driver)
4556
4557     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4558       raise errors.OpPrereqError("File storage directory path not absolute")
4559
4560     ### Node/iallocator related checks
4561     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4562       raise errors.OpPrereqError("One and only one of iallocator and primary"
4563                                  " node must be given")
4564
4565     if self.op.iallocator:
4566       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4567     else:
4568       self.op.pnode = self._ExpandNode(self.op.pnode)
4569       nodelist = [self.op.pnode]
4570       if self.op.snode is not None:
4571         self.op.snode = self._ExpandNode(self.op.snode)
4572         nodelist.append(self.op.snode)
4573       self.needed_locks[locking.LEVEL_NODE] = nodelist
4574
4575     # in case of import lock the source node too
4576     if self.op.mode == constants.INSTANCE_IMPORT:
4577       src_node = getattr(self.op, "src_node", None)
4578       src_path = getattr(self.op, "src_path", None)
4579
4580       if src_path is None:
4581         self.op.src_path = src_path = self.op.instance_name
4582
4583       if src_node is None:
4584         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4585         self.op.src_node = None
4586         if os.path.isabs(src_path):
4587           raise errors.OpPrereqError("Importing an instance from an absolute"
4588                                      " path requires a source node option.")
4589       else:
4590         self.op.src_node = src_node = self._ExpandNode(src_node)
4591         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4592           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4593         if not os.path.isabs(src_path):
4594           self.op.src_path = src_path = \
4595             os.path.join(constants.EXPORT_DIR, src_path)
4596
4597     else: # INSTANCE_CREATE
4598       if getattr(self.op, "os_type", None) is None:
4599         raise errors.OpPrereqError("No guest OS specified")
4600
4601   def _RunAllocator(self):
4602     """Run the allocator based on input opcode.
4603
4604     """
4605     nics = [n.ToDict() for n in self.nics]
4606     ial = IAllocator(self,
4607                      mode=constants.IALLOCATOR_MODE_ALLOC,
4608                      name=self.op.instance_name,
4609                      disk_template=self.op.disk_template,
4610                      tags=[],
4611                      os=self.op.os_type,
4612                      vcpus=self.be_full[constants.BE_VCPUS],
4613                      mem_size=self.be_full[constants.BE_MEMORY],
4614                      disks=self.disks,
4615                      nics=nics,
4616                      hypervisor=self.op.hypervisor,
4617                      )
4618
4619     ial.Run(self.op.iallocator)
4620
4621     if not ial.success:
4622       raise errors.OpPrereqError("Can't compute nodes using"
4623                                  " iallocator '%s': %s" % (self.op.iallocator,
4624                                                            ial.info))
4625     if len(ial.nodes) != ial.required_nodes:
4626       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4627                                  " of nodes (%s), required %s" %
4628                                  (self.op.iallocator, len(ial.nodes),
4629                                   ial.required_nodes))
4630     self.op.pnode = ial.nodes[0]
4631     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4632                  self.op.instance_name, self.op.iallocator,
4633                  ", ".join(ial.nodes))
4634     if ial.required_nodes == 2:
4635       self.op.snode = ial.nodes[1]
4636
4637   def BuildHooksEnv(self):
4638     """Build hooks env.
4639
4640     This runs on master, primary and secondary nodes of the instance.
4641
4642     """
4643     env = {
4644       "ADD_MODE": self.op.mode,
4645       }
4646     if self.op.mode == constants.INSTANCE_IMPORT:
4647       env["SRC_NODE"] = self.op.src_node
4648       env["SRC_PATH"] = self.op.src_path
4649       env["SRC_IMAGES"] = self.src_images
4650
4651     env.update(_BuildInstanceHookEnv(
4652       name=self.op.instance_name,
4653       primary_node=self.op.pnode,
4654       secondary_nodes=self.secondaries,
4655       status=self.op.start,
4656       os_type=self.op.os_type,
4657       memory=self.be_full[constants.BE_MEMORY],
4658       vcpus=self.be_full[constants.BE_VCPUS],
4659       nics=_PreBuildNICHooksList(self, self.nics),
4660       disk_template=self.op.disk_template,
4661       disks=[(d["size"], d["mode"]) for d in self.disks],
4662     ))
4663
4664     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4665           self.secondaries)
4666     return env, nl, nl
4667
4668
4669   def CheckPrereq(self):
4670     """Check prerequisites.
4671
4672     """
4673     if (not self.cfg.GetVGName() and
4674         self.op.disk_template not in constants.DTS_NOT_LVM):
4675       raise errors.OpPrereqError("Cluster does not support lvm-based"
4676                                  " instances")
4677
4678     if self.op.mode == constants.INSTANCE_IMPORT:
4679       src_node = self.op.src_node
4680       src_path = self.op.src_path
4681
4682       if src_node is None:
4683         locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
4684         exp_list = self.rpc.call_export_list(locked_nodes)
4685         found = False
4686         for node in exp_list:
4687           if exp_list[node].RemoteFailMsg():
4688             continue
4689           if src_path in exp_list[node].payload:
4690             found = True
4691             self.op.src_node = src_node = node
4692             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4693                                                        src_path)
4694             break
4695         if not found:
4696           raise errors.OpPrereqError("No export found for relative path %s" %
4697                                       src_path)
4698
4699       _CheckNodeOnline(self, src_node)
4700       result = self.rpc.call_export_info(src_node, src_path)
4701       msg = result.RemoteFailMsg()
4702       if msg:
4703         raise errors.OpPrereqError("No export or invalid export found in"
4704                                    " dir %s: %s" % (src_path, msg))
4705
4706       export_info = objects.SerializableConfigParser.Loads(str(result.payload))
4707       if not export_info.has_section(constants.INISECT_EXP):
4708         raise errors.ProgrammerError("Corrupted export config")
4709
4710       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4711       if (int(ei_version) != constants.EXPORT_VERSION):
4712         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4713                                    (ei_version, constants.EXPORT_VERSION))
4714
4715       # Check that the new instance doesn't have less disks than the export
4716       instance_disks = len(self.disks)
4717       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4718       if instance_disks < export_disks:
4719         raise errors.OpPrereqError("Not enough disks to import."
4720                                    " (instance: %d, export: %d)" %
4721                                    (instance_disks, export_disks))
4722
4723       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4724       disk_images = []
4725       for idx in range(export_disks):
4726         option = 'disk%d_dump' % idx
4727         if export_info.has_option(constants.INISECT_INS, option):
4728           # FIXME: are the old os-es, disk sizes, etc. useful?
4729           export_name = export_info.get(constants.INISECT_INS, option)
4730           image = os.path.join(src_path, export_name)
4731           disk_images.append(image)
4732         else:
4733           disk_images.append(False)
4734
4735       self.src_images = disk_images
4736
4737       old_name = export_info.get(constants.INISECT_INS, 'name')
4738       # FIXME: int() here could throw a ValueError on broken exports
4739       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4740       if self.op.instance_name == old_name:
4741         for idx, nic in enumerate(self.nics):
4742           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4743             nic_mac_ini = 'nic%d_mac' % idx
4744             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4745
4746     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4747     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4748     if self.op.start and not self.op.ip_check:
4749       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4750                                  " adding an instance in start mode")
4751
4752     if self.op.ip_check:
4753       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4754         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4755                                    (self.check_ip, self.op.instance_name))
4756
4757     #### mac address generation
4758     # By generating here the mac address both the allocator and the hooks get
4759     # the real final mac address rather than the 'auto' or 'generate' value.
4760     # There is a race condition between the generation and the instance object
4761     # creation, which means that we know the mac is valid now, but we're not
4762     # sure it will be when we actually add the instance. If things go bad
4763     # adding the instance will abort because of a duplicate mac, and the
4764     # creation job will fail.
4765     for nic in self.nics:
4766       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4767         nic.mac = self.cfg.GenerateMAC()
4768
4769     #### allocator run
4770
4771     if self.op.iallocator is not None:
4772       self._RunAllocator()
4773
4774     #### node related checks
4775
4776     # check primary node
4777     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4778     assert self.pnode is not None, \
4779       "Cannot retrieve locked node %s" % self.op.pnode
4780     if pnode.offline:
4781       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4782                                  pnode.name)
4783     if pnode.drained:
4784       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4785                                  pnode.name)
4786
4787     self.secondaries = []
4788
4789     # mirror node verification
4790     if self.op.disk_template in constants.DTS_NET_MIRROR:
4791       if self.op.snode is None:
4792         raise errors.OpPrereqError("The networked disk templates need"
4793                                    " a mirror node")
4794       if self.op.snode == pnode.name:
4795         raise errors.OpPrereqError("The secondary node cannot be"
4796                                    " the primary node.")
4797       _CheckNodeOnline(self, self.op.snode)
4798       _CheckNodeNotDrained(self, self.op.snode)
4799       self.secondaries.append(self.op.snode)
4800
4801     nodenames = [pnode.name] + self.secondaries
4802
4803     req_size = _ComputeDiskSize(self.op.disk_template,
4804                                 self.disks)
4805
4806     # Check lv size requirements
4807     if req_size is not None:
4808       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4809                                          self.op.hypervisor)
4810       for node in nodenames:
4811         info = nodeinfo[node]
4812         info.Raise()
4813         info = info.data
4814         if not info:
4815           raise errors.OpPrereqError("Cannot get current information"
4816                                      " from node '%s'" % node)
4817         vg_free = info.get('vg_free', None)
4818         if not isinstance(vg_free, int):
4819           raise errors.OpPrereqError("Can't compute free disk space on"
4820                                      " node %s" % node)
4821         if req_size > info['vg_free']:
4822           raise errors.OpPrereqError("Not enough disk space on target node %s."
4823                                      " %d MB available, %d MB required" %
4824                                      (node, info['vg_free'], req_size))
4825
4826     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4827
4828     # os verification
4829     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4830     result.Raise()
4831     if not isinstance(result.data, objects.OS):
4832       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4833                                  " primary node"  % self.op.os_type)
4834
4835     _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
4836
4837     # memory check on primary node
4838     if self.op.start:
4839       _CheckNodeFreeMemory(self, self.pnode.name,
4840                            "creating instance %s" % self.op.instance_name,
4841                            self.be_full[constants.BE_MEMORY],
4842                            self.op.hypervisor)
4843
4844   def Exec(self, feedback_fn):
4845     """Create and add the instance to the cluster.
4846
4847     """
4848     instance = self.op.instance_name
4849     pnode_name = self.pnode.name
4850
4851     ht_kind = self.op.hypervisor
4852     if ht_kind in constants.HTS_REQ_PORT:
4853       network_port = self.cfg.AllocatePort()
4854     else:
4855       network_port = None
4856
4857     ##if self.op.vnc_bind_address is None:
4858     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4859
4860     # this is needed because os.path.join does not accept None arguments
4861     if self.op.file_storage_dir is None:
4862       string_file_storage_dir = ""
4863     else:
4864       string_file_storage_dir = self.op.file_storage_dir
4865
4866     # build the full file storage dir path
4867     file_storage_dir = os.path.normpath(os.path.join(
4868                                         self.cfg.GetFileStorageDir(),
4869                                         string_file_storage_dir, instance))
4870
4871
4872     disks = _GenerateDiskTemplate(self,
4873                                   self.op.disk_template,
4874                                   instance, pnode_name,
4875                                   self.secondaries,
4876                                   self.disks,
4877                                   file_storage_dir,
4878                                   self.op.file_driver,
4879                                   0)
4880
4881     iobj = objects.Instance(name=instance, os=self.op.os_type,
4882                             primary_node=pnode_name,
4883                             nics=self.nics, disks=disks,
4884                             disk_template=self.op.disk_template,
4885                             admin_up=False,
4886                             network_port=network_port,
4887                             beparams=self.op.beparams,
4888                             hvparams=self.op.hvparams,
4889                             hypervisor=self.op.hypervisor,
4890                             )
4891
4892     feedback_fn("* creating instance disks...")
4893     try:
4894       _CreateDisks(self, iobj)
4895     except errors.OpExecError:
4896       self.LogWarning("Device creation failed, reverting...")
4897       try:
4898         _RemoveDisks(self, iobj)
4899       finally:
4900         self.cfg.ReleaseDRBDMinors(instance)
4901         raise
4902
4903     feedback_fn("adding instance %s to cluster config" % instance)
4904
4905     self.cfg.AddInstance(iobj)
4906     # Declare that we don't want to remove the instance lock anymore, as we've
4907     # added the instance to the config
4908     del self.remove_locks[locking.LEVEL_INSTANCE]
4909     # Unlock all the nodes
4910     if self.op.mode == constants.INSTANCE_IMPORT:
4911       nodes_keep = [self.op.src_node]
4912       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4913                        if node != self.op.src_node]
4914       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4915       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4916     else:
4917       self.context.glm.release(locking.LEVEL_NODE)
4918       del self.acquired_locks[locking.LEVEL_NODE]
4919
4920     if self.op.wait_for_sync:
4921       disk_abort = not _WaitForSync(self, iobj)
4922     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4923       # make sure the disks are not degraded (still sync-ing is ok)
4924       time.sleep(15)
4925       feedback_fn("* checking mirrors status")
4926       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4927     else:
4928       disk_abort = False
4929
4930     if disk_abort:
4931       _RemoveDisks(self, iobj)
4932       self.cfg.RemoveInstance(iobj.name)
4933       # Make sure the instance lock gets removed
4934       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4935       raise errors.OpExecError("There are some degraded disks for"
4936                                " this instance")
4937
4938     feedback_fn("creating os for instance %s on node %s" %
4939                 (instance, pnode_name))
4940
4941     if iobj.disk_template != constants.DT_DISKLESS:
4942       if self.op.mode == constants.INSTANCE_CREATE:
4943         feedback_fn("* running the instance OS create scripts...")
4944         result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
4945         msg = result.RemoteFailMsg()
4946         if msg:
4947           raise errors.OpExecError("Could not add os for instance %s"
4948                                    " on node %s: %s" %
4949                                    (instance, pnode_name, msg))
4950
4951       elif self.op.mode == constants.INSTANCE_IMPORT:
4952         feedback_fn("* running the instance OS import scripts...")
4953         src_node = self.op.src_node
4954         src_images = self.src_images
4955         cluster_name = self.cfg.GetClusterName()
4956         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4957                                                          src_node, src_images,
4958                                                          cluster_name)
4959         msg = import_result.RemoteFailMsg()
4960         if msg:
4961           self.LogWarning("Error while importing the disk images for instance"
4962                           " %s on node %s: %s" % (instance, pnode_name, msg))
4963       else:
4964         # also checked in the prereq part
4965         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4966                                      % self.op.mode)
4967
4968     if self.op.start:
4969       iobj.admin_up = True
4970       self.cfg.Update(iobj)
4971       logging.info("Starting instance %s on node %s", instance, pnode_name)
4972       feedback_fn("* starting instance...")
4973       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4974       msg = result.RemoteFailMsg()
4975       if msg:
4976         raise errors.OpExecError("Could not start instance: %s" % msg)
4977
4978
4979 class LUConnectConsole(NoHooksLU):
4980   """Connect to an instance's console.
4981
4982   This is somewhat special in that it returns the command line that
4983   you need to run on the master node in order to connect to the
4984   console.
4985
4986   """
4987   _OP_REQP = ["instance_name"]
4988   REQ_BGL = False
4989
4990   def ExpandNames(self):
4991     self._ExpandAndLockInstance()
4992
4993   def CheckPrereq(self):
4994     """Check prerequisites.
4995
4996     This checks that the instance is in the cluster.
4997
4998     """
4999     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5000     assert self.instance is not None, \
5001       "Cannot retrieve locked instance %s" % self.op.instance_name
5002     _CheckNodeOnline(self, self.instance.primary_node)
5003
5004   def Exec(self, feedback_fn):
5005     """Connect to the console of an instance
5006
5007     """
5008     instance = self.instance
5009     node = instance.primary_node
5010
5011     node_insts = self.rpc.call_instance_list([node],
5012                                              [instance.hypervisor])[node]
5013     msg = node_insts.RemoteFailMsg()
5014     if msg:
5015       raise errors.OpExecError("Can't get node information from %s: %s" %
5016                                (node, msg))
5017
5018     if instance.name not in node_insts.payload:
5019       raise errors.OpExecError("Instance %s is not running." % instance.name)
5020
5021     logging.debug("Connecting to console of %s on %s", instance.name, node)
5022
5023     hyper = hypervisor.GetHypervisor(instance.hypervisor)
5024     cluster = self.cfg.GetClusterInfo()
5025     # beparams and hvparams are passed separately, to avoid editing the
5026     # instance and then saving the defaults in the instance itself.
5027     hvparams = cluster.FillHV(instance)
5028     beparams = cluster.FillBE(instance)
5029     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5030
5031     # build ssh cmdline
5032     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
5033
5034
5035 class LUReplaceDisks(LogicalUnit):
5036   """Replace the disks of an instance.
5037
5038   """
5039   HPATH = "mirrors-replace"
5040   HTYPE = constants.HTYPE_INSTANCE
5041   _OP_REQP = ["instance_name", "mode", "disks"]
5042   REQ_BGL = False
5043
5044   def CheckArguments(self):
5045     if not hasattr(self.op, "remote_node"):
5046       self.op.remote_node = None
5047     if not hasattr(self.op, "iallocator"):
5048       self.op.iallocator = None
5049
5050     # check for valid parameter combination
5051     cnt = [self.op.remote_node, self.op.iallocator].count(None)
5052     if self.op.mode == constants.REPLACE_DISK_CHG:
5053       if cnt == 2:
5054         raise errors.OpPrereqError("When changing the secondary either an"
5055                                    " iallocator script must be used or the"
5056                                    " new node given")
5057       elif cnt == 0:
5058         raise errors.OpPrereqError("Give either the iallocator or the new"
5059                                    " secondary, not both")
5060     else: # not replacing the secondary
5061       if cnt != 2:
5062         raise errors.OpPrereqError("The iallocator and new node options can"
5063                                    " be used only when changing the"
5064                                    " secondary node")
5065
5066   def ExpandNames(self):
5067     self._ExpandAndLockInstance()
5068
5069     if self.op.iallocator is not None:
5070       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5071     elif self.op.remote_node is not None:
5072       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5073       if remote_node is None:
5074         raise errors.OpPrereqError("Node '%s' not known" %
5075                                    self.op.remote_node)
5076       self.op.remote_node = remote_node
5077       # Warning: do not remove the locking of the new secondary here
5078       # unless DRBD8.AddChildren is changed to work in parallel;
5079       # currently it doesn't since parallel invocations of
5080       # FindUnusedMinor will conflict
5081       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5082       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5083     else:
5084       self.needed_locks[locking.LEVEL_NODE] = []
5085       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5086
5087   def DeclareLocks(self, level):
5088     # If we're not already locking all nodes in the set we have to declare the
5089     # instance's primary/secondary nodes.
5090     if (level == locking.LEVEL_NODE and
5091         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5092       self._LockInstancesNodes()
5093
5094   def _RunAllocator(self):
5095     """Compute a new secondary node using an IAllocator.
5096
5097     """
5098     ial = IAllocator(self,
5099                      mode=constants.IALLOCATOR_MODE_RELOC,
5100                      name=self.op.instance_name,
5101                      relocate_from=[self.sec_node])
5102
5103     ial.Run(self.op.iallocator)
5104
5105     if not ial.success:
5106       raise errors.OpPrereqError("Can't compute nodes using"
5107                                  " iallocator '%s': %s" % (self.op.iallocator,
5108                                                            ial.info))
5109     if len(ial.nodes) != ial.required_nodes:
5110       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5111                                  " of nodes (%s), required %s" %
5112                                  (len(ial.nodes), ial.required_nodes))
5113     self.op.remote_node = ial.nodes[0]
5114     self.LogInfo("Selected new secondary for the instance: %s",
5115                  self.op.remote_node)
5116
5117   def BuildHooksEnv(self):
5118     """Build hooks env.
5119
5120     This runs on the master, the primary and all the secondaries.
5121
5122     """
5123     env = {
5124       "MODE": self.op.mode,
5125       "NEW_SECONDARY": self.op.remote_node,
5126       "OLD_SECONDARY": self.instance.secondary_nodes[0],
5127       }
5128     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5129     nl = [
5130       self.cfg.GetMasterNode(),
5131       self.instance.primary_node,
5132       ]
5133     if self.op.remote_node is not None:
5134       nl.append(self.op.remote_node)
5135     return env, nl, nl
5136
5137   def CheckPrereq(self):
5138     """Check prerequisites.
5139
5140     This checks that the instance is in the cluster.
5141
5142     """
5143     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5144     assert instance is not None, \
5145       "Cannot retrieve locked instance %s" % self.op.instance_name
5146     self.instance = instance
5147
5148     if instance.disk_template != constants.DT_DRBD8:
5149       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5150                                  " instances")
5151
5152     if len(instance.secondary_nodes) != 1:
5153       raise errors.OpPrereqError("The instance has a strange layout,"
5154                                  " expected one secondary but found %d" %
5155                                  len(instance.secondary_nodes))
5156
5157     self.sec_node = instance.secondary_nodes[0]
5158
5159     if self.op.iallocator is not None:
5160       self._RunAllocator()
5161
5162     remote_node = self.op.remote_node
5163     if remote_node is not None:
5164       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5165       assert self.remote_node_info is not None, \
5166         "Cannot retrieve locked node %s" % remote_node
5167     else:
5168       self.remote_node_info = None
5169     if remote_node == instance.primary_node:
5170       raise errors.OpPrereqError("The specified node is the primary node of"
5171                                  " the instance.")
5172     elif remote_node == self.sec_node:
5173       raise errors.OpPrereqError("The specified node is already the"
5174                                  " secondary node of the instance.")
5175
5176     if self.op.mode == constants.REPLACE_DISK_PRI:
5177       n1 = self.tgt_node = instance.primary_node
5178       n2 = self.oth_node = self.sec_node
5179     elif self.op.mode == constants.REPLACE_DISK_SEC:
5180       n1 = self.tgt_node = self.sec_node
5181       n2 = self.oth_node = instance.primary_node
5182     elif self.op.mode == constants.REPLACE_DISK_CHG:
5183       n1 = self.new_node = remote_node
5184       n2 = self.oth_node = instance.primary_node
5185       self.tgt_node = self.sec_node
5186       _CheckNodeNotDrained(self, remote_node)
5187     else:
5188       raise errors.ProgrammerError("Unhandled disk replace mode")
5189
5190     _CheckNodeOnline(self, n1)
5191     _CheckNodeOnline(self, n2)
5192
5193     if not self.op.disks:
5194       self.op.disks = range(len(instance.disks))
5195
5196     for disk_idx in self.op.disks:
5197       instance.FindDisk(disk_idx)
5198
5199   def _ExecD8DiskOnly(self, feedback_fn):
5200     """Replace a disk on the primary or secondary for dbrd8.
5201
5202     The algorithm for replace is quite complicated:
5203
5204       1. for each disk to be replaced:
5205
5206         1. create new LVs on the target node with unique names
5207         1. detach old LVs from the drbd device
5208         1. rename old LVs to name_replaced.<time_t>
5209         1. rename new LVs to old LVs
5210         1. attach the new LVs (with the old names now) to the drbd device
5211
5212       1. wait for sync across all devices
5213
5214       1. for each modified disk:
5215
5216         1. remove old LVs (which have the name name_replaces.<time_t>)
5217
5218     Failures are not very well handled.
5219
5220     """
5221     steps_total = 6
5222     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5223     instance = self.instance
5224     iv_names = {}
5225     vgname = self.cfg.GetVGName()
5226     # start of work
5227     cfg = self.cfg
5228     tgt_node = self.tgt_node
5229     oth_node = self.oth_node
5230
5231     # Step: check device activation
5232     self.proc.LogStep(1, steps_total, "check device existence")
5233     info("checking volume groups")
5234     my_vg = cfg.GetVGName()
5235     results = self.rpc.call_vg_list([oth_node, tgt_node])
5236     if not results:
5237       raise errors.OpExecError("Can't list volume groups on the nodes")
5238     for node in oth_node, tgt_node:
5239       res = results[node]
5240       msg = res.RemoteFailMsg()
5241       if msg:
5242         raise errors.OpExecError("Error checking node %s: %s" % (node, msg))
5243       if my_vg not in res.payload:
5244         raise errors.OpExecError("Volume group '%s' not found on %s" %
5245                                  (my_vg, node))
5246     for idx, dev in enumerate(instance.disks):
5247       if idx not in self.op.disks:
5248         continue
5249       for node in tgt_node, oth_node:
5250         info("checking disk/%d on %s" % (idx, node))
5251         cfg.SetDiskID(dev, node)
5252         result = self.rpc.call_blockdev_find(node, dev)
5253         msg = result.RemoteFailMsg()
5254         if not msg and not result.payload:
5255           msg = "disk not found"
5256         if msg:
5257           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5258                                    (idx, node, msg))
5259
5260     # Step: check other node consistency
5261     self.proc.LogStep(2, steps_total, "check peer consistency")
5262     for idx, dev in enumerate(instance.disks):
5263       if idx not in self.op.disks:
5264         continue
5265       info("checking disk/%d consistency on %s" % (idx, oth_node))
5266       if not _CheckDiskConsistency(self, dev, oth_node,
5267                                    oth_node==instance.primary_node):
5268         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5269                                  " to replace disks on this node (%s)" %
5270                                  (oth_node, tgt_node))
5271
5272     # Step: create new storage
5273     self.proc.LogStep(3, steps_total, "allocate new storage")
5274     for idx, dev in enumerate(instance.disks):
5275       if idx not in self.op.disks:
5276         continue
5277       size = dev.size
5278       cfg.SetDiskID(dev, tgt_node)
5279       lv_names = [".disk%d_%s" % (idx, suf)
5280                   for suf in ["data", "meta"]]
5281       names = _GenerateUniqueNames(self, lv_names)
5282       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5283                              logical_id=(vgname, names[0]))
5284       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5285                              logical_id=(vgname, names[1]))
5286       new_lvs = [lv_data, lv_meta]
5287       old_lvs = dev.children
5288       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5289       info("creating new local storage on %s for %s" %
5290            (tgt_node, dev.iv_name))
5291       # we pass force_create=True to force the LVM creation
5292       for new_lv in new_lvs:
5293         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5294                         _GetInstanceInfoText(instance), False)
5295
5296     # Step: for each lv, detach+rename*2+attach
5297     self.proc.LogStep(4, steps_total, "change drbd configuration")
5298     for dev, old_lvs, new_lvs in iv_names.itervalues():
5299       info("detaching %s drbd from local storage" % dev.iv_name)
5300       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5301       msg = result.RemoteFailMsg()
5302       if msg:
5303         raise errors.OpExecError("Can't detach drbd from local storage on node"
5304                                  " %s for device %s: %s" %
5305                                  (tgt_node, dev.iv_name, msg))
5306       #dev.children = []
5307       #cfg.Update(instance)
5308
5309       # ok, we created the new LVs, so now we know we have the needed
5310       # storage; as such, we proceed on the target node to rename
5311       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5312       # using the assumption that logical_id == physical_id (which in
5313       # turn is the unique_id on that node)
5314
5315       # FIXME(iustin): use a better name for the replaced LVs
5316       temp_suffix = int(time.time())
5317       ren_fn = lambda d, suff: (d.physical_id[0],
5318                                 d.physical_id[1] + "_replaced-%s" % suff)
5319       # build the rename list based on what LVs exist on the node
5320       rlist = []
5321       for to_ren in old_lvs:
5322         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5323         if not result.RemoteFailMsg() and result.payload:
5324           # device exists
5325           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5326
5327       info("renaming the old LVs on the target node")
5328       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5329       msg = result.RemoteFailMsg()
5330       if msg:
5331         raise errors.OpExecError("Can't rename old LVs on node %s: %s" %
5332                                  (tgt_node, msg))
5333       # now we rename the new LVs to the old LVs
5334       info("renaming the new LVs on the target node")
5335       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5336       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5337       msg = result.RemoteFailMsg()
5338       if msg:
5339         raise errors.OpExecError("Can't rename new LVs on node %s: %s" %
5340                                  (tgt_node, msg))
5341
5342       for old, new in zip(old_lvs, new_lvs):
5343         new.logical_id = old.logical_id
5344         cfg.SetDiskID(new, tgt_node)
5345
5346       for disk in old_lvs:
5347         disk.logical_id = ren_fn(disk, temp_suffix)
5348         cfg.SetDiskID(disk, tgt_node)
5349
5350       # now that the new lvs have the old name, we can add them to the device
5351       info("adding new mirror component on %s" % tgt_node)
5352       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5353       msg = result.RemoteFailMsg()
5354       if msg:
5355         for new_lv in new_lvs:
5356           msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5357           if msg:
5358             warning("Can't rollback device %s: %s", dev, msg,
5359                     hint="cleanup manually the unused logical volumes")
5360         raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
5361
5362       dev.children = new_lvs
5363       cfg.Update(instance)
5364
5365     # Step: wait for sync
5366
5367     # this can fail as the old devices are degraded and _WaitForSync
5368     # does a combined result over all disks, so we don't check its
5369     # return value
5370     self.proc.LogStep(5, steps_total, "sync devices")
5371     _WaitForSync(self, instance, unlock=True)
5372
5373     # so check manually all the devices
5374     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5375       cfg.SetDiskID(dev, instance.primary_node)
5376       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5377       msg = result.RemoteFailMsg()
5378       if not msg and not result.payload:
5379         msg = "disk not found"
5380       if msg:
5381         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5382                                  (name, msg))
5383       if result.payload[5]:
5384         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5385
5386     # Step: remove old storage
5387     self.proc.LogStep(6, steps_total, "removing old storage")
5388     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5389       info("remove logical volumes for %s" % name)
5390       for lv in old_lvs:
5391         cfg.SetDiskID(lv, tgt_node)
5392         msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5393         if msg:
5394           warning("Can't remove old LV: %s" % msg,
5395                   hint="manually remove unused LVs")
5396           continue
5397
5398   def _ExecD8Secondary(self, feedback_fn):
5399     """Replace the secondary node for drbd8.
5400
5401     The algorithm for replace is quite complicated:
5402       - for all disks of the instance:
5403         - create new LVs on the new node with same names
5404         - shutdown the drbd device on the old secondary
5405         - disconnect the drbd network on the primary
5406         - create the drbd device on the new secondary
5407         - network attach the drbd on the primary, using an artifice:
5408           the drbd code for Attach() will connect to the network if it
5409           finds a device which is connected to the good local disks but
5410           not network enabled
5411       - wait for sync across all devices
5412       - remove all disks from the old secondary
5413
5414     Failures are not very well handled.
5415
5416     """
5417     steps_total = 6
5418     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5419     instance = self.instance
5420     iv_names = {}
5421     # start of work
5422     cfg = self.cfg
5423     old_node = self.tgt_node
5424     new_node = self.new_node
5425     pri_node = instance.primary_node
5426     nodes_ip = {
5427       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5428       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5429       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5430       }
5431
5432     # Step: check device activation
5433     self.proc.LogStep(1, steps_total, "check device existence")
5434     info("checking volume groups")
5435     my_vg = cfg.GetVGName()
5436     results = self.rpc.call_vg_list([pri_node, new_node])
5437     for node in pri_node, new_node:
5438       res = results[node]
5439       msg = res.RemoteFailMsg()
5440       if msg:
5441         raise errors.OpExecError("Error checking node %s: %s" % (node, msg))
5442       if my_vg not in res.payload:
5443         raise errors.OpExecError("Volume group '%s' not found on %s" %
5444                                  (my_vg, node))
5445     for idx, dev in enumerate(instance.disks):
5446       if idx not in self.op.disks:
5447         continue
5448       info("checking disk/%d on %s" % (idx, pri_node))
5449       cfg.SetDiskID(dev, pri_node)
5450       result = self.rpc.call_blockdev_find(pri_node, dev)
5451       msg = result.RemoteFailMsg()
5452       if not msg and not result.payload:
5453         msg = "disk not found"
5454       if msg:
5455         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5456                                  (idx, pri_node, msg))
5457
5458     # Step: check other node consistency
5459     self.proc.LogStep(2, steps_total, "check peer consistency")
5460     for idx, dev in enumerate(instance.disks):
5461       if idx not in self.op.disks:
5462         continue
5463       info("checking disk/%d consistency on %s" % (idx, pri_node))
5464       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5465         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5466                                  " unsafe to replace the secondary" %
5467                                  pri_node)
5468
5469     # Step: create new storage
5470     self.proc.LogStep(3, steps_total, "allocate new storage")
5471     for idx, dev in enumerate(instance.disks):
5472       info("adding new local storage on %s for disk/%d" %
5473            (new_node, idx))
5474       # we pass force_create=True to force LVM creation
5475       for new_lv in dev.children:
5476         _CreateBlockDev(self, new_node, instance, new_lv, True,
5477                         _GetInstanceInfoText(instance), False)
5478
5479     # Step 4: dbrd minors and drbd setups changes
5480     # after this, we must manually remove the drbd minors on both the
5481     # error and the success paths
5482     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5483                                    instance.name)
5484     logging.debug("Allocated minors %s" % (minors,))
5485     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5486     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5487       size = dev.size
5488       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5489       # create new devices on new_node; note that we create two IDs:
5490       # one without port, so the drbd will be activated without
5491       # networking information on the new node at this stage, and one
5492       # with network, for the latter activation in step 4
5493       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5494       if pri_node == o_node1:
5495         p_minor = o_minor1
5496       else:
5497         p_minor = o_minor2
5498
5499       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5500       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5501
5502       iv_names[idx] = (dev, dev.children, new_net_id)
5503       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5504                     new_net_id)
5505       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5506                               logical_id=new_alone_id,
5507                               children=dev.children)
5508       try:
5509         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5510                               _GetInstanceInfoText(instance), False)
5511       except errors.GenericError:
5512         self.cfg.ReleaseDRBDMinors(instance.name)
5513         raise
5514
5515     for idx, dev in enumerate(instance.disks):
5516       # we have new devices, shutdown the drbd on the old secondary
5517       info("shutting down drbd for disk/%d on old node" % idx)
5518       cfg.SetDiskID(dev, old_node)
5519       msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5520       if msg:
5521         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5522                 (idx, msg),
5523                 hint="Please cleanup this device manually as soon as possible")
5524
5525     info("detaching primary drbds from the network (=> standalone)")
5526     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5527                                                instance.disks)[pri_node]
5528
5529     msg = result.RemoteFailMsg()
5530     if msg:
5531       # detaches didn't succeed (unlikely)
5532       self.cfg.ReleaseDRBDMinors(instance.name)
5533       raise errors.OpExecError("Can't detach the disks from the network on"
5534                                " old node: %s" % (msg,))
5535
5536     # if we managed to detach at least one, we update all the disks of
5537     # the instance to point to the new secondary
5538     info("updating instance configuration")
5539     for dev, _, new_logical_id in iv_names.itervalues():
5540       dev.logical_id = new_logical_id
5541       cfg.SetDiskID(dev, pri_node)
5542     cfg.Update(instance)
5543
5544     # and now perform the drbd attach
5545     info("attaching primary drbds to new secondary (standalone => connected)")
5546     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5547                                            instance.disks, instance.name,
5548                                            False)
5549     for to_node, to_result in result.items():
5550       msg = to_result.RemoteFailMsg()
5551       if msg:
5552         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5553                 hint="please do a gnt-instance info to see the"
5554                 " status of disks")
5555
5556     # this can fail as the old devices are degraded and _WaitForSync
5557     # does a combined result over all disks, so we don't check its
5558     # return value
5559     self.proc.LogStep(5, steps_total, "sync devices")
5560     _WaitForSync(self, instance, unlock=True)
5561
5562     # so check manually all the devices
5563     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5564       cfg.SetDiskID(dev, pri_node)
5565       result = self.rpc.call_blockdev_find(pri_node, dev)
5566       msg = result.RemoteFailMsg()
5567       if not msg and not result.payload:
5568         msg = "disk not found"
5569       if msg:
5570         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5571                                  (idx, msg))
5572       if result.payload[5]:
5573         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5574
5575     self.proc.LogStep(6, steps_total, "removing old storage")
5576     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5577       info("remove logical volumes for disk/%d" % idx)
5578       for lv in old_lvs:
5579         cfg.SetDiskID(lv, old_node)
5580         msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5581         if msg:
5582           warning("Can't remove LV on old secondary: %s", msg,
5583                   hint="Cleanup stale volumes by hand")
5584
5585   def Exec(self, feedback_fn):
5586     """Execute disk replacement.
5587
5588     This dispatches the disk replacement to the appropriate handler.
5589
5590     """
5591     instance = self.instance
5592
5593     # Activate the instance disks if we're replacing them on a down instance
5594     if not instance.admin_up:
5595       _StartInstanceDisks(self, instance, True)
5596
5597     if self.op.mode == constants.REPLACE_DISK_CHG:
5598       fn = self._ExecD8Secondary
5599     else:
5600       fn = self._ExecD8DiskOnly
5601
5602     ret = fn(feedback_fn)
5603
5604     # Deactivate the instance disks if we're replacing them on a down instance
5605     if not instance.admin_up:
5606       _SafeShutdownInstanceDisks(self, instance)
5607
5608     return ret
5609
5610
5611 class LUGrowDisk(LogicalUnit):
5612   """Grow a disk of an instance.
5613
5614   """
5615   HPATH = "disk-grow"
5616   HTYPE = constants.HTYPE_INSTANCE
5617   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5618   REQ_BGL = False
5619
5620   def ExpandNames(self):
5621     self._ExpandAndLockInstance()
5622     self.needed_locks[locking.LEVEL_NODE] = []
5623     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5624
5625   def DeclareLocks(self, level):
5626     if level == locking.LEVEL_NODE:
5627       self._LockInstancesNodes()
5628
5629   def BuildHooksEnv(self):
5630     """Build hooks env.
5631
5632     This runs on the master, the primary and all the secondaries.
5633
5634     """
5635     env = {
5636       "DISK": self.op.disk,
5637       "AMOUNT": self.op.amount,
5638       }
5639     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5640     nl = [
5641       self.cfg.GetMasterNode(),
5642       self.instance.primary_node,
5643       ]
5644     return env, nl, nl
5645
5646   def CheckPrereq(self):
5647     """Check prerequisites.
5648
5649     This checks that the instance is in the cluster.
5650
5651     """
5652     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5653     assert instance is not None, \
5654       "Cannot retrieve locked instance %s" % self.op.instance_name
5655     nodenames = list(instance.all_nodes)
5656     for node in nodenames:
5657       _CheckNodeOnline(self, node)
5658
5659
5660     self.instance = instance
5661
5662     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5663       raise errors.OpPrereqError("Instance's disk layout does not support"
5664                                  " growing.")
5665
5666     self.disk = instance.FindDisk(self.op.disk)
5667
5668     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5669                                        instance.hypervisor)
5670     for node in nodenames:
5671       info = nodeinfo[node]
5672       if info.failed or not info.data:
5673         raise errors.OpPrereqError("Cannot get current information"
5674                                    " from node '%s'" % node)
5675       vg_free = info.data.get('vg_free', None)
5676       if not isinstance(vg_free, int):
5677         raise errors.OpPrereqError("Can't compute free disk space on"
5678                                    " node %s" % node)
5679       if self.op.amount > vg_free:
5680         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5681                                    " %d MiB available, %d MiB required" %
5682                                    (node, vg_free, self.op.amount))
5683
5684   def Exec(self, feedback_fn):
5685     """Execute disk grow.
5686
5687     """
5688     instance = self.instance
5689     disk = self.disk
5690     for node in instance.all_nodes:
5691       self.cfg.SetDiskID(disk, node)
5692       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5693       msg = result.RemoteFailMsg()
5694       if msg:
5695         raise errors.OpExecError("Grow request failed to node %s: %s" %
5696                                  (node, msg))
5697     disk.RecordGrow(self.op.amount)
5698     self.cfg.Update(instance)
5699     if self.op.wait_for_sync:
5700       disk_abort = not _WaitForSync(self, instance)
5701       if disk_abort:
5702         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5703                              " status.\nPlease check the instance.")
5704
5705
5706 class LUQueryInstanceData(NoHooksLU):
5707   """Query runtime instance data.
5708
5709   """
5710   _OP_REQP = ["instances", "static"]
5711   REQ_BGL = False
5712
5713   def ExpandNames(self):
5714     self.needed_locks = {}
5715     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5716
5717     if not isinstance(self.op.instances, list):
5718       raise errors.OpPrereqError("Invalid argument type 'instances'")
5719
5720     if self.op.instances:
5721       self.wanted_names = []
5722       for name in self.op.instances:
5723         full_name = self.cfg.ExpandInstanceName(name)
5724         if full_name is None:
5725           raise errors.OpPrereqError("Instance '%s' not known" % name)
5726         self.wanted_names.append(full_name)
5727       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5728     else:
5729       self.wanted_names = None
5730       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5731
5732     self.needed_locks[locking.LEVEL_NODE] = []
5733     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5734
5735   def DeclareLocks(self, level):
5736     if level == locking.LEVEL_NODE:
5737       self._LockInstancesNodes()
5738
5739   def CheckPrereq(self):
5740     """Check prerequisites.
5741
5742     This only checks the optional instance list against the existing names.
5743
5744     """
5745     if self.wanted_names is None:
5746       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5747
5748     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5749                              in self.wanted_names]
5750     return
5751
5752   def _ComputeDiskStatus(self, instance, snode, dev):
5753     """Compute block device status.
5754
5755     """
5756     static = self.op.static
5757     if not static:
5758       self.cfg.SetDiskID(dev, instance.primary_node)
5759       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5760       if dev_pstatus.offline:
5761         dev_pstatus = None
5762       else:
5763         msg = dev_pstatus.RemoteFailMsg()
5764         if msg:
5765           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5766                                    (instance.name, msg))
5767         dev_pstatus = dev_pstatus.payload
5768     else:
5769       dev_pstatus = None
5770
5771     if dev.dev_type in constants.LDS_DRBD:
5772       # we change the snode then (otherwise we use the one passed in)
5773       if dev.logical_id[0] == instance.primary_node:
5774         snode = dev.logical_id[1]
5775       else:
5776         snode = dev.logical_id[0]
5777
5778     if snode and not static:
5779       self.cfg.SetDiskID(dev, snode)
5780       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5781       if dev_sstatus.offline:
5782         dev_sstatus = None
5783       else:
5784         msg = dev_sstatus.RemoteFailMsg()
5785         if msg:
5786           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5787                                    (instance.name, msg))
5788         dev_sstatus = dev_sstatus.payload
5789     else:
5790       dev_sstatus = None
5791
5792     if dev.children:
5793       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5794                       for child in dev.children]
5795     else:
5796       dev_children = []
5797
5798     data = {
5799       "iv_name": dev.iv_name,
5800       "dev_type": dev.dev_type,
5801       "logical_id": dev.logical_id,
5802       "physical_id": dev.physical_id,
5803       "pstatus": dev_pstatus,
5804       "sstatus": dev_sstatus,
5805       "children": dev_children,
5806       "mode": dev.mode,
5807       }
5808
5809     return data
5810
5811   def Exec(self, feedback_fn):
5812     """Gather and return data"""
5813     result = {}
5814
5815     cluster = self.cfg.GetClusterInfo()
5816
5817     for instance in self.wanted_instances:
5818       if not self.op.static:
5819         remote_info = self.rpc.call_instance_info(instance.primary_node,
5820                                                   instance.name,
5821                                                   instance.hypervisor)
5822         msg = remote_info.RemoteFailMsg()
5823         if msg:
5824           raise errors.OpExecError("Error checking node %s: %s" %
5825                                    (instance.primary_node, msg))
5826         remote_info = remote_info.payload
5827         if remote_info and "state" in remote_info:
5828           remote_state = "up"
5829         else:
5830           remote_state = "down"
5831       else:
5832         remote_state = None
5833       if instance.admin_up:
5834         config_state = "up"
5835       else:
5836         config_state = "down"
5837
5838       disks = [self._ComputeDiskStatus(instance, None, device)
5839                for device in instance.disks]
5840
5841       idict = {
5842         "name": instance.name,
5843         "config_state": config_state,
5844         "run_state": remote_state,
5845         "pnode": instance.primary_node,
5846         "snodes": instance.secondary_nodes,
5847         "os": instance.os,
5848         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5849         "disks": disks,
5850         "hypervisor": instance.hypervisor,
5851         "network_port": instance.network_port,
5852         "hv_instance": instance.hvparams,
5853         "hv_actual": cluster.FillHV(instance),
5854         "be_instance": instance.beparams,
5855         "be_actual": cluster.FillBE(instance),
5856         }
5857
5858       result[instance.name] = idict
5859
5860     return result
5861
5862
5863 class LUSetInstanceParams(LogicalUnit):
5864   """Modifies an instances's parameters.
5865
5866   """
5867   HPATH = "instance-modify"
5868   HTYPE = constants.HTYPE_INSTANCE
5869   _OP_REQP = ["instance_name"]
5870   REQ_BGL = False
5871
5872   def CheckArguments(self):
5873     if not hasattr(self.op, 'nics'):
5874       self.op.nics = []
5875     if not hasattr(self.op, 'disks'):
5876       self.op.disks = []
5877     if not hasattr(self.op, 'beparams'):
5878       self.op.beparams = {}
5879     if not hasattr(self.op, 'hvparams'):
5880       self.op.hvparams = {}
5881     self.op.force = getattr(self.op, "force", False)
5882     if not (self.op.nics or self.op.disks or
5883             self.op.hvparams or self.op.beparams):
5884       raise errors.OpPrereqError("No changes submitted")
5885
5886     # Disk validation
5887     disk_addremove = 0
5888     for disk_op, disk_dict in self.op.disks:
5889       if disk_op == constants.DDM_REMOVE:
5890         disk_addremove += 1
5891         continue
5892       elif disk_op == constants.DDM_ADD:
5893         disk_addremove += 1
5894       else:
5895         if not isinstance(disk_op, int):
5896           raise errors.OpPrereqError("Invalid disk index")
5897       if disk_op == constants.DDM_ADD:
5898         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5899         if mode not in constants.DISK_ACCESS_SET:
5900           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5901         size = disk_dict.get('size', None)
5902         if size is None:
5903           raise errors.OpPrereqError("Required disk parameter size missing")
5904         try:
5905           size = int(size)
5906         except ValueError, err:
5907           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5908                                      str(err))
5909         disk_dict['size'] = size
5910       else:
5911         # modification of disk
5912         if 'size' in disk_dict:
5913           raise errors.OpPrereqError("Disk size change not possible, use"
5914                                      " grow-disk")
5915
5916     if disk_addremove > 1:
5917       raise errors.OpPrereqError("Only one disk add or remove operation"
5918                                  " supported at a time")
5919
5920     # NIC validation
5921     nic_addremove = 0
5922     for nic_op, nic_dict in self.op.nics:
5923       if nic_op == constants.DDM_REMOVE:
5924         nic_addremove += 1
5925         continue
5926       elif nic_op == constants.DDM_ADD:
5927         nic_addremove += 1
5928       else:
5929         if not isinstance(nic_op, int):
5930           raise errors.OpPrereqError("Invalid nic index")
5931
5932       # nic_dict should be a dict
5933       nic_ip = nic_dict.get('ip', None)
5934       if nic_ip is not None:
5935         if nic_ip.lower() == constants.VALUE_NONE:
5936           nic_dict['ip'] = None
5937         else:
5938           if not utils.IsValidIP(nic_ip):
5939             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5940
5941       nic_bridge = nic_dict.get('bridge', None)
5942       nic_link = nic_dict.get('link', None)
5943       if nic_bridge and nic_link:
5944         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
5945       elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
5946         nic_dict['bridge'] = None
5947       elif nic_link and nic_link.lower() == constants.VALUE_NONE:
5948         nic_dict['link'] = None
5949
5950       if nic_op == constants.DDM_ADD:
5951         nic_mac = nic_dict.get('mac', None)
5952         if nic_mac is None:
5953           nic_dict['mac'] = constants.VALUE_AUTO
5954
5955       if 'mac' in nic_dict:
5956         nic_mac = nic_dict['mac']
5957         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5958           if not utils.IsValidMac(nic_mac):
5959             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5960         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5961           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5962                                      " modifying an existing nic")
5963
5964     if nic_addremove > 1:
5965       raise errors.OpPrereqError("Only one NIC add or remove operation"
5966                                  " supported at a time")
5967
5968   def ExpandNames(self):
5969     self._ExpandAndLockInstance()
5970     self.needed_locks[locking.LEVEL_NODE] = []
5971     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5972
5973   def DeclareLocks(self, level):
5974     if level == locking.LEVEL_NODE:
5975       self._LockInstancesNodes()
5976
5977   def BuildHooksEnv(self):
5978     """Build hooks env.
5979
5980     This runs on the master, primary and secondaries.
5981
5982     """
5983     args = dict()
5984     if constants.BE_MEMORY in self.be_new:
5985       args['memory'] = self.be_new[constants.BE_MEMORY]
5986     if constants.BE_VCPUS in self.be_new:
5987       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5988     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5989     # information at all.
5990     if self.op.nics:
5991       args['nics'] = []
5992       nic_override = dict(self.op.nics)
5993       c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
5994       for idx, nic in enumerate(self.instance.nics):
5995         if idx in nic_override:
5996           this_nic_override = nic_override[idx]
5997         else:
5998           this_nic_override = {}
5999         if 'ip' in this_nic_override:
6000           ip = this_nic_override['ip']
6001         else:
6002           ip = nic.ip
6003         if 'mac' in this_nic_override:
6004           mac = this_nic_override['mac']
6005         else:
6006           mac = nic.mac
6007         if idx in self.nic_pnew:
6008           nicparams = self.nic_pnew[idx]
6009         else:
6010           nicparams = objects.FillDict(c_nicparams, nic.nicparams)
6011         mode = nicparams[constants.NIC_MODE]
6012         link = nicparams[constants.NIC_LINK]
6013         args['nics'].append((ip, mac, mode, link))
6014       if constants.DDM_ADD in nic_override:
6015         ip = nic_override[constants.DDM_ADD].get('ip', None)
6016         mac = nic_override[constants.DDM_ADD]['mac']
6017         nicparams = self.nic_pnew[constants.DDM_ADD]
6018         mode = nicparams[constants.NIC_MODE]
6019         link = nicparams[constants.NIC_LINK]
6020         args['nics'].append((ip, mac, mode, link))
6021       elif constants.DDM_REMOVE in nic_override:
6022         del args['nics'][-1]
6023
6024     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
6025     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
6026     return env, nl, nl
6027
6028   def _GetUpdatedParams(self, old_params, update_dict,
6029                         default_values, parameter_types):
6030     """Return the new params dict for the given params.
6031
6032     @type old_params: dict
6033     @type old_params: old parameters
6034     @type update_dict: dict
6035     @type update_dict: dict containing new parameter values,
6036                        or constants.VALUE_DEFAULT to reset the
6037                        parameter to its default value
6038     @type default_values: dict
6039     @param default_values: default values for the filled parameters
6040     @type parameter_types: dict
6041     @param parameter_types: dict mapping target dict keys to types
6042                             in constants.ENFORCEABLE_TYPES
6043     @rtype: (dict, dict)
6044     @return: (new_parameters, filled_parameters)
6045
6046     """
6047     params_copy = copy.deepcopy(old_params)
6048     for key, val in update_dict.iteritems():
6049       if val == constants.VALUE_DEFAULT:
6050         try:
6051           del params_copy[key]
6052         except KeyError:
6053           pass
6054       else:
6055         params_copy[key] = val
6056     utils.ForceDictType(params_copy, parameter_types)
6057     params_filled = objects.FillDict(default_values, params_copy)
6058     return (params_copy, params_filled)
6059
6060   def CheckPrereq(self):
6061     """Check prerequisites.
6062
6063     This only checks the instance list against the existing names.
6064
6065     """
6066     force = self.force = self.op.force
6067
6068     # checking the new params on the primary/secondary nodes
6069
6070     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6071     cluster = self.cluster = self.cfg.GetClusterInfo()
6072     assert self.instance is not None, \
6073       "Cannot retrieve locked instance %s" % self.op.instance_name
6074     pnode = instance.primary_node
6075     nodelist = list(instance.all_nodes)
6076
6077     # hvparams processing
6078     if self.op.hvparams:
6079       i_hvdict, hv_new = self._GetUpdatedParams(
6080                              instance.hvparams, self.op.hvparams,
6081                              cluster.hvparams[instance.hypervisor],
6082                              constants.HVS_PARAMETER_TYPES)
6083       # local check
6084       hypervisor.GetHypervisor(
6085         instance.hypervisor).CheckParameterSyntax(hv_new)
6086       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6087       self.hv_new = hv_new # the new actual values
6088       self.hv_inst = i_hvdict # the new dict (without defaults)
6089     else:
6090       self.hv_new = self.hv_inst = {}
6091
6092     # beparams processing
6093     if self.op.beparams:
6094       i_bedict, be_new = self._GetUpdatedParams(
6095                              instance.beparams, self.op.beparams,
6096                              cluster.beparams[constants.PP_DEFAULT],
6097                              constants.BES_PARAMETER_TYPES)
6098       self.be_new = be_new # the new actual values
6099       self.be_inst = i_bedict # the new dict (without defaults)
6100     else:
6101       self.be_new = self.be_inst = {}
6102
6103     self.warn = []
6104
6105     if constants.BE_MEMORY in self.op.beparams and not self.force:
6106       mem_check_list = [pnode]
6107       if be_new[constants.BE_AUTO_BALANCE]:
6108         # either we changed auto_balance to yes or it was from before
6109         mem_check_list.extend(instance.secondary_nodes)
6110       instance_info = self.rpc.call_instance_info(pnode, instance.name,
6111                                                   instance.hypervisor)
6112       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6113                                          instance.hypervisor)
6114       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
6115         # Assume the primary node is unreachable and go ahead
6116         self.warn.append("Can't get info from primary node %s" % pnode)
6117       elif instance_info.RemoteFailMsg():
6118         self.warn.append("Can't get instance runtime information: %s" %
6119                         instance_info.RemoteFailMsg())
6120       else:
6121         if instance_info.payload:
6122           current_mem = int(instance_info.payload['memory'])
6123         else:
6124           # Assume instance not running
6125           # (there is a slight race condition here, but it's not very probable,
6126           # and we have no other way to check)
6127           current_mem = 0
6128         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6129                     nodeinfo[pnode].data['memory_free'])
6130         if miss_mem > 0:
6131           raise errors.OpPrereqError("This change will prevent the instance"
6132                                      " from starting, due to %d MB of memory"
6133                                      " missing on its primary node" % miss_mem)
6134
6135       if be_new[constants.BE_AUTO_BALANCE]:
6136         for node, nres in nodeinfo.iteritems():
6137           if node not in instance.secondary_nodes:
6138             continue
6139           if nres.failed or not isinstance(nres.data, dict):
6140             self.warn.append("Can't get info from secondary node %s" % node)
6141           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
6142             self.warn.append("Not enough memory to failover instance to"
6143                              " secondary node %s" % node)
6144
6145     # NIC processing
6146     self.nic_pnew = {}
6147     self.nic_pinst = {}
6148     for nic_op, nic_dict in self.op.nics:
6149       if nic_op == constants.DDM_REMOVE:
6150         if not instance.nics:
6151           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6152         continue
6153       if nic_op != constants.DDM_ADD:
6154         # an existing nic
6155         if nic_op < 0 or nic_op >= len(instance.nics):
6156           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6157                                      " are 0 to %d" %
6158                                      (nic_op, len(instance.nics)))
6159         old_nic_params = instance.nics[nic_op].nicparams
6160         old_nic_ip = instance.nics[nic_op].ip
6161       else:
6162         old_nic_params = {}
6163         old_nic_ip = None
6164
6165       update_params_dict = dict([(key, nic_dict[key])
6166                                  for key in constants.NICS_PARAMETERS
6167                                  if key in nic_dict])
6168
6169       if 'bridge' in nic_dict:
6170         update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
6171
6172       new_nic_params, new_filled_nic_params = \
6173           self._GetUpdatedParams(old_nic_params, update_params_dict,
6174                                  cluster.nicparams[constants.PP_DEFAULT],
6175                                  constants.NICS_PARAMETER_TYPES)
6176       objects.NIC.CheckParameterSyntax(new_filled_nic_params)
6177       self.nic_pinst[nic_op] = new_nic_params
6178       self.nic_pnew[nic_op] = new_filled_nic_params
6179       new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
6180
6181       if new_nic_mode == constants.NIC_MODE_BRIDGED:
6182         nic_bridge = new_filled_nic_params[constants.NIC_LINK]
6183         result = self.rpc.call_bridges_exist(pnode, [nic_bridge])
6184         msg = result.RemoteFailMsg()
6185         if msg:
6186           msg = "Error checking bridges on node %s: %s" % (pnode, msg)
6187           if self.force:
6188             self.warn.append(msg)
6189           else:
6190             raise errors.OpPrereqError(msg)
6191       if new_nic_mode == constants.NIC_MODE_ROUTED:
6192         if 'ip' in nic_dict:
6193           nic_ip = nic_dict['ip']
6194         else:
6195           nic_ip = old_nic_ip
6196         if nic_ip is None:
6197           raise errors.OpPrereqError('Cannot set the nic ip to None'
6198                                      ' on a routed nic')
6199       if 'mac' in nic_dict:
6200         nic_mac = nic_dict['mac']
6201         if nic_mac is None:
6202           raise errors.OpPrereqError('Cannot set the nic mac to None')
6203         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6204           # otherwise generate the mac
6205           nic_dict['mac'] = self.cfg.GenerateMAC()
6206         else:
6207           # or validate/reserve the current one
6208           if self.cfg.IsMacInUse(nic_mac):
6209             raise errors.OpPrereqError("MAC address %s already in use"
6210                                        " in cluster" % nic_mac)
6211
6212     # DISK processing
6213     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6214       raise errors.OpPrereqError("Disk operations not supported for"
6215                                  " diskless instances")
6216     for disk_op, disk_dict in self.op.disks:
6217       if disk_op == constants.DDM_REMOVE:
6218         if len(instance.disks) == 1:
6219           raise errors.OpPrereqError("Cannot remove the last disk of"
6220                                      " an instance")
6221         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6222         ins_l = ins_l[pnode]
6223         msg = ins_l.RemoteFailMsg()
6224         if msg:
6225           raise errors.OpPrereqError("Can't contact node %s: %s" %
6226                                      (pnode, msg))
6227         if instance.name in ins_l.payload:
6228           raise errors.OpPrereqError("Instance is running, can't remove"
6229                                      " disks.")
6230
6231       if (disk_op == constants.DDM_ADD and
6232           len(instance.nics) >= constants.MAX_DISKS):
6233         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6234                                    " add more" % constants.MAX_DISKS)
6235       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6236         # an existing disk
6237         if disk_op < 0 or disk_op >= len(instance.disks):
6238           raise errors.OpPrereqError("Invalid disk index %s, valid values"
6239                                      " are 0 to %d" %
6240                                      (disk_op, len(instance.disks)))
6241
6242     return
6243
6244   def Exec(self, feedback_fn):
6245     """Modifies an instance.
6246
6247     All parameters take effect only at the next restart of the instance.
6248
6249     """
6250     # Process here the warnings from CheckPrereq, as we don't have a
6251     # feedback_fn there.
6252     for warn in self.warn:
6253       feedback_fn("WARNING: %s" % warn)
6254
6255     result = []
6256     instance = self.instance
6257     cluster = self.cluster
6258     # disk changes
6259     for disk_op, disk_dict in self.op.disks:
6260       if disk_op == constants.DDM_REMOVE:
6261         # remove the last disk
6262         device = instance.disks.pop()
6263         device_idx = len(instance.disks)
6264         for node, disk in device.ComputeNodeTree(instance.primary_node):
6265           self.cfg.SetDiskID(disk, node)
6266           msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
6267           if msg:
6268             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6269                             " continuing anyway", device_idx, node, msg)
6270         result.append(("disk/%d" % device_idx, "remove"))
6271       elif disk_op == constants.DDM_ADD:
6272         # add a new disk
6273         if instance.disk_template == constants.DT_FILE:
6274           file_driver, file_path = instance.disks[0].logical_id
6275           file_path = os.path.dirname(file_path)
6276         else:
6277           file_driver = file_path = None
6278         disk_idx_base = len(instance.disks)
6279         new_disk = _GenerateDiskTemplate(self,
6280                                          instance.disk_template,
6281                                          instance.name, instance.primary_node,
6282                                          instance.secondary_nodes,
6283                                          [disk_dict],
6284                                          file_path,
6285                                          file_driver,
6286                                          disk_idx_base)[0]
6287         instance.disks.append(new_disk)
6288         info = _GetInstanceInfoText(instance)
6289
6290         logging.info("Creating volume %s for instance %s",
6291                      new_disk.iv_name, instance.name)
6292         # Note: this needs to be kept in sync with _CreateDisks
6293         #HARDCODE
6294         for node in instance.all_nodes:
6295           f_create = node == instance.primary_node
6296           try:
6297             _CreateBlockDev(self, node, instance, new_disk,
6298                             f_create, info, f_create)
6299           except errors.OpExecError, err:
6300             self.LogWarning("Failed to create volume %s (%s) on"
6301                             " node %s: %s",
6302                             new_disk.iv_name, new_disk, node, err)
6303         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6304                        (new_disk.size, new_disk.mode)))
6305       else:
6306         # change a given disk
6307         instance.disks[disk_op].mode = disk_dict['mode']
6308         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6309     # NIC changes
6310     for nic_op, nic_dict in self.op.nics:
6311       if nic_op == constants.DDM_REMOVE:
6312         # remove the last nic
6313         del instance.nics[-1]
6314         result.append(("nic.%d" % len(instance.nics), "remove"))
6315       elif nic_op == constants.DDM_ADD:
6316         # mac and bridge should be set, by now
6317         mac = nic_dict['mac']
6318         ip = nic_dict.get('ip', None)
6319         nicparams = self.nic_pinst[constants.DDM_ADD]
6320         new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
6321         instance.nics.append(new_nic)
6322         result.append(("nic.%d" % (len(instance.nics) - 1),
6323                        "add:mac=%s,ip=%s,mode=%s,link=%s" %
6324                        (new_nic.mac, new_nic.ip,
6325                         self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
6326                         self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
6327                        )))
6328       else:
6329         for key in 'mac', 'ip':
6330           if key in nic_dict:
6331             setattr(instance.nics[nic_op], key, nic_dict[key])
6332         if nic_op in self.nic_pnew:
6333           instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
6334         for key, val in nic_dict.iteritems():
6335           result.append(("nic.%s/%d" % (key, nic_op), val))
6336
6337     # hvparams changes
6338     if self.op.hvparams:
6339       instance.hvparams = self.hv_inst
6340       for key, val in self.op.hvparams.iteritems():
6341         result.append(("hv/%s" % key, val))
6342
6343     # beparams changes
6344     if self.op.beparams:
6345       instance.beparams = self.be_inst
6346       for key, val in self.op.beparams.iteritems():
6347         result.append(("be/%s" % key, val))
6348
6349     self.cfg.Update(instance)
6350
6351     return result
6352
6353
6354 class LUQueryExports(NoHooksLU):
6355   """Query the exports list
6356
6357   """
6358   _OP_REQP = ['nodes']
6359   REQ_BGL = False
6360
6361   def ExpandNames(self):
6362     self.needed_locks = {}
6363     self.share_locks[locking.LEVEL_NODE] = 1
6364     if not self.op.nodes:
6365       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6366     else:
6367       self.needed_locks[locking.LEVEL_NODE] = \
6368         _GetWantedNodes(self, self.op.nodes)
6369
6370   def CheckPrereq(self):
6371     """Check prerequisites.
6372
6373     """
6374     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6375
6376   def Exec(self, feedback_fn):
6377     """Compute the list of all the exported system images.
6378
6379     @rtype: dict
6380     @return: a dictionary with the structure node->(export-list)
6381         where export-list is a list of the instances exported on
6382         that node.
6383
6384     """
6385     rpcresult = self.rpc.call_export_list(self.nodes)
6386     result = {}
6387     for node in rpcresult:
6388       if rpcresult[node].RemoteFailMsg():
6389         result[node] = False
6390       else:
6391         result[node] = rpcresult[node].payload
6392
6393     return result
6394
6395
6396 class LUExportInstance(LogicalUnit):
6397   """Export an instance to an image in the cluster.
6398
6399   """
6400   HPATH = "instance-export"
6401   HTYPE = constants.HTYPE_INSTANCE
6402   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6403   REQ_BGL = False
6404
6405   def ExpandNames(self):
6406     self._ExpandAndLockInstance()
6407     # FIXME: lock only instance primary and destination node
6408     #
6409     # Sad but true, for now we have do lock all nodes, as we don't know where
6410     # the previous export might be, and and in this LU we search for it and
6411     # remove it from its current node. In the future we could fix this by:
6412     #  - making a tasklet to search (share-lock all), then create the new one,
6413     #    then one to remove, after
6414     #  - removing the removal operation altoghether
6415     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6416
6417   def DeclareLocks(self, level):
6418     """Last minute lock declaration."""
6419     # All nodes are locked anyway, so nothing to do here.
6420
6421   def BuildHooksEnv(self):
6422     """Build hooks env.
6423
6424     This will run on the master, primary node and target node.
6425
6426     """
6427     env = {
6428       "EXPORT_NODE": self.op.target_node,
6429       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6430       }
6431     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6432     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6433           self.op.target_node]
6434     return env, nl, nl
6435
6436   def CheckPrereq(self):
6437     """Check prerequisites.
6438
6439     This checks that the instance and node names are valid.
6440
6441     """
6442     instance_name = self.op.instance_name
6443     self.instance = self.cfg.GetInstanceInfo(instance_name)
6444     assert self.instance is not None, \
6445           "Cannot retrieve locked instance %s" % self.op.instance_name
6446     _CheckNodeOnline(self, self.instance.primary_node)
6447
6448     self.dst_node = self.cfg.GetNodeInfo(
6449       self.cfg.ExpandNodeName(self.op.target_node))
6450
6451     if self.dst_node is None:
6452       # This is wrong node name, not a non-locked node
6453       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6454     _CheckNodeOnline(self, self.dst_node.name)
6455     _CheckNodeNotDrained(self, self.dst_node.name)
6456
6457     # instance disk type verification
6458     for disk in self.instance.disks:
6459       if disk.dev_type == constants.LD_FILE:
6460         raise errors.OpPrereqError("Export not supported for instances with"
6461                                    " file-based disks")
6462
6463   def Exec(self, feedback_fn):
6464     """Export an instance to an image in the cluster.
6465
6466     """
6467     instance = self.instance
6468     dst_node = self.dst_node
6469     src_node = instance.primary_node
6470     if self.op.shutdown:
6471       # shutdown the instance, but not the disks
6472       result = self.rpc.call_instance_shutdown(src_node, instance)
6473       msg = result.RemoteFailMsg()
6474       if msg:
6475         raise errors.OpExecError("Could not shutdown instance %s on"
6476                                  " node %s: %s" %
6477                                  (instance.name, src_node, msg))
6478
6479     vgname = self.cfg.GetVGName()
6480
6481     snap_disks = []
6482
6483     # set the disks ID correctly since call_instance_start needs the
6484     # correct drbd minor to create the symlinks
6485     for disk in instance.disks:
6486       self.cfg.SetDiskID(disk, src_node)
6487
6488     try:
6489       for disk in instance.disks:
6490         # result.payload will be a snapshot of an lvm leaf of the one we passed
6491         result = self.rpc.call_blockdev_snapshot(src_node, disk)
6492         msg = result.RemoteFailMsg()
6493         if msg:
6494           self.LogWarning("Could not snapshot block device %s on node %s: %s",
6495                           disk.logical_id[1], src_node, msg)
6496           snap_disks.append(False)
6497         else:
6498           disk_id = (vgname, result.payload)
6499           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6500                                  logical_id=disk_id, physical_id=disk_id,
6501                                  iv_name=disk.iv_name)
6502           snap_disks.append(new_dev)
6503
6504     finally:
6505       if self.op.shutdown and instance.admin_up:
6506         result = self.rpc.call_instance_start(src_node, instance, None, None)
6507         msg = result.RemoteFailMsg()
6508         if msg:
6509           _ShutdownInstanceDisks(self, instance)
6510           raise errors.OpExecError("Could not start instance: %s" % msg)
6511
6512     # TODO: check for size
6513
6514     cluster_name = self.cfg.GetClusterName()
6515     for idx, dev in enumerate(snap_disks):
6516       if dev:
6517         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6518                                                instance, cluster_name, idx)
6519         msg = result.RemoteFailMsg()
6520         if msg:
6521           self.LogWarning("Could not export block device %s from node %s to"
6522                           " node %s: %s", dev.logical_id[1], src_node,
6523                           dst_node.name, msg)
6524         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6525         if msg:
6526           self.LogWarning("Could not remove snapshot block device %s from node"
6527                           " %s: %s", dev.logical_id[1], src_node, msg)
6528
6529     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6530     msg = result.RemoteFailMsg()
6531     if msg:
6532       self.LogWarning("Could not finalize export for instance %s"
6533                       " on node %s: %s", instance.name, dst_node.name, msg)
6534
6535     nodelist = self.cfg.GetNodeList()
6536     nodelist.remove(dst_node.name)
6537
6538     # on one-node clusters nodelist will be empty after the removal
6539     # if we proceed the backup would be removed because OpQueryExports
6540     # substitutes an empty list with the full cluster node list.
6541     iname = instance.name
6542     if nodelist:
6543       exportlist = self.rpc.call_export_list(nodelist)
6544       for node in exportlist:
6545         if exportlist[node].RemoteFailMsg():
6546           continue
6547         if iname in exportlist[node].payload:
6548           msg = self.rpc.call_export_remove(node, iname).RemoteFailMsg()
6549           if msg:
6550             self.LogWarning("Could not remove older export for instance %s"
6551                             " on node %s: %s", iname, node, msg)
6552
6553
6554 class LURemoveExport(NoHooksLU):
6555   """Remove exports related to the named instance.
6556
6557   """
6558   _OP_REQP = ["instance_name"]
6559   REQ_BGL = False
6560
6561   def ExpandNames(self):
6562     self.needed_locks = {}
6563     # We need all nodes to be locked in order for RemoveExport to work, but we
6564     # don't need to lock the instance itself, as nothing will happen to it (and
6565     # we can remove exports also for a removed instance)
6566     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6567
6568   def CheckPrereq(self):
6569     """Check prerequisites.
6570     """
6571     pass
6572
6573   def Exec(self, feedback_fn):
6574     """Remove any export.
6575
6576     """
6577     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6578     # If the instance was not found we'll try with the name that was passed in.
6579     # This will only work if it was an FQDN, though.
6580     fqdn_warn = False
6581     if not instance_name:
6582       fqdn_warn = True
6583       instance_name = self.op.instance_name
6584
6585     locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
6586     exportlist = self.rpc.call_export_list(locked_nodes)
6587     found = False
6588     for node in exportlist:
6589       msg = exportlist[node].RemoteFailMsg()
6590       if msg:
6591         self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
6592         continue
6593       if instance_name in exportlist[node].payload:
6594         found = True
6595         result = self.rpc.call_export_remove(node, instance_name)
6596         msg = result.RemoteFailMsg()
6597         if msg:
6598           logging.error("Could not remove export for instance %s"
6599                         " on node %s: %s", instance_name, node, msg)
6600
6601     if fqdn_warn and not found:
6602       feedback_fn("Export not found. If trying to remove an export belonging"
6603                   " to a deleted instance please use its Fully Qualified"
6604                   " Domain Name.")
6605
6606
6607 class TagsLU(NoHooksLU):
6608   """Generic tags LU.
6609
6610   This is an abstract class which is the parent of all the other tags LUs.
6611
6612   """
6613
6614   def ExpandNames(self):
6615     self.needed_locks = {}
6616     if self.op.kind == constants.TAG_NODE:
6617       name = self.cfg.ExpandNodeName(self.op.name)
6618       if name is None:
6619         raise errors.OpPrereqError("Invalid node name (%s)" %
6620                                    (self.op.name,))
6621       self.op.name = name
6622       self.needed_locks[locking.LEVEL_NODE] = name
6623     elif self.op.kind == constants.TAG_INSTANCE:
6624       name = self.cfg.ExpandInstanceName(self.op.name)
6625       if name is None:
6626         raise errors.OpPrereqError("Invalid instance name (%s)" %
6627                                    (self.op.name,))
6628       self.op.name = name
6629       self.needed_locks[locking.LEVEL_INSTANCE] = name
6630
6631   def CheckPrereq(self):
6632     """Check prerequisites.
6633
6634     """
6635     if self.op.kind == constants.TAG_CLUSTER:
6636       self.target = self.cfg.GetClusterInfo()
6637     elif self.op.kind == constants.TAG_NODE:
6638       self.target = self.cfg.GetNodeInfo(self.op.name)
6639     elif self.op.kind == constants.TAG_INSTANCE:
6640       self.target = self.cfg.GetInstanceInfo(self.op.name)
6641     else:
6642       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6643                                  str(self.op.kind))
6644
6645
6646 class LUGetTags(TagsLU):
6647   """Returns the tags of a given object.
6648
6649   """
6650   _OP_REQP = ["kind", "name"]
6651   REQ_BGL = False
6652
6653   def Exec(self, feedback_fn):
6654     """Returns the tag list.
6655
6656     """
6657     return list(self.target.GetTags())
6658
6659
6660 class LUSearchTags(NoHooksLU):
6661   """Searches the tags for a given pattern.
6662
6663   """
6664   _OP_REQP = ["pattern"]
6665   REQ_BGL = False
6666
6667   def ExpandNames(self):
6668     self.needed_locks = {}
6669
6670   def CheckPrereq(self):
6671     """Check prerequisites.
6672
6673     This checks the pattern passed for validity by compiling it.
6674
6675     """
6676     try:
6677       self.re = re.compile(self.op.pattern)
6678     except re.error, err:
6679       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6680                                  (self.op.pattern, err))
6681
6682   def Exec(self, feedback_fn):
6683     """Returns the tag list.
6684
6685     """
6686     cfg = self.cfg
6687     tgts = [("/cluster", cfg.GetClusterInfo())]
6688     ilist = cfg.GetAllInstancesInfo().values()
6689     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6690     nlist = cfg.GetAllNodesInfo().values()
6691     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6692     results = []
6693     for path, target in tgts:
6694       for tag in target.GetTags():
6695         if self.re.search(tag):
6696           results.append((path, tag))
6697     return results
6698
6699
6700 class LUAddTags(TagsLU):
6701   """Sets a tag on a given object.
6702
6703   """
6704   _OP_REQP = ["kind", "name", "tags"]
6705   REQ_BGL = False
6706
6707   def CheckPrereq(self):
6708     """Check prerequisites.
6709
6710     This checks the type and length of the tag name and value.
6711
6712     """
6713     TagsLU.CheckPrereq(self)
6714     for tag in self.op.tags:
6715       objects.TaggableObject.ValidateTag(tag)
6716
6717   def Exec(self, feedback_fn):
6718     """Sets the tag.
6719
6720     """
6721     try:
6722       for tag in self.op.tags:
6723         self.target.AddTag(tag)
6724     except errors.TagError, err:
6725       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6726     try:
6727       self.cfg.Update(self.target)
6728     except errors.ConfigurationError:
6729       raise errors.OpRetryError("There has been a modification to the"
6730                                 " config file and the operation has been"
6731                                 " aborted. Please retry.")
6732
6733
6734 class LUDelTags(TagsLU):
6735   """Delete a list of tags from a given object.
6736
6737   """
6738   _OP_REQP = ["kind", "name", "tags"]
6739   REQ_BGL = False
6740
6741   def CheckPrereq(self):
6742     """Check prerequisites.
6743
6744     This checks that we have the given tag.
6745
6746     """
6747     TagsLU.CheckPrereq(self)
6748     for tag in self.op.tags:
6749       objects.TaggableObject.ValidateTag(tag)
6750     del_tags = frozenset(self.op.tags)
6751     cur_tags = self.target.GetTags()
6752     if not del_tags <= cur_tags:
6753       diff_tags = del_tags - cur_tags
6754       diff_names = ["'%s'" % tag for tag in diff_tags]
6755       diff_names.sort()
6756       raise errors.OpPrereqError("Tag(s) %s not found" %
6757                                  (",".join(diff_names)))
6758
6759   def Exec(self, feedback_fn):
6760     """Remove the tag from the object.
6761
6762     """
6763     for tag in self.op.tags:
6764       self.target.RemoveTag(tag)
6765     try:
6766       self.cfg.Update(self.target)
6767     except errors.ConfigurationError:
6768       raise errors.OpRetryError("There has been a modification to the"
6769                                 " config file and the operation has been"
6770                                 " aborted. Please retry.")
6771
6772
6773 class LUTestDelay(NoHooksLU):
6774   """Sleep for a specified amount of time.
6775
6776   This LU sleeps on the master and/or nodes for a specified amount of
6777   time.
6778
6779   """
6780   _OP_REQP = ["duration", "on_master", "on_nodes"]
6781   REQ_BGL = False
6782
6783   def ExpandNames(self):
6784     """Expand names and set required locks.
6785
6786     This expands the node list, if any.
6787
6788     """
6789     self.needed_locks = {}
6790     if self.op.on_nodes:
6791       # _GetWantedNodes can be used here, but is not always appropriate to use
6792       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6793       # more information.
6794       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6795       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6796
6797   def CheckPrereq(self):
6798     """Check prerequisites.
6799
6800     """
6801
6802   def Exec(self, feedback_fn):
6803     """Do the actual sleep.
6804
6805     """
6806     if self.op.on_master:
6807       if not utils.TestDelay(self.op.duration):
6808         raise errors.OpExecError("Error during master delay test")
6809     if self.op.on_nodes:
6810       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6811       if not result:
6812         raise errors.OpExecError("Complete failure from rpc call")
6813       for node, node_result in result.items():
6814         node_result.Raise()
6815         if not node_result.data:
6816           raise errors.OpExecError("Failure during rpc call to node %s,"
6817                                    " result: %s" % (node, node_result.data))
6818
6819
6820 class IAllocator(object):
6821   """IAllocator framework.
6822
6823   An IAllocator instance has three sets of attributes:
6824     - cfg that is needed to query the cluster
6825     - input data (all members of the _KEYS class attribute are required)
6826     - four buffer attributes (in|out_data|text), that represent the
6827       input (to the external script) in text and data structure format,
6828       and the output from it, again in two formats
6829     - the result variables from the script (success, info, nodes) for
6830       easy usage
6831
6832   """
6833   _ALLO_KEYS = [
6834     "mem_size", "disks", "disk_template",
6835     "os", "tags", "nics", "vcpus", "hypervisor",
6836     ]
6837   _RELO_KEYS = [
6838     "relocate_from",
6839     ]
6840
6841   def __init__(self, lu, mode, name, **kwargs):
6842     self.lu = lu
6843     # init buffer variables
6844     self.in_text = self.out_text = self.in_data = self.out_data = None
6845     # init all input fields so that pylint is happy
6846     self.mode = mode
6847     self.name = name
6848     self.mem_size = self.disks = self.disk_template = None
6849     self.os = self.tags = self.nics = self.vcpus = None
6850     self.hypervisor = None
6851     self.relocate_from = None
6852     # computed fields
6853     self.required_nodes = None
6854     # init result fields
6855     self.success = self.info = self.nodes = None
6856     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6857       keyset = self._ALLO_KEYS
6858     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6859       keyset = self._RELO_KEYS
6860     else:
6861       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6862                                    " IAllocator" % self.mode)
6863     for key in kwargs:
6864       if key not in keyset:
6865         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6866                                      " IAllocator" % key)
6867       setattr(self, key, kwargs[key])
6868     for key in keyset:
6869       if key not in kwargs:
6870         raise errors.ProgrammerError("Missing input parameter '%s' to"
6871                                      " IAllocator" % key)
6872     self._BuildInputData()
6873
6874   def _ComputeClusterData(self):
6875     """Compute the generic allocator input data.
6876
6877     This is the data that is independent of the actual operation.
6878
6879     """
6880     cfg = self.lu.cfg
6881     cluster_info = cfg.GetClusterInfo()
6882     # cluster data
6883     data = {
6884       "version": constants.IALLOCATOR_VERSION,
6885       "cluster_name": cfg.GetClusterName(),
6886       "cluster_tags": list(cluster_info.GetTags()),
6887       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6888       # we don't have job IDs
6889       }
6890     iinfo = cfg.GetAllInstancesInfo().values()
6891     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6892
6893     # node data
6894     node_results = {}
6895     node_list = cfg.GetNodeList()
6896
6897     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6898       hypervisor_name = self.hypervisor
6899     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6900       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6901
6902     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6903                                            hypervisor_name)
6904     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6905                        cluster_info.enabled_hypervisors)
6906     for nname, nresult in node_data.items():
6907       # first fill in static (config-based) values
6908       ninfo = cfg.GetNodeInfo(nname)
6909       pnr = {
6910         "tags": list(ninfo.GetTags()),
6911         "primary_ip": ninfo.primary_ip,
6912         "secondary_ip": ninfo.secondary_ip,
6913         "offline": ninfo.offline,
6914         "drained": ninfo.drained,
6915         "master_candidate": ninfo.master_candidate,
6916         }
6917
6918       if not ninfo.offline:
6919         nresult.Raise()
6920         if not isinstance(nresult.data, dict):
6921           raise errors.OpExecError("Can't get data for node %s" % nname)
6922         msg = node_iinfo[nname].RemoteFailMsg()
6923         if msg:
6924           raise errors.OpExecError("Can't get node instance info"
6925                                    " from node %s: %s" % (nname, msg))
6926         remote_info = nresult.data
6927         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6928                      'vg_size', 'vg_free', 'cpu_total']:
6929           if attr not in remote_info:
6930             raise errors.OpExecError("Node '%s' didn't return attribute"
6931                                      " '%s'" % (nname, attr))
6932           try:
6933             remote_info[attr] = int(remote_info[attr])
6934           except ValueError, err:
6935             raise errors.OpExecError("Node '%s' returned invalid value"
6936                                      " for '%s': %s" % (nname, attr, err))
6937         # compute memory used by primary instances
6938         i_p_mem = i_p_up_mem = 0
6939         for iinfo, beinfo in i_list:
6940           if iinfo.primary_node == nname:
6941             i_p_mem += beinfo[constants.BE_MEMORY]
6942             if iinfo.name not in node_iinfo[nname].payload:
6943               i_used_mem = 0
6944             else:
6945               i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
6946             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6947             remote_info['memory_free'] -= max(0, i_mem_diff)
6948
6949             if iinfo.admin_up:
6950               i_p_up_mem += beinfo[constants.BE_MEMORY]
6951
6952         # compute memory used by instances
6953         pnr_dyn = {
6954           "total_memory": remote_info['memory_total'],
6955           "reserved_memory": remote_info['memory_dom0'],
6956           "free_memory": remote_info['memory_free'],
6957           "total_disk": remote_info['vg_size'],
6958           "free_disk": remote_info['vg_free'],
6959           "total_cpus": remote_info['cpu_total'],
6960           "i_pri_memory": i_p_mem,
6961           "i_pri_up_memory": i_p_up_mem,
6962           }
6963         pnr.update(pnr_dyn)
6964
6965       node_results[nname] = pnr
6966     data["nodes"] = node_results
6967
6968     # instance data
6969     instance_data = {}
6970     for iinfo, beinfo in i_list:
6971       nic_data = []
6972       for nic in iinfo.nics:
6973         filled_params = objects.FillDict(
6974             cluster_info.nicparams[constants.PP_DEFAULT],
6975             nic.nicparams)
6976         nic_dict = {"mac": nic.mac,
6977                     "ip": nic.ip,
6978                     "mode": filled_params[constants.NIC_MODE],
6979                     "link": filled_params[constants.NIC_LINK],
6980                    }
6981         if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
6982           nic_dict["bridge"] = filled_params[constants.NIC_LINK]
6983         nic_data.append(nic_dict)
6984       pir = {
6985         "tags": list(iinfo.GetTags()),
6986         "admin_up": iinfo.admin_up,
6987         "vcpus": beinfo[constants.BE_VCPUS],
6988         "memory": beinfo[constants.BE_MEMORY],
6989         "os": iinfo.os,
6990         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6991         "nics": nic_data,
6992         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6993         "disk_template": iinfo.disk_template,
6994         "hypervisor": iinfo.hypervisor,
6995         }
6996       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
6997                                                  pir["disks"])
6998       instance_data[iinfo.name] = pir
6999
7000     data["instances"] = instance_data
7001
7002     self.in_data = data
7003
7004   def _AddNewInstance(self):
7005     """Add new instance data to allocator structure.
7006
7007     This in combination with _AllocatorGetClusterData will create the
7008     correct structure needed as input for the allocator.
7009
7010     The checks for the completeness of the opcode must have already been
7011     done.
7012
7013     """
7014     data = self.in_data
7015
7016     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
7017
7018     if self.disk_template in constants.DTS_NET_MIRROR:
7019       self.required_nodes = 2
7020     else:
7021       self.required_nodes = 1
7022     request = {
7023       "type": "allocate",
7024       "name": self.name,
7025       "disk_template": self.disk_template,
7026       "tags": self.tags,
7027       "os": self.os,
7028       "vcpus": self.vcpus,
7029       "memory": self.mem_size,
7030       "disks": self.disks,
7031       "disk_space_total": disk_space,
7032       "nics": self.nics,
7033       "required_nodes": self.required_nodes,
7034       }
7035     data["request"] = request
7036
7037   def _AddRelocateInstance(self):
7038     """Add relocate instance data to allocator structure.
7039
7040     This in combination with _IAllocatorGetClusterData will create the
7041     correct structure needed as input for the allocator.
7042
7043     The checks for the completeness of the opcode must have already been
7044     done.
7045
7046     """
7047     instance = self.lu.cfg.GetInstanceInfo(self.name)
7048     if instance is None:
7049       raise errors.ProgrammerError("Unknown instance '%s' passed to"
7050                                    " IAllocator" % self.name)
7051
7052     if instance.disk_template not in constants.DTS_NET_MIRROR:
7053       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
7054
7055     if len(instance.secondary_nodes) != 1:
7056       raise errors.OpPrereqError("Instance has not exactly one secondary node")
7057
7058     self.required_nodes = 1
7059     disk_sizes = [{'size': disk.size} for disk in instance.disks]
7060     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
7061
7062     request = {
7063       "type": "relocate",
7064       "name": self.name,
7065       "disk_space_total": disk_space,
7066       "required_nodes": self.required_nodes,
7067       "relocate_from": self.relocate_from,
7068       }
7069     self.in_data["request"] = request
7070
7071   def _BuildInputData(self):
7072     """Build input data structures.
7073
7074     """
7075     self._ComputeClusterData()
7076
7077     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7078       self._AddNewInstance()
7079     else:
7080       self._AddRelocateInstance()
7081
7082     self.in_text = serializer.Dump(self.in_data)
7083
7084   def Run(self, name, validate=True, call_fn=None):
7085     """Run an instance allocator and return the results.
7086
7087     """
7088     if call_fn is None:
7089       call_fn = self.lu.rpc.call_iallocator_runner
7090     data = self.in_text
7091
7092     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
7093     result.Raise()
7094
7095     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
7096       raise errors.OpExecError("Invalid result from master iallocator runner")
7097
7098     rcode, stdout, stderr, fail = result.data
7099
7100     if rcode == constants.IARUN_NOTFOUND:
7101       raise errors.OpExecError("Can't find allocator '%s'" % name)
7102     elif rcode == constants.IARUN_FAILURE:
7103       raise errors.OpExecError("Instance allocator call failed: %s,"
7104                                " output: %s" % (fail, stdout+stderr))
7105     self.out_text = stdout
7106     if validate:
7107       self._ValidateResult()
7108
7109   def _ValidateResult(self):
7110     """Process the allocator results.
7111
7112     This will process and if successful save the result in
7113     self.out_data and the other parameters.
7114
7115     """
7116     try:
7117       rdict = serializer.Load(self.out_text)
7118     except Exception, err:
7119       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
7120
7121     if not isinstance(rdict, dict):
7122       raise errors.OpExecError("Can't parse iallocator results: not a dict")
7123
7124     for key in "success", "info", "nodes":
7125       if key not in rdict:
7126         raise errors.OpExecError("Can't parse iallocator results:"
7127                                  " missing key '%s'" % key)
7128       setattr(self, key, rdict[key])
7129
7130     if not isinstance(rdict["nodes"], list):
7131       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
7132                                " is not a list")
7133     self.out_data = rdict
7134
7135
7136 class LUTestAllocator(NoHooksLU):
7137   """Run allocator tests.
7138
7139   This LU runs the allocator tests
7140
7141   """
7142   _OP_REQP = ["direction", "mode", "name"]
7143
7144   def CheckPrereq(self):
7145     """Check prerequisites.
7146
7147     This checks the opcode parameters depending on the director and mode test.
7148
7149     """
7150     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7151       for attr in ["name", "mem_size", "disks", "disk_template",
7152                    "os", "tags", "nics", "vcpus"]:
7153         if not hasattr(self.op, attr):
7154           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7155                                      attr)
7156       iname = self.cfg.ExpandInstanceName(self.op.name)
7157       if iname is not None:
7158         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7159                                    iname)
7160       if not isinstance(self.op.nics, list):
7161         raise errors.OpPrereqError("Invalid parameter 'nics'")
7162       for row in self.op.nics:
7163         if (not isinstance(row, dict) or
7164             "mac" not in row or
7165             "ip" not in row or
7166             "bridge" not in row):
7167           raise errors.OpPrereqError("Invalid contents of the"
7168                                      " 'nics' parameter")
7169       if not isinstance(self.op.disks, list):
7170         raise errors.OpPrereqError("Invalid parameter 'disks'")
7171       for row in self.op.disks:
7172         if (not isinstance(row, dict) or
7173             "size" not in row or
7174             not isinstance(row["size"], int) or
7175             "mode" not in row or
7176             row["mode"] not in ['r', 'w']):
7177           raise errors.OpPrereqError("Invalid contents of the"
7178                                      " 'disks' parameter")
7179       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7180         self.op.hypervisor = self.cfg.GetHypervisorType()
7181     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7182       if not hasattr(self.op, "name"):
7183         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7184       fname = self.cfg.ExpandInstanceName(self.op.name)
7185       if fname is None:
7186         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7187                                    self.op.name)
7188       self.op.name = fname
7189       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7190     else:
7191       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7192                                  self.op.mode)
7193
7194     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7195       if not hasattr(self.op, "allocator") or self.op.allocator is None:
7196         raise errors.OpPrereqError("Missing allocator name")
7197     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7198       raise errors.OpPrereqError("Wrong allocator test '%s'" %
7199                                  self.op.direction)
7200
7201   def Exec(self, feedback_fn):
7202     """Run the allocator test.
7203
7204     """
7205     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7206       ial = IAllocator(self,
7207                        mode=self.op.mode,
7208                        name=self.op.name,
7209                        mem_size=self.op.mem_size,
7210                        disks=self.op.disks,
7211                        disk_template=self.op.disk_template,
7212                        os=self.op.os,
7213                        tags=self.op.tags,
7214                        nics=self.op.nics,
7215                        vcpus=self.op.vcpus,
7216                        hypervisor=self.op.hypervisor,
7217                        )
7218     else:
7219       ial = IAllocator(self,
7220                        mode=self.op.mode,
7221                        name=self.op.name,
7222                        relocate_from=list(self.relocate_from),
7223                        )
7224
7225     if self.op.direction == constants.IALLOCATOR_DIR_IN:
7226       result = ial.in_text
7227     else:
7228       ial.Run(self.op.allocator, validate=False)
7229       result = ial.out_text
7230     return result