Merge branch 'next' into branch-2.1
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import tempfile
30 import re
31 import platform
32 import logging
33 import copy
34 import random
35
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45 from ganeti import ssconf
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59
60   Note that all commands require root permissions.
61
62   """
63   HPATH = None
64   HTYPE = None
65   _OP_REQP = []
66   REQ_BGL = True
67
68   def __init__(self, processor, op, context, rpc):
69     """Constructor for LogicalUnit.
70
71     This needs to be overriden in derived classes in order to check op
72     validity.
73
74     """
75     self.proc = processor
76     self.op = op
77     self.cfg = context.cfg
78     self.context = context
79     self.rpc = rpc
80     # Dicts used to declare locking needs to mcpu
81     self.needed_locks = None
82     self.acquired_locks = {}
83     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
84     self.add_locks = {}
85     self.remove_locks = {}
86     # Used to force good behavior when calling helper functions
87     self.recalculate_locks = {}
88     self.__ssh = None
89     # logging
90     self.LogWarning = processor.LogWarning
91     self.LogInfo = processor.LogInfo
92
93     for attr_name in self._OP_REQP:
94       attr_val = getattr(op, attr_name, None)
95       if attr_val is None:
96         raise errors.OpPrereqError("Required parameter '%s' missing" %
97                                    attr_name)
98     self.CheckArguments()
99
100   def __GetSSH(self):
101     """Returns the SshRunner object
102
103     """
104     if not self.__ssh:
105       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
106     return self.__ssh
107
108   ssh = property(fget=__GetSSH)
109
110   def CheckArguments(self):
111     """Check syntactic validity for the opcode arguments.
112
113     This method is for doing a simple syntactic check and ensure
114     validity of opcode parameters, without any cluster-related
115     checks. While the same can be accomplished in ExpandNames and/or
116     CheckPrereq, doing these separate is better because:
117
118       - ExpandNames is left as as purely a lock-related function
119       - CheckPrereq is run after we have aquired locks (and possible
120         waited for them)
121
122     The function is allowed to change the self.op attribute so that
123     later methods can no longer worry about missing parameters.
124
125     """
126     pass
127
128   def ExpandNames(self):
129     """Expand names for this LU.
130
131     This method is called before starting to execute the opcode, and it should
132     update all the parameters of the opcode to their canonical form (e.g. a
133     short node name must be fully expanded after this method has successfully
134     completed). This way locking, hooks, logging, ecc. can work correctly.
135
136     LUs which implement this method must also populate the self.needed_locks
137     member, as a dict with lock levels as keys, and a list of needed lock names
138     as values. Rules:
139
140       - use an empty dict if you don't need any lock
141       - if you don't need any lock at a particular level omit that level
142       - don't put anything for the BGL level
143       - if you want all locks at a level use locking.ALL_SET as a value
144
145     If you need to share locks (rather than acquire them exclusively) at one
146     level you can modify self.share_locks, setting a true value (usually 1) for
147     that level. By default locks are not shared.
148
149     Examples::
150
151       # Acquire all nodes and one instance
152       self.needed_locks = {
153         locking.LEVEL_NODE: locking.ALL_SET,
154         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
155       }
156       # Acquire just two nodes
157       self.needed_locks = {
158         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
159       }
160       # Acquire no locks
161       self.needed_locks = {} # No, you can't leave it to the default value None
162
163     """
164     # The implementation of this method is mandatory only if the new LU is
165     # concurrent, so that old LUs don't need to be changed all at the same
166     # time.
167     if self.REQ_BGL:
168       self.needed_locks = {} # Exclusive LUs don't need locks.
169     else:
170       raise NotImplementedError
171
172   def DeclareLocks(self, level):
173     """Declare LU locking needs for a level
174
175     While most LUs can just declare their locking needs at ExpandNames time,
176     sometimes there's the need to calculate some locks after having acquired
177     the ones before. This function is called just before acquiring locks at a
178     particular level, but after acquiring the ones at lower levels, and permits
179     such calculations. It can be used to modify self.needed_locks, and by
180     default it does nothing.
181
182     This function is only called if you have something already set in
183     self.needed_locks for the level.
184
185     @param level: Locking level which is going to be locked
186     @type level: member of ganeti.locking.LEVELS
187
188     """
189
190   def CheckPrereq(self):
191     """Check prerequisites for this LU.
192
193     This method should check that the prerequisites for the execution
194     of this LU are fulfilled. It can do internode communication, but
195     it should be idempotent - no cluster or system changes are
196     allowed.
197
198     The method should raise errors.OpPrereqError in case something is
199     not fulfilled. Its return value is ignored.
200
201     This method should also update all the parameters of the opcode to
202     their canonical form if it hasn't been done by ExpandNames before.
203
204     """
205     raise NotImplementedError
206
207   def Exec(self, feedback_fn):
208     """Execute the LU.
209
210     This method should implement the actual work. It should raise
211     errors.OpExecError for failures that are somewhat dealt with in
212     code, or expected.
213
214     """
215     raise NotImplementedError
216
217   def BuildHooksEnv(self):
218     """Build hooks environment for this LU.
219
220     This method should return a three-node tuple consisting of: a dict
221     containing the environment that will be used for running the
222     specific hook for this LU, a list of node names on which the hook
223     should run before the execution, and a list of node names on which
224     the hook should run after the execution.
225
226     The keys of the dict must not have 'GANETI_' prefixed as this will
227     be handled in the hooks runner. Also note additional keys will be
228     added by the hooks runner. If the LU doesn't define any
229     environment, an empty dict (and not None) should be returned.
230
231     No nodes should be returned as an empty list (and not None).
232
233     Note that if the HPATH for a LU class is None, this function will
234     not be called.
235
236     """
237     raise NotImplementedError
238
239   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240     """Notify the LU about the results of its hooks.
241
242     This method is called every time a hooks phase is executed, and notifies
243     the Logical Unit about the hooks' result. The LU can then use it to alter
244     its result based on the hooks.  By default the method does nothing and the
245     previous result is passed back unchanged but any LU can define it if it
246     wants to use the local cluster hook-scripts somehow.
247
248     @param phase: one of L{constants.HOOKS_PHASE_POST} or
249         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250     @param hook_results: the results of the multi-node hooks rpc call
251     @param feedback_fn: function used send feedback back to the caller
252     @param lu_result: the previous Exec result this LU had, or None
253         in the PRE phase
254     @return: the new Exec result, based on the previous result
255         and hook results
256
257     """
258     return lu_result
259
260   def _ExpandAndLockInstance(self):
261     """Helper function to expand and lock an instance.
262
263     Many LUs that work on an instance take its name in self.op.instance_name
264     and need to expand it and then declare the expanded name for locking. This
265     function does it, and then updates self.op.instance_name to the expanded
266     name. It also initializes needed_locks as a dict, if this hasn't been done
267     before.
268
269     """
270     if self.needed_locks is None:
271       self.needed_locks = {}
272     else:
273       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274         "_ExpandAndLockInstance called with instance-level locks set"
275     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276     if expanded_name is None:
277       raise errors.OpPrereqError("Instance '%s' not known" %
278                                   self.op.instance_name)
279     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280     self.op.instance_name = expanded_name
281
282   def _LockInstancesNodes(self, primary_only=False):
283     """Helper function to declare instances' nodes for locking.
284
285     This function should be called after locking one or more instances to lock
286     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287     with all primary or secondary nodes for instances already locked and
288     present in self.needed_locks[locking.LEVEL_INSTANCE].
289
290     It should be called from DeclareLocks, and for safety only works if
291     self.recalculate_locks[locking.LEVEL_NODE] is set.
292
293     In the future it may grow parameters to just lock some instance's nodes, or
294     to just lock primaries or secondary nodes, if needed.
295
296     If should be called in DeclareLocks in a way similar to::
297
298       if level == locking.LEVEL_NODE:
299         self._LockInstancesNodes()
300
301     @type primary_only: boolean
302     @param primary_only: only lock primary nodes of locked instances
303
304     """
305     assert locking.LEVEL_NODE in self.recalculate_locks, \
306       "_LockInstancesNodes helper function called with no nodes to recalculate"
307
308     # TODO: check if we're really been called with the instance locks held
309
310     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311     # future we might want to have different behaviors depending on the value
312     # of self.recalculate_locks[locking.LEVEL_NODE]
313     wanted_nodes = []
314     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315       instance = self.context.cfg.GetInstanceInfo(instance_name)
316       wanted_nodes.append(instance.primary_node)
317       if not primary_only:
318         wanted_nodes.extend(instance.secondary_nodes)
319
320     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
324
325     del self.recalculate_locks[locking.LEVEL_NODE]
326
327
328 class NoHooksLU(LogicalUnit):
329   """Simple LU which runs no hooks.
330
331   This LU is intended as a parent for other LogicalUnits which will
332   run no hooks, in order to reduce duplicate code.
333
334   """
335   HPATH = None
336   HTYPE = None
337
338
339 def _GetWantedNodes(lu, nodes):
340   """Returns list of checked and expanded node names.
341
342   @type lu: L{LogicalUnit}
343   @param lu: the logical unit on whose behalf we execute
344   @type nodes: list
345   @param nodes: list of node names or None for all nodes
346   @rtype: list
347   @return: the list of nodes, sorted
348   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
349
350   """
351   if not isinstance(nodes, list):
352     raise errors.OpPrereqError("Invalid argument type 'nodes'")
353
354   if not nodes:
355     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356       " non-empty list of nodes whose name is to be expanded.")
357
358   wanted = []
359   for name in nodes:
360     node = lu.cfg.ExpandNodeName(name)
361     if node is None:
362       raise errors.OpPrereqError("No such node name '%s'" % name)
363     wanted.append(node)
364
365   return utils.NiceSort(wanted)
366
367
368 def _GetWantedInstances(lu, instances):
369   """Returns list of checked and expanded instance names.
370
371   @type lu: L{LogicalUnit}
372   @param lu: the logical unit on whose behalf we execute
373   @type instances: list
374   @param instances: list of instance names or None for all instances
375   @rtype: list
376   @return: the list of instances, sorted
377   @raise errors.OpPrereqError: if the instances parameter is wrong type
378   @raise errors.OpPrereqError: if any of the passed instances is not found
379
380   """
381   if not isinstance(instances, list):
382     raise errors.OpPrereqError("Invalid argument type 'instances'")
383
384   if instances:
385     wanted = []
386
387     for name in instances:
388       instance = lu.cfg.ExpandInstanceName(name)
389       if instance is None:
390         raise errors.OpPrereqError("No such instance name '%s'" % name)
391       wanted.append(instance)
392
393   else:
394     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
395   return wanted
396
397
398 def _CheckOutputFields(static, dynamic, selected):
399   """Checks whether all selected fields are valid.
400
401   @type static: L{utils.FieldSet}
402   @param static: static fields set
403   @type dynamic: L{utils.FieldSet}
404   @param dynamic: dynamic fields set
405
406   """
407   f = utils.FieldSet()
408   f.Extend(static)
409   f.Extend(dynamic)
410
411   delta = f.NonMatching(selected)
412   if delta:
413     raise errors.OpPrereqError("Unknown output fields selected: %s"
414                                % ",".join(delta))
415
416
417 def _CheckBooleanOpField(op, name):
418   """Validates boolean opcode parameters.
419
420   This will ensure that an opcode parameter is either a boolean value,
421   or None (but that it always exists).
422
423   """
424   val = getattr(op, name, None)
425   if not (val is None or isinstance(val, bool)):
426     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
427                                (name, str(val)))
428   setattr(op, name, val)
429
430
431 def _CheckNodeOnline(lu, node):
432   """Ensure that a given node is online.
433
434   @param lu: the LU on behalf of which we make the check
435   @param node: the node to check
436   @raise errors.OpPrereqError: if the node is offline
437
438   """
439   if lu.cfg.GetNodeInfo(node).offline:
440     raise errors.OpPrereqError("Can't use offline node %s" % node)
441
442
443 def _CheckNodeNotDrained(lu, node):
444   """Ensure that a given node is not drained.
445
446   @param lu: the LU on behalf of which we make the check
447   @param node: the node to check
448   @raise errors.OpPrereqError: if the node is drained
449
450   """
451   if lu.cfg.GetNodeInfo(node).drained:
452     raise errors.OpPrereqError("Can't use drained node %s" % node)
453
454
455 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456                           memory, vcpus, nics, disk_template, disks,
457                           bep, hvp, hypervisor):
458   """Builds instance related env variables for hooks
459
460   This builds the hook environment from individual variables.
461
462   @type name: string
463   @param name: the name of the instance
464   @type primary_node: string
465   @param primary_node: the name of the instance's primary node
466   @type secondary_nodes: list
467   @param secondary_nodes: list of secondary nodes as strings
468   @type os_type: string
469   @param os_type: the name of the instance's OS
470   @type status: boolean
471   @param status: the should_run status of the instance
472   @type memory: string
473   @param memory: the memory size of the instance
474   @type vcpus: string
475   @param vcpus: the count of VCPUs the instance has
476   @type nics: list
477   @param nics: list of tuples (ip, bridge, mac) representing
478       the NICs the instance  has
479   @type disk_template: string
480   @param disk_template: the distk template of the instance
481   @type disks: list
482   @param disks: the list of (size, mode) pairs
483   @type bep: dict
484   @param bep: the backend parameters for the instance
485   @type hvp: dict
486   @param hvp: the hypervisor parameters for the instance
487   @type hypervisor: string
488   @param hypervisor: the hypervisor for the instance
489   @rtype: dict
490   @return: the hook environment for this instance
491
492   """
493   if status:
494     str_status = "up"
495   else:
496     str_status = "down"
497   env = {
498     "OP_TARGET": name,
499     "INSTANCE_NAME": name,
500     "INSTANCE_PRIMARY": primary_node,
501     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
502     "INSTANCE_OS_TYPE": os_type,
503     "INSTANCE_STATUS": str_status,
504     "INSTANCE_MEMORY": memory,
505     "INSTANCE_VCPUS": vcpus,
506     "INSTANCE_DISK_TEMPLATE": disk_template,
507     "INSTANCE_HYPERVISOR": hypervisor,
508   }
509
510   if nics:
511     nic_count = len(nics)
512     for idx, (ip, mac, mode, link) in enumerate(nics):
513       if ip is None:
514         ip = ""
515       env["INSTANCE_NIC%d_IP" % idx] = ip
516       env["INSTANCE_NIC%d_MAC" % idx] = mac
517       env["INSTANCE_NIC%d_MODE" % idx] = mode
518       env["INSTANCE_NIC%d_LINK" % idx] = link
519       if mode == constants.NIC_MODE_BRIDGED:
520         env["INSTANCE_NIC%d_BRIDGE" % idx] = link
521   else:
522     nic_count = 0
523
524   env["INSTANCE_NIC_COUNT"] = nic_count
525
526   if disks:
527     disk_count = len(disks)
528     for idx, (size, mode) in enumerate(disks):
529       env["INSTANCE_DISK%d_SIZE" % idx] = size
530       env["INSTANCE_DISK%d_MODE" % idx] = mode
531   else:
532     disk_count = 0
533
534   env["INSTANCE_DISK_COUNT"] = disk_count
535
536   for source, kind in [(bep, "BE"), (hvp, "HV")]:
537     for key, value in source.items():
538       env["INSTANCE_%s_%s" % (kind, key)] = value
539
540   return env
541
542 def _PreBuildNICHooksList(lu, nics):
543   """Build a list of nic information tuples.
544
545   This list is suitable to be passed to _BuildInstanceHookEnv.
546
547   @type lu:  L{LogicalUnit}
548   @param lu: the logical unit on whose behalf we execute
549   @type nics: list of L{objects.NIC}
550   @param nics: list of nics to convert to hooks tuples
551
552   """
553   hooks_nics = []
554   c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
555   for nic in nics:
556     ip = nic.ip
557     mac = nic.mac
558     filled_params = objects.FillDict(c_nicparams, nic.nicparams)
559     mode = filled_params[constants.NIC_MODE]
560     link = filled_params[constants.NIC_LINK]
561     hooks_nics.append((ip, mac, mode, link))
562   return hooks_nics
563
564 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
565   """Builds instance related env variables for hooks from an object.
566
567   @type lu: L{LogicalUnit}
568   @param lu: the logical unit on whose behalf we execute
569   @type instance: L{objects.Instance}
570   @param instance: the instance for which we should build the
571       environment
572   @type override: dict
573   @param override: dictionary with key/values that will override
574       our values
575   @rtype: dict
576   @return: the hook environment dictionary
577
578   """
579   cluster = lu.cfg.GetClusterInfo()
580   bep = cluster.FillBE(instance)
581   hvp = cluster.FillHV(instance)
582   args = {
583     'name': instance.name,
584     'primary_node': instance.primary_node,
585     'secondary_nodes': instance.secondary_nodes,
586     'os_type': instance.os,
587     'status': instance.admin_up,
588     'memory': bep[constants.BE_MEMORY],
589     'vcpus': bep[constants.BE_VCPUS],
590     'nics': _PreBuildNICHooksList(lu, instance.nics),
591     'disk_template': instance.disk_template,
592     'disks': [(disk.size, disk.mode) for disk in instance.disks],
593     'bep': bep,
594     'hvp': hvp,
595     'hypervisor': instance.hypervisor,
596   }
597   if override:
598     args.update(override)
599   return _BuildInstanceHookEnv(**args)
600
601
602 def _AdjustCandidatePool(lu):
603   """Adjust the candidate pool after node operations.
604
605   """
606   mod_list = lu.cfg.MaintainCandidatePool()
607   if mod_list:
608     lu.LogInfo("Promoted nodes to master candidate role: %s",
609                ", ".join(node.name for node in mod_list))
610     for name in mod_list:
611       lu.context.ReaddNode(name)
612   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
613   if mc_now > mc_max:
614     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
615                (mc_now, mc_max))
616
617
618 def _CheckNicsBridgesExist(lu, target_nics, target_node,
619                                profile=constants.PP_DEFAULT):
620   """Check that the brigdes needed by a list of nics exist.
621
622   """
623   c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
624   paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
625                 for nic in target_nics]
626   brlist = [params[constants.NIC_LINK] for params in paramslist
627             if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
628   if brlist:
629     result = lu.rpc.call_bridges_exist(target_node, brlist)
630     result.Raise("Error checking bridges on destination node '%s'" %
631                  target_node, prereq=True)
632
633
634 def _CheckInstanceBridgesExist(lu, instance, node=None):
635   """Check that the brigdes needed by an instance exist.
636
637   """
638   if node is None:
639     node=instance.primary_node
640   _CheckNicsBridgesExist(lu, instance.nics, node)
641
642
643 class LUDestroyCluster(NoHooksLU):
644   """Logical unit for destroying the cluster.
645
646   """
647   _OP_REQP = []
648
649   def CheckPrereq(self):
650     """Check prerequisites.
651
652     This checks whether the cluster is empty.
653
654     Any errors are signalled by raising errors.OpPrereqError.
655
656     """
657     master = self.cfg.GetMasterNode()
658
659     nodelist = self.cfg.GetNodeList()
660     if len(nodelist) != 1 or nodelist[0] != master:
661       raise errors.OpPrereqError("There are still %d node(s) in"
662                                  " this cluster." % (len(nodelist) - 1))
663     instancelist = self.cfg.GetInstanceList()
664     if instancelist:
665       raise errors.OpPrereqError("There are still %d instance(s) in"
666                                  " this cluster." % len(instancelist))
667
668   def Exec(self, feedback_fn):
669     """Destroys the cluster.
670
671     """
672     master = self.cfg.GetMasterNode()
673     result = self.rpc.call_node_stop_master(master, False)
674     result.Raise("Could not disable the master role")
675     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
676     utils.CreateBackup(priv_key)
677     utils.CreateBackup(pub_key)
678     return master
679
680
681 class LUVerifyCluster(LogicalUnit):
682   """Verifies the cluster status.
683
684   """
685   HPATH = "cluster-verify"
686   HTYPE = constants.HTYPE_CLUSTER
687   _OP_REQP = ["skip_checks"]
688   REQ_BGL = False
689
690   def ExpandNames(self):
691     self.needed_locks = {
692       locking.LEVEL_NODE: locking.ALL_SET,
693       locking.LEVEL_INSTANCE: locking.ALL_SET,
694     }
695     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
696
697   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
698                   node_result, feedback_fn, master_files,
699                   drbd_map, vg_name):
700     """Run multiple tests against a node.
701
702     Test list:
703
704       - compares ganeti version
705       - checks vg existance and size > 20G
706       - checks config file checksum
707       - checks ssh to other nodes
708
709     @type nodeinfo: L{objects.Node}
710     @param nodeinfo: the node to check
711     @param file_list: required list of files
712     @param local_cksum: dictionary of local files and their checksums
713     @param node_result: the results from the node
714     @param feedback_fn: function used to accumulate results
715     @param master_files: list of files that only masters should have
716     @param drbd_map: the useddrbd minors for this node, in
717         form of minor: (instance, must_exist) which correspond to instances
718         and their running status
719     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
720
721     """
722     node = nodeinfo.name
723
724     # main result, node_result should be a non-empty dict
725     if not node_result or not isinstance(node_result, dict):
726       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
727       return True
728
729     # compares ganeti version
730     local_version = constants.PROTOCOL_VERSION
731     remote_version = node_result.get('version', None)
732     if not (remote_version and isinstance(remote_version, (list, tuple)) and
733             len(remote_version) == 2):
734       feedback_fn("  - ERROR: connection to %s failed" % (node))
735       return True
736
737     if local_version != remote_version[0]:
738       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
739                   " node %s %s" % (local_version, node, remote_version[0]))
740       return True
741
742     # node seems compatible, we can actually try to look into its results
743
744     bad = False
745
746     # full package version
747     if constants.RELEASE_VERSION != remote_version[1]:
748       feedback_fn("  - WARNING: software version mismatch: master %s,"
749                   " node %s %s" %
750                   (constants.RELEASE_VERSION, node, remote_version[1]))
751
752     # checks vg existence and size > 20G
753     if vg_name is not None:
754       vglist = node_result.get(constants.NV_VGLIST, None)
755       if not vglist:
756         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
757                         (node,))
758         bad = True
759       else:
760         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
761                                               constants.MIN_VG_SIZE)
762         if vgstatus:
763           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
764           bad = True
765
766     # checks config file checksum
767
768     remote_cksum = node_result.get(constants.NV_FILELIST, None)
769     if not isinstance(remote_cksum, dict):
770       bad = True
771       feedback_fn("  - ERROR: node hasn't returned file checksum data")
772     else:
773       for file_name in file_list:
774         node_is_mc = nodeinfo.master_candidate
775         must_have_file = file_name not in master_files
776         if file_name not in remote_cksum:
777           if node_is_mc or must_have_file:
778             bad = True
779             feedback_fn("  - ERROR: file '%s' missing" % file_name)
780         elif remote_cksum[file_name] != local_cksum[file_name]:
781           if node_is_mc or must_have_file:
782             bad = True
783             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
784           else:
785             # not candidate and this is not a must-have file
786             bad = True
787             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
788                         " '%s'" % file_name)
789         else:
790           # all good, except non-master/non-must have combination
791           if not node_is_mc and not must_have_file:
792             feedback_fn("  - ERROR: file '%s' should not exist on non master"
793                         " candidates" % file_name)
794
795     # checks ssh to any
796
797     if constants.NV_NODELIST not in node_result:
798       bad = True
799       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
800     else:
801       if node_result[constants.NV_NODELIST]:
802         bad = True
803         for node in node_result[constants.NV_NODELIST]:
804           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
805                           (node, node_result[constants.NV_NODELIST][node]))
806
807     if constants.NV_NODENETTEST not in node_result:
808       bad = True
809       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
810     else:
811       if node_result[constants.NV_NODENETTEST]:
812         bad = True
813         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
814         for node in nlist:
815           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
816                           (node, node_result[constants.NV_NODENETTEST][node]))
817
818     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
819     if isinstance(hyp_result, dict):
820       for hv_name, hv_result in hyp_result.iteritems():
821         if hv_result is not None:
822           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
823                       (hv_name, hv_result))
824
825     # check used drbd list
826     if vg_name is not None:
827       used_minors = node_result.get(constants.NV_DRBDLIST, [])
828       if not isinstance(used_minors, (tuple, list)):
829         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
830                     str(used_minors))
831       else:
832         for minor, (iname, must_exist) in drbd_map.items():
833           if minor not in used_minors and must_exist:
834             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
835                         " not active" % (minor, iname))
836             bad = True
837         for minor in used_minors:
838           if minor not in drbd_map:
839             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
840                         minor)
841             bad = True
842
843     return bad
844
845   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
846                       node_instance, feedback_fn, n_offline):
847     """Verify an instance.
848
849     This function checks to see if the required block devices are
850     available on the instance's node.
851
852     """
853     bad = False
854
855     node_current = instanceconfig.primary_node
856
857     node_vol_should = {}
858     instanceconfig.MapLVsByNode(node_vol_should)
859
860     for node in node_vol_should:
861       if node in n_offline:
862         # ignore missing volumes on offline nodes
863         continue
864       for volume in node_vol_should[node]:
865         if node not in node_vol_is or volume not in node_vol_is[node]:
866           feedback_fn("  - ERROR: volume %s missing on node %s" %
867                           (volume, node))
868           bad = True
869
870     if instanceconfig.admin_up:
871       if ((node_current not in node_instance or
872           not instance in node_instance[node_current]) and
873           node_current not in n_offline):
874         feedback_fn("  - ERROR: instance %s not running on node %s" %
875                         (instance, node_current))
876         bad = True
877
878     for node in node_instance:
879       if (not node == node_current):
880         if instance in node_instance[node]:
881           feedback_fn("  - ERROR: instance %s should not run on node %s" %
882                           (instance, node))
883           bad = True
884
885     return bad
886
887   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
888     """Verify if there are any unknown volumes in the cluster.
889
890     The .os, .swap and backup volumes are ignored. All other volumes are
891     reported as unknown.
892
893     """
894     bad = False
895
896     for node in node_vol_is:
897       for volume in node_vol_is[node]:
898         if node not in node_vol_should or volume not in node_vol_should[node]:
899           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
900                       (volume, node))
901           bad = True
902     return bad
903
904   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
905     """Verify the list of running instances.
906
907     This checks what instances are running but unknown to the cluster.
908
909     """
910     bad = False
911     for node in node_instance:
912       for runninginstance in node_instance[node]:
913         if runninginstance not in instancelist:
914           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
915                           (runninginstance, node))
916           bad = True
917     return bad
918
919   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
920     """Verify N+1 Memory Resilience.
921
922     Check that if one single node dies we can still start all the instances it
923     was primary for.
924
925     """
926     bad = False
927
928     for node, nodeinfo in node_info.iteritems():
929       # This code checks that every node which is now listed as secondary has
930       # enough memory to host all instances it is supposed to should a single
931       # other node in the cluster fail.
932       # FIXME: not ready for failover to an arbitrary node
933       # FIXME: does not support file-backed instances
934       # WARNING: we currently take into account down instances as well as up
935       # ones, considering that even if they're down someone might want to start
936       # them even in the event of a node failure.
937       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
938         needed_mem = 0
939         for instance in instances:
940           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
941           if bep[constants.BE_AUTO_BALANCE]:
942             needed_mem += bep[constants.BE_MEMORY]
943         if nodeinfo['mfree'] < needed_mem:
944           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
945                       " failovers should node %s fail" % (node, prinode))
946           bad = True
947     return bad
948
949   def CheckPrereq(self):
950     """Check prerequisites.
951
952     Transform the list of checks we're going to skip into a set and check that
953     all its members are valid.
954
955     """
956     self.skip_set = frozenset(self.op.skip_checks)
957     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
958       raise errors.OpPrereqError("Invalid checks to be skipped specified")
959
960   def BuildHooksEnv(self):
961     """Build hooks env.
962
963     Cluster-Verify hooks just rone in the post phase and their failure makes
964     the output be logged in the verify output and the verification to fail.
965
966     """
967     all_nodes = self.cfg.GetNodeList()
968     env = {
969       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
970       }
971     for node in self.cfg.GetAllNodesInfo().values():
972       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
973
974     return env, [], all_nodes
975
976   def Exec(self, feedback_fn):
977     """Verify integrity of cluster, performing various test on nodes.
978
979     """
980     bad = False
981     feedback_fn("* Verifying global settings")
982     for msg in self.cfg.VerifyConfig():
983       feedback_fn("  - ERROR: %s" % msg)
984
985     vg_name = self.cfg.GetVGName()
986     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
987     nodelist = utils.NiceSort(self.cfg.GetNodeList())
988     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
989     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
990     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
991                         for iname in instancelist)
992     i_non_redundant = [] # Non redundant instances
993     i_non_a_balanced = [] # Non auto-balanced instances
994     n_offline = [] # List of offline nodes
995     n_drained = [] # List of nodes being drained
996     node_volume = {}
997     node_instance = {}
998     node_info = {}
999     instance_cfg = {}
1000
1001     # FIXME: verify OS list
1002     # do local checksums
1003     master_files = [constants.CLUSTER_CONF_FILE]
1004
1005     file_names = ssconf.SimpleStore().GetFileList()
1006     file_names.append(constants.SSL_CERT_FILE)
1007     file_names.append(constants.RAPI_CERT_FILE)
1008     file_names.extend(master_files)
1009
1010     local_checksums = utils.FingerprintFiles(file_names)
1011
1012     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1013     node_verify_param = {
1014       constants.NV_FILELIST: file_names,
1015       constants.NV_NODELIST: [node.name for node in nodeinfo
1016                               if not node.offline],
1017       constants.NV_HYPERVISOR: hypervisors,
1018       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1019                                   node.secondary_ip) for node in nodeinfo
1020                                  if not node.offline],
1021       constants.NV_INSTANCELIST: hypervisors,
1022       constants.NV_VERSION: None,
1023       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1024       }
1025     if vg_name is not None:
1026       node_verify_param[constants.NV_VGLIST] = None
1027       node_verify_param[constants.NV_LVLIST] = vg_name
1028       node_verify_param[constants.NV_DRBDLIST] = None
1029     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1030                                            self.cfg.GetClusterName())
1031
1032     cluster = self.cfg.GetClusterInfo()
1033     master_node = self.cfg.GetMasterNode()
1034     all_drbd_map = self.cfg.ComputeDRBDMap()
1035
1036     for node_i in nodeinfo:
1037       node = node_i.name
1038
1039       if node_i.offline:
1040         feedback_fn("* Skipping offline node %s" % (node,))
1041         n_offline.append(node)
1042         continue
1043
1044       if node == master_node:
1045         ntype = "master"
1046       elif node_i.master_candidate:
1047         ntype = "master candidate"
1048       elif node_i.drained:
1049         ntype = "drained"
1050         n_drained.append(node)
1051       else:
1052         ntype = "regular"
1053       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1054
1055       msg = all_nvinfo[node].fail_msg
1056       if msg:
1057         feedback_fn("  - ERROR: while contacting node %s: %s" % (node, msg))
1058         bad = True
1059         continue
1060
1061       nresult = all_nvinfo[node].payload
1062       node_drbd = {}
1063       for minor, instance in all_drbd_map[node].items():
1064         if instance not in instanceinfo:
1065           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1066                       instance)
1067           # ghost instance should not be running, but otherwise we
1068           # don't give double warnings (both ghost instance and
1069           # unallocated minor in use)
1070           node_drbd[minor] = (instance, False)
1071         else:
1072           instance = instanceinfo[instance]
1073           node_drbd[minor] = (instance.name, instance.admin_up)
1074       result = self._VerifyNode(node_i, file_names, local_checksums,
1075                                 nresult, feedback_fn, master_files,
1076                                 node_drbd, vg_name)
1077       bad = bad or result
1078
1079       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1080       if vg_name is None:
1081         node_volume[node] = {}
1082       elif isinstance(lvdata, basestring):
1083         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1084                     (node, utils.SafeEncode(lvdata)))
1085         bad = True
1086         node_volume[node] = {}
1087       elif not isinstance(lvdata, dict):
1088         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1089         bad = True
1090         continue
1091       else:
1092         node_volume[node] = lvdata
1093
1094       # node_instance
1095       idata = nresult.get(constants.NV_INSTANCELIST, None)
1096       if not isinstance(idata, list):
1097         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1098                     (node,))
1099         bad = True
1100         continue
1101
1102       node_instance[node] = idata
1103
1104       # node_info
1105       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1106       if not isinstance(nodeinfo, dict):
1107         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1108         bad = True
1109         continue
1110
1111       try:
1112         node_info[node] = {
1113           "mfree": int(nodeinfo['memory_free']),
1114           "pinst": [],
1115           "sinst": [],
1116           # dictionary holding all instances this node is secondary for,
1117           # grouped by their primary node. Each key is a cluster node, and each
1118           # value is a list of instances which have the key as primary and the
1119           # current node as secondary.  this is handy to calculate N+1 memory
1120           # availability if you can only failover from a primary to its
1121           # secondary.
1122           "sinst-by-pnode": {},
1123         }
1124         # FIXME: devise a free space model for file based instances as well
1125         if vg_name is not None:
1126           if (constants.NV_VGLIST not in nresult or
1127               vg_name not in nresult[constants.NV_VGLIST]):
1128             feedback_fn("  - ERROR: node %s didn't return data for the"
1129                         " volume group '%s' - it is either missing or broken" %
1130                         (node, vg_name))
1131             bad = True
1132             continue
1133           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1134       except (ValueError, KeyError):
1135         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1136                     " from node %s" % (node,))
1137         bad = True
1138         continue
1139
1140     node_vol_should = {}
1141
1142     for instance in instancelist:
1143       feedback_fn("* Verifying instance %s" % instance)
1144       inst_config = instanceinfo[instance]
1145       result =  self._VerifyInstance(instance, inst_config, node_volume,
1146                                      node_instance, feedback_fn, n_offline)
1147       bad = bad or result
1148       inst_nodes_offline = []
1149
1150       inst_config.MapLVsByNode(node_vol_should)
1151
1152       instance_cfg[instance] = inst_config
1153
1154       pnode = inst_config.primary_node
1155       if pnode in node_info:
1156         node_info[pnode]['pinst'].append(instance)
1157       elif pnode not in n_offline:
1158         feedback_fn("  - ERROR: instance %s, connection to primary node"
1159                     " %s failed" % (instance, pnode))
1160         bad = True
1161
1162       if pnode in n_offline:
1163         inst_nodes_offline.append(pnode)
1164
1165       # If the instance is non-redundant we cannot survive losing its primary
1166       # node, so we are not N+1 compliant. On the other hand we have no disk
1167       # templates with more than one secondary so that situation is not well
1168       # supported either.
1169       # FIXME: does not support file-backed instances
1170       if len(inst_config.secondary_nodes) == 0:
1171         i_non_redundant.append(instance)
1172       elif len(inst_config.secondary_nodes) > 1:
1173         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1174                     % instance)
1175
1176       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1177         i_non_a_balanced.append(instance)
1178
1179       for snode in inst_config.secondary_nodes:
1180         if snode in node_info:
1181           node_info[snode]['sinst'].append(instance)
1182           if pnode not in node_info[snode]['sinst-by-pnode']:
1183             node_info[snode]['sinst-by-pnode'][pnode] = []
1184           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1185         elif snode not in n_offline:
1186           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1187                       " %s failed" % (instance, snode))
1188           bad = True
1189         if snode in n_offline:
1190           inst_nodes_offline.append(snode)
1191
1192       if inst_nodes_offline:
1193         # warn that the instance lives on offline nodes, and set bad=True
1194         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1195                     ", ".join(inst_nodes_offline))
1196         bad = True
1197
1198     feedback_fn("* Verifying orphan volumes")
1199     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1200                                        feedback_fn)
1201     bad = bad or result
1202
1203     feedback_fn("* Verifying remaining instances")
1204     result = self._VerifyOrphanInstances(instancelist, node_instance,
1205                                          feedback_fn)
1206     bad = bad or result
1207
1208     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1209       feedback_fn("* Verifying N+1 Memory redundancy")
1210       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1211       bad = bad or result
1212
1213     feedback_fn("* Other Notes")
1214     if i_non_redundant:
1215       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1216                   % len(i_non_redundant))
1217
1218     if i_non_a_balanced:
1219       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1220                   % len(i_non_a_balanced))
1221
1222     if n_offline:
1223       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1224
1225     if n_drained:
1226       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1227
1228     return not bad
1229
1230   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1231     """Analize the post-hooks' result
1232
1233     This method analyses the hook result, handles it, and sends some
1234     nicely-formatted feedback back to the user.
1235
1236     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1237         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1238     @param hooks_results: the results of the multi-node hooks rpc call
1239     @param feedback_fn: function used send feedback back to the caller
1240     @param lu_result: previous Exec result
1241     @return: the new Exec result, based on the previous result
1242         and hook results
1243
1244     """
1245     # We only really run POST phase hooks, and are only interested in
1246     # their results
1247     if phase == constants.HOOKS_PHASE_POST:
1248       # Used to change hooks' output to proper indentation
1249       indent_re = re.compile('^', re.M)
1250       feedback_fn("* Hooks Results")
1251       if not hooks_results:
1252         feedback_fn("  - ERROR: general communication failure")
1253         lu_result = 1
1254       else:
1255         for node_name in hooks_results:
1256           show_node_header = True
1257           res = hooks_results[node_name]
1258           msg = res.fail_msg
1259           if msg:
1260             if res.offline:
1261               # no need to warn or set fail return value
1262               continue
1263             feedback_fn("    Communication failure in hooks execution: %s" %
1264                         msg)
1265             lu_result = 1
1266             continue
1267           for script, hkr, output in res.payload:
1268             if hkr == constants.HKR_FAIL:
1269               # The node header is only shown once, if there are
1270               # failing hooks on that node
1271               if show_node_header:
1272                 feedback_fn("  Node %s:" % node_name)
1273                 show_node_header = False
1274               feedback_fn("    ERROR: Script %s failed, output:" % script)
1275               output = indent_re.sub('      ', output)
1276               feedback_fn("%s" % output)
1277               lu_result = 1
1278
1279       return lu_result
1280
1281
1282 class LUVerifyDisks(NoHooksLU):
1283   """Verifies the cluster disks status.
1284
1285   """
1286   _OP_REQP = []
1287   REQ_BGL = False
1288
1289   def ExpandNames(self):
1290     self.needed_locks = {
1291       locking.LEVEL_NODE: locking.ALL_SET,
1292       locking.LEVEL_INSTANCE: locking.ALL_SET,
1293     }
1294     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1295
1296   def CheckPrereq(self):
1297     """Check prerequisites.
1298
1299     This has no prerequisites.
1300
1301     """
1302     pass
1303
1304   def Exec(self, feedback_fn):
1305     """Verify integrity of cluster disks.
1306
1307     @rtype: tuple of three items
1308     @return: a tuple of (dict of node-to-node_error, list of instances
1309         which need activate-disks, dict of instance: (node, volume) for
1310         missing volumes
1311
1312     """
1313     result = res_nodes, res_instances, res_missing = {}, [], {}
1314
1315     vg_name = self.cfg.GetVGName()
1316     nodes = utils.NiceSort(self.cfg.GetNodeList())
1317     instances = [self.cfg.GetInstanceInfo(name)
1318                  for name in self.cfg.GetInstanceList()]
1319
1320     nv_dict = {}
1321     for inst in instances:
1322       inst_lvs = {}
1323       if (not inst.admin_up or
1324           inst.disk_template not in constants.DTS_NET_MIRROR):
1325         continue
1326       inst.MapLVsByNode(inst_lvs)
1327       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1328       for node, vol_list in inst_lvs.iteritems():
1329         for vol in vol_list:
1330           nv_dict[(node, vol)] = inst
1331
1332     if not nv_dict:
1333       return result
1334
1335     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1336
1337     to_act = set()
1338     for node in nodes:
1339       # node_volume
1340       node_res = node_lvs[node]
1341       if node_res.offline:
1342         continue
1343       msg = node_res.fail_msg
1344       if msg:
1345         logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1346         res_nodes[node] = msg
1347         continue
1348
1349       lvs = node_res.payload
1350       for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1351         inst = nv_dict.pop((node, lv_name), None)
1352         if (not lv_online and inst is not None
1353             and inst.name not in res_instances):
1354           res_instances.append(inst.name)
1355
1356     # any leftover items in nv_dict are missing LVs, let's arrange the
1357     # data better
1358     for key, inst in nv_dict.iteritems():
1359       if inst.name not in res_missing:
1360         res_missing[inst.name] = []
1361       res_missing[inst.name].append(key)
1362
1363     return result
1364
1365
1366 class LURenameCluster(LogicalUnit):
1367   """Rename the cluster.
1368
1369   """
1370   HPATH = "cluster-rename"
1371   HTYPE = constants.HTYPE_CLUSTER
1372   _OP_REQP = ["name"]
1373
1374   def BuildHooksEnv(self):
1375     """Build hooks env.
1376
1377     """
1378     env = {
1379       "OP_TARGET": self.cfg.GetClusterName(),
1380       "NEW_NAME": self.op.name,
1381       }
1382     mn = self.cfg.GetMasterNode()
1383     return env, [mn], [mn]
1384
1385   def CheckPrereq(self):
1386     """Verify that the passed name is a valid one.
1387
1388     """
1389     hostname = utils.HostInfo(self.op.name)
1390
1391     new_name = hostname.name
1392     self.ip = new_ip = hostname.ip
1393     old_name = self.cfg.GetClusterName()
1394     old_ip = self.cfg.GetMasterIP()
1395     if new_name == old_name and new_ip == old_ip:
1396       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1397                                  " cluster has changed")
1398     if new_ip != old_ip:
1399       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1400         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1401                                    " reachable on the network. Aborting." %
1402                                    new_ip)
1403
1404     self.op.name = new_name
1405
1406   def Exec(self, feedback_fn):
1407     """Rename the cluster.
1408
1409     """
1410     clustername = self.op.name
1411     ip = self.ip
1412
1413     # shutdown the master IP
1414     master = self.cfg.GetMasterNode()
1415     result = self.rpc.call_node_stop_master(master, False)
1416     result.Raise("Could not disable the master role")
1417
1418     try:
1419       cluster = self.cfg.GetClusterInfo()
1420       cluster.cluster_name = clustername
1421       cluster.master_ip = ip
1422       self.cfg.Update(cluster)
1423
1424       # update the known hosts file
1425       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1426       node_list = self.cfg.GetNodeList()
1427       try:
1428         node_list.remove(master)
1429       except ValueError:
1430         pass
1431       result = self.rpc.call_upload_file(node_list,
1432                                          constants.SSH_KNOWN_HOSTS_FILE)
1433       for to_node, to_result in result.iteritems():
1434         msg = to_result.fail_msg
1435         if msg:
1436           msg = ("Copy of file %s to node %s failed: %s" %
1437                  (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1438           self.proc.LogWarning(msg)
1439
1440     finally:
1441       result = self.rpc.call_node_start_master(master, False)
1442       msg = result.fail_msg
1443       if msg:
1444         self.LogWarning("Could not re-enable the master role on"
1445                         " the master, please restart manually: %s", msg)
1446
1447
1448 def _RecursiveCheckIfLVMBased(disk):
1449   """Check if the given disk or its children are lvm-based.
1450
1451   @type disk: L{objects.Disk}
1452   @param disk: the disk to check
1453   @rtype: booleean
1454   @return: boolean indicating whether a LD_LV dev_type was found or not
1455
1456   """
1457   if disk.children:
1458     for chdisk in disk.children:
1459       if _RecursiveCheckIfLVMBased(chdisk):
1460         return True
1461   return disk.dev_type == constants.LD_LV
1462
1463
1464 class LUSetClusterParams(LogicalUnit):
1465   """Change the parameters of the cluster.
1466
1467   """
1468   HPATH = "cluster-modify"
1469   HTYPE = constants.HTYPE_CLUSTER
1470   _OP_REQP = []
1471   REQ_BGL = False
1472
1473   def CheckArguments(self):
1474     """Check parameters
1475
1476     """
1477     if not hasattr(self.op, "candidate_pool_size"):
1478       self.op.candidate_pool_size = None
1479     if self.op.candidate_pool_size is not None:
1480       try:
1481         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1482       except (ValueError, TypeError), err:
1483         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1484                                    str(err))
1485       if self.op.candidate_pool_size < 1:
1486         raise errors.OpPrereqError("At least one master candidate needed")
1487
1488   def ExpandNames(self):
1489     # FIXME: in the future maybe other cluster params won't require checking on
1490     # all nodes to be modified.
1491     self.needed_locks = {
1492       locking.LEVEL_NODE: locking.ALL_SET,
1493     }
1494     self.share_locks[locking.LEVEL_NODE] = 1
1495
1496   def BuildHooksEnv(self):
1497     """Build hooks env.
1498
1499     """
1500     env = {
1501       "OP_TARGET": self.cfg.GetClusterName(),
1502       "NEW_VG_NAME": self.op.vg_name,
1503       }
1504     mn = self.cfg.GetMasterNode()
1505     return env, [mn], [mn]
1506
1507   def CheckPrereq(self):
1508     """Check prerequisites.
1509
1510     This checks whether the given params don't conflict and
1511     if the given volume group is valid.
1512
1513     """
1514     if self.op.vg_name is not None and not self.op.vg_name:
1515       instances = self.cfg.GetAllInstancesInfo().values()
1516       for inst in instances:
1517         for disk in inst.disks:
1518           if _RecursiveCheckIfLVMBased(disk):
1519             raise errors.OpPrereqError("Cannot disable lvm storage while"
1520                                        " lvm-based instances exist")
1521
1522     node_list = self.acquired_locks[locking.LEVEL_NODE]
1523
1524     # if vg_name not None, checks given volume group on all nodes
1525     if self.op.vg_name:
1526       vglist = self.rpc.call_vg_list(node_list)
1527       for node in node_list:
1528         msg = vglist[node].fail_msg
1529         if msg:
1530           # ignoring down node
1531           self.LogWarning("Error while gathering data on node %s"
1532                           " (ignoring node): %s", node, msg)
1533           continue
1534         vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1535                                               self.op.vg_name,
1536                                               constants.MIN_VG_SIZE)
1537         if vgstatus:
1538           raise errors.OpPrereqError("Error on node '%s': %s" %
1539                                      (node, vgstatus))
1540
1541     self.cluster = cluster = self.cfg.GetClusterInfo()
1542     # validate params changes
1543     if self.op.beparams:
1544       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1545       self.new_beparams = objects.FillDict(
1546         cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1547
1548     if self.op.nicparams:
1549       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1550       self.new_nicparams = objects.FillDict(
1551         cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1552       objects.NIC.CheckParameterSyntax(self.new_nicparams)
1553
1554     # hypervisor list/parameters
1555     self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1556     if self.op.hvparams:
1557       if not isinstance(self.op.hvparams, dict):
1558         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1559       for hv_name, hv_dict in self.op.hvparams.items():
1560         if hv_name not in self.new_hvparams:
1561           self.new_hvparams[hv_name] = hv_dict
1562         else:
1563           self.new_hvparams[hv_name].update(hv_dict)
1564
1565     if self.op.enabled_hypervisors is not None:
1566       self.hv_list = self.op.enabled_hypervisors
1567     else:
1568       self.hv_list = cluster.enabled_hypervisors
1569
1570     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1571       # either the enabled list has changed, or the parameters have, validate
1572       for hv_name, hv_params in self.new_hvparams.items():
1573         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1574             (self.op.enabled_hypervisors and
1575              hv_name in self.op.enabled_hypervisors)):
1576           # either this is a new hypervisor, or its parameters have changed
1577           hv_class = hypervisor.GetHypervisor(hv_name)
1578           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1579           hv_class.CheckParameterSyntax(hv_params)
1580           _CheckHVParams(self, node_list, hv_name, hv_params)
1581
1582   def Exec(self, feedback_fn):
1583     """Change the parameters of the cluster.
1584
1585     """
1586     if self.op.vg_name is not None:
1587       new_volume = self.op.vg_name
1588       if not new_volume:
1589         new_volume = None
1590       if new_volume != self.cfg.GetVGName():
1591         self.cfg.SetVGName(new_volume)
1592       else:
1593         feedback_fn("Cluster LVM configuration already in desired"
1594                     " state, not changing")
1595     if self.op.hvparams:
1596       self.cluster.hvparams = self.new_hvparams
1597     if self.op.enabled_hypervisors is not None:
1598       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1599     if self.op.beparams:
1600       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1601     if self.op.nicparams:
1602       self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1603
1604     if self.op.candidate_pool_size is not None:
1605       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1606
1607     self.cfg.Update(self.cluster)
1608
1609     # we want to update nodes after the cluster so that if any errors
1610     # happen, we have recorded and saved the cluster info
1611     if self.op.candidate_pool_size is not None:
1612       _AdjustCandidatePool(self)
1613
1614
1615 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1616   """Distribute additional files which are part of the cluster configuration.
1617
1618   ConfigWriter takes care of distributing the config and ssconf files, but
1619   there are more files which should be distributed to all nodes. This function
1620   makes sure those are copied.
1621
1622   @param lu: calling logical unit
1623   @param additional_nodes: list of nodes not in the config to distribute to
1624
1625   """
1626   # 1. Gather target nodes
1627   myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1628   dist_nodes = lu.cfg.GetNodeList()
1629   if additional_nodes is not None:
1630     dist_nodes.extend(additional_nodes)
1631   if myself.name in dist_nodes:
1632     dist_nodes.remove(myself.name)
1633   # 2. Gather files to distribute
1634   dist_files = set([constants.ETC_HOSTS,
1635                     constants.SSH_KNOWN_HOSTS_FILE,
1636                     constants.RAPI_CERT_FILE,
1637                     constants.RAPI_USERS_FILE,
1638                    ])
1639
1640   enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1641   for hv_name in enabled_hypervisors:
1642     hv_class = hypervisor.GetHypervisor(hv_name)
1643     dist_files.update(hv_class.GetAncillaryFiles())
1644
1645   # 3. Perform the files upload
1646   for fname in dist_files:
1647     if os.path.exists(fname):
1648       result = lu.rpc.call_upload_file(dist_nodes, fname)
1649       for to_node, to_result in result.items():
1650         msg = to_result.fail_msg
1651         if msg:
1652           msg = ("Copy of file %s to node %s failed: %s" %
1653                  (fname, to_node, msg))
1654           lu.proc.LogWarning(msg)
1655
1656
1657 class LURedistributeConfig(NoHooksLU):
1658   """Force the redistribution of cluster configuration.
1659
1660   This is a very simple LU.
1661
1662   """
1663   _OP_REQP = []
1664   REQ_BGL = False
1665
1666   def ExpandNames(self):
1667     self.needed_locks = {
1668       locking.LEVEL_NODE: locking.ALL_SET,
1669     }
1670     self.share_locks[locking.LEVEL_NODE] = 1
1671
1672   def CheckPrereq(self):
1673     """Check prerequisites.
1674
1675     """
1676
1677   def Exec(self, feedback_fn):
1678     """Redistribute the configuration.
1679
1680     """
1681     self.cfg.Update(self.cfg.GetClusterInfo())
1682     _RedistributeAncillaryFiles(self)
1683
1684
1685 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1686   """Sleep and poll for an instance's disk to sync.
1687
1688   """
1689   if not instance.disks:
1690     return True
1691
1692   if not oneshot:
1693     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1694
1695   node = instance.primary_node
1696
1697   for dev in instance.disks:
1698     lu.cfg.SetDiskID(dev, node)
1699
1700   retries = 0
1701   degr_retries = 10 # in seconds, as we sleep 1 second each time
1702   while True:
1703     max_time = 0
1704     done = True
1705     cumul_degraded = False
1706     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1707     msg = rstats.fail_msg
1708     if msg:
1709       lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1710       retries += 1
1711       if retries >= 10:
1712         raise errors.RemoteError("Can't contact node %s for mirror data,"
1713                                  " aborting." % node)
1714       time.sleep(6)
1715       continue
1716     rstats = rstats.payload
1717     retries = 0
1718     for i, mstat in enumerate(rstats):
1719       if mstat is None:
1720         lu.LogWarning("Can't compute data for node %s/%s",
1721                            node, instance.disks[i].iv_name)
1722         continue
1723       # we ignore the ldisk parameter
1724       perc_done, est_time, is_degraded, _ = mstat
1725       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1726       if perc_done is not None:
1727         done = False
1728         if est_time is not None:
1729           rem_time = "%d estimated seconds remaining" % est_time
1730           max_time = est_time
1731         else:
1732           rem_time = "no time estimate"
1733         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1734                         (instance.disks[i].iv_name, perc_done, rem_time))
1735
1736     # if we're done but degraded, let's do a few small retries, to
1737     # make sure we see a stable and not transient situation; therefore
1738     # we force restart of the loop
1739     if (done or oneshot) and cumul_degraded and degr_retries > 0:
1740       logging.info("Degraded disks found, %d retries left", degr_retries)
1741       degr_retries -= 1
1742       time.sleep(1)
1743       continue
1744
1745     if done or oneshot:
1746       break
1747
1748     time.sleep(min(60, max_time))
1749
1750   if done:
1751     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1752   return not cumul_degraded
1753
1754
1755 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1756   """Check that mirrors are not degraded.
1757
1758   The ldisk parameter, if True, will change the test from the
1759   is_degraded attribute (which represents overall non-ok status for
1760   the device(s)) to the ldisk (representing the local storage status).
1761
1762   """
1763   lu.cfg.SetDiskID(dev, node)
1764   if ldisk:
1765     idx = 6
1766   else:
1767     idx = 5
1768
1769   result = True
1770   if on_primary or dev.AssembleOnSecondary():
1771     rstats = lu.rpc.call_blockdev_find(node, dev)
1772     msg = rstats.fail_msg
1773     if msg:
1774       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1775       result = False
1776     elif not rstats.payload:
1777       lu.LogWarning("Can't find disk on node %s", node)
1778       result = False
1779     else:
1780       result = result and (not rstats.payload[idx])
1781   if dev.children:
1782     for child in dev.children:
1783       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1784
1785   return result
1786
1787
1788 class LUDiagnoseOS(NoHooksLU):
1789   """Logical unit for OS diagnose/query.
1790
1791   """
1792   _OP_REQP = ["output_fields", "names"]
1793   REQ_BGL = False
1794   _FIELDS_STATIC = utils.FieldSet()
1795   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1796
1797   def ExpandNames(self):
1798     if self.op.names:
1799       raise errors.OpPrereqError("Selective OS query not supported")
1800
1801     _CheckOutputFields(static=self._FIELDS_STATIC,
1802                        dynamic=self._FIELDS_DYNAMIC,
1803                        selected=self.op.output_fields)
1804
1805     # Lock all nodes, in shared mode
1806     # Temporary removal of locks, should be reverted later
1807     # TODO: reintroduce locks when they are lighter-weight
1808     self.needed_locks = {}
1809     #self.share_locks[locking.LEVEL_NODE] = 1
1810     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1811
1812   def CheckPrereq(self):
1813     """Check prerequisites.
1814
1815     """
1816
1817   @staticmethod
1818   def _DiagnoseByOS(node_list, rlist):
1819     """Remaps a per-node return list into an a per-os per-node dictionary
1820
1821     @param node_list: a list with the names of all nodes
1822     @param rlist: a map with node names as keys and OS objects as values
1823
1824     @rtype: dict
1825     @return: a dictionary with osnames as keys and as value another map, with
1826         nodes as keys and tuples of (path, status, diagnose) as values, eg::
1827
1828           {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
1829                                      (/srv/..., False, "invalid api")],
1830                            "node2": [(/srv/..., True, "")]}
1831           }
1832
1833     """
1834     all_os = {}
1835     # we build here the list of nodes that didn't fail the RPC (at RPC
1836     # level), so that nodes with a non-responding node daemon don't
1837     # make all OSes invalid
1838     good_nodes = [node_name for node_name in rlist
1839                   if not rlist[node_name].fail_msg]
1840     for node_name, nr in rlist.items():
1841       if nr.fail_msg or not nr.payload:
1842         continue
1843       for name, path, status, diagnose in nr.payload:
1844         if name not in all_os:
1845           # build a list of nodes for this os containing empty lists
1846           # for each node in node_list
1847           all_os[name] = {}
1848           for nname in good_nodes:
1849             all_os[name][nname] = []
1850         all_os[name][node_name].append((path, status, diagnose))
1851     return all_os
1852
1853   def Exec(self, feedback_fn):
1854     """Compute the list of OSes.
1855
1856     """
1857     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1858     node_data = self.rpc.call_os_diagnose(valid_nodes)
1859     pol = self._DiagnoseByOS(valid_nodes, node_data)
1860     output = []
1861     for os_name, os_data in pol.items():
1862       row = []
1863       for field in self.op.output_fields:
1864         if field == "name":
1865           val = os_name
1866         elif field == "valid":
1867           val = utils.all([osl and osl[0][1] for osl in os_data.values()])
1868         elif field == "node_status":
1869           # this is just a copy of the dict
1870           val = {}
1871           for node_name, nos_list in os_data.items():
1872             val[node_name] = nos_list
1873         else:
1874           raise errors.ParameterError(field)
1875         row.append(val)
1876       output.append(row)
1877
1878     return output
1879
1880
1881 class LURemoveNode(LogicalUnit):
1882   """Logical unit for removing a node.
1883
1884   """
1885   HPATH = "node-remove"
1886   HTYPE = constants.HTYPE_NODE
1887   _OP_REQP = ["node_name"]
1888
1889   def BuildHooksEnv(self):
1890     """Build hooks env.
1891
1892     This doesn't run on the target node in the pre phase as a failed
1893     node would then be impossible to remove.
1894
1895     """
1896     env = {
1897       "OP_TARGET": self.op.node_name,
1898       "NODE_NAME": self.op.node_name,
1899       }
1900     all_nodes = self.cfg.GetNodeList()
1901     all_nodes.remove(self.op.node_name)
1902     return env, all_nodes, all_nodes
1903
1904   def CheckPrereq(self):
1905     """Check prerequisites.
1906
1907     This checks:
1908      - the node exists in the configuration
1909      - it does not have primary or secondary instances
1910      - it's not the master
1911
1912     Any errors are signalled by raising errors.OpPrereqError.
1913
1914     """
1915     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1916     if node is None:
1917       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1918
1919     instance_list = self.cfg.GetInstanceList()
1920
1921     masternode = self.cfg.GetMasterNode()
1922     if node.name == masternode:
1923       raise errors.OpPrereqError("Node is the master node,"
1924                                  " you need to failover first.")
1925
1926     for instance_name in instance_list:
1927       instance = self.cfg.GetInstanceInfo(instance_name)
1928       if node.name in instance.all_nodes:
1929         raise errors.OpPrereqError("Instance %s is still running on the node,"
1930                                    " please remove first." % instance_name)
1931     self.op.node_name = node.name
1932     self.node = node
1933
1934   def Exec(self, feedback_fn):
1935     """Removes the node from the cluster.
1936
1937     """
1938     node = self.node
1939     logging.info("Stopping the node daemon and removing configs from node %s",
1940                  node.name)
1941
1942     self.context.RemoveNode(node.name)
1943
1944     result = self.rpc.call_node_leave_cluster(node.name)
1945     msg = result.fail_msg
1946     if msg:
1947       self.LogWarning("Errors encountered on the remote node while leaving"
1948                       " the cluster: %s", msg)
1949
1950     # Promote nodes to master candidate as needed
1951     _AdjustCandidatePool(self)
1952
1953
1954 class LUQueryNodes(NoHooksLU):
1955   """Logical unit for querying nodes.
1956
1957   """
1958   _OP_REQP = ["output_fields", "names", "use_locking"]
1959   REQ_BGL = False
1960   _FIELDS_DYNAMIC = utils.FieldSet(
1961     "dtotal", "dfree",
1962     "mtotal", "mnode", "mfree",
1963     "bootid",
1964     "ctotal", "cnodes", "csockets",
1965     )
1966
1967   _FIELDS_STATIC = utils.FieldSet(
1968     "name", "pinst_cnt", "sinst_cnt",
1969     "pinst_list", "sinst_list",
1970     "pip", "sip", "tags",
1971     "serial_no",
1972     "master_candidate",
1973     "master",
1974     "offline",
1975     "drained",
1976     )
1977
1978   def ExpandNames(self):
1979     _CheckOutputFields(static=self._FIELDS_STATIC,
1980                        dynamic=self._FIELDS_DYNAMIC,
1981                        selected=self.op.output_fields)
1982
1983     self.needed_locks = {}
1984     self.share_locks[locking.LEVEL_NODE] = 1
1985
1986     if self.op.names:
1987       self.wanted = _GetWantedNodes(self, self.op.names)
1988     else:
1989       self.wanted = locking.ALL_SET
1990
1991     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1992     self.do_locking = self.do_node_query and self.op.use_locking
1993     if self.do_locking:
1994       # if we don't request only static fields, we need to lock the nodes
1995       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1996
1997
1998   def CheckPrereq(self):
1999     """Check prerequisites.
2000
2001     """
2002     # The validation of the node list is done in the _GetWantedNodes,
2003     # if non empty, and if empty, there's no validation to do
2004     pass
2005
2006   def Exec(self, feedback_fn):
2007     """Computes the list of nodes and their attributes.
2008
2009     """
2010     all_info = self.cfg.GetAllNodesInfo()
2011     if self.do_locking:
2012       nodenames = self.acquired_locks[locking.LEVEL_NODE]
2013     elif self.wanted != locking.ALL_SET:
2014       nodenames = self.wanted
2015       missing = set(nodenames).difference(all_info.keys())
2016       if missing:
2017         raise errors.OpExecError(
2018           "Some nodes were removed before retrieving their data: %s" % missing)
2019     else:
2020       nodenames = all_info.keys()
2021
2022     nodenames = utils.NiceSort(nodenames)
2023     nodelist = [all_info[name] for name in nodenames]
2024
2025     # begin data gathering
2026
2027     if self.do_node_query:
2028       live_data = {}
2029       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2030                                           self.cfg.GetHypervisorType())
2031       for name in nodenames:
2032         nodeinfo = node_data[name]
2033         if not nodeinfo.fail_msg and nodeinfo.payload:
2034           nodeinfo = nodeinfo.payload
2035           fn = utils.TryConvert
2036           live_data[name] = {
2037             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2038             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2039             "mfree": fn(int, nodeinfo.get('memory_free', None)),
2040             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2041             "dfree": fn(int, nodeinfo.get('vg_free', None)),
2042             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2043             "bootid": nodeinfo.get('bootid', None),
2044             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2045             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2046             }
2047         else:
2048           live_data[name] = {}
2049     else:
2050       live_data = dict.fromkeys(nodenames, {})
2051
2052     node_to_primary = dict([(name, set()) for name in nodenames])
2053     node_to_secondary = dict([(name, set()) for name in nodenames])
2054
2055     inst_fields = frozenset(("pinst_cnt", "pinst_list",
2056                              "sinst_cnt", "sinst_list"))
2057     if inst_fields & frozenset(self.op.output_fields):
2058       instancelist = self.cfg.GetInstanceList()
2059
2060       for instance_name in instancelist:
2061         inst = self.cfg.GetInstanceInfo(instance_name)
2062         if inst.primary_node in node_to_primary:
2063           node_to_primary[inst.primary_node].add(inst.name)
2064         for secnode in inst.secondary_nodes:
2065           if secnode in node_to_secondary:
2066             node_to_secondary[secnode].add(inst.name)
2067
2068     master_node = self.cfg.GetMasterNode()
2069
2070     # end data gathering
2071
2072     output = []
2073     for node in nodelist:
2074       node_output = []
2075       for field in self.op.output_fields:
2076         if field == "name":
2077           val = node.name
2078         elif field == "pinst_list":
2079           val = list(node_to_primary[node.name])
2080         elif field == "sinst_list":
2081           val = list(node_to_secondary[node.name])
2082         elif field == "pinst_cnt":
2083           val = len(node_to_primary[node.name])
2084         elif field == "sinst_cnt":
2085           val = len(node_to_secondary[node.name])
2086         elif field == "pip":
2087           val = node.primary_ip
2088         elif field == "sip":
2089           val = node.secondary_ip
2090         elif field == "tags":
2091           val = list(node.GetTags())
2092         elif field == "serial_no":
2093           val = node.serial_no
2094         elif field == "master_candidate":
2095           val = node.master_candidate
2096         elif field == "master":
2097           val = node.name == master_node
2098         elif field == "offline":
2099           val = node.offline
2100         elif field == "drained":
2101           val = node.drained
2102         elif self._FIELDS_DYNAMIC.Matches(field):
2103           val = live_data[node.name].get(field, None)
2104         else:
2105           raise errors.ParameterError(field)
2106         node_output.append(val)
2107       output.append(node_output)
2108
2109     return output
2110
2111
2112 class LUQueryNodeVolumes(NoHooksLU):
2113   """Logical unit for getting volumes on node(s).
2114
2115   """
2116   _OP_REQP = ["nodes", "output_fields"]
2117   REQ_BGL = False
2118   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2119   _FIELDS_STATIC = utils.FieldSet("node")
2120
2121   def ExpandNames(self):
2122     _CheckOutputFields(static=self._FIELDS_STATIC,
2123                        dynamic=self._FIELDS_DYNAMIC,
2124                        selected=self.op.output_fields)
2125
2126     self.needed_locks = {}
2127     self.share_locks[locking.LEVEL_NODE] = 1
2128     if not self.op.nodes:
2129       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2130     else:
2131       self.needed_locks[locking.LEVEL_NODE] = \
2132         _GetWantedNodes(self, self.op.nodes)
2133
2134   def CheckPrereq(self):
2135     """Check prerequisites.
2136
2137     This checks that the fields required are valid output fields.
2138
2139     """
2140     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2141
2142   def Exec(self, feedback_fn):
2143     """Computes the list of nodes and their attributes.
2144
2145     """
2146     nodenames = self.nodes
2147     volumes = self.rpc.call_node_volumes(nodenames)
2148
2149     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2150              in self.cfg.GetInstanceList()]
2151
2152     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2153
2154     output = []
2155     for node in nodenames:
2156       nresult = volumes[node]
2157       if nresult.offline:
2158         continue
2159       msg = nresult.fail_msg
2160       if msg:
2161         self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2162         continue
2163
2164       node_vols = nresult.payload[:]
2165       node_vols.sort(key=lambda vol: vol['dev'])
2166
2167       for vol in node_vols:
2168         node_output = []
2169         for field in self.op.output_fields:
2170           if field == "node":
2171             val = node
2172           elif field == "phys":
2173             val = vol['dev']
2174           elif field == "vg":
2175             val = vol['vg']
2176           elif field == "name":
2177             val = vol['name']
2178           elif field == "size":
2179             val = int(float(vol['size']))
2180           elif field == "instance":
2181             for inst in ilist:
2182               if node not in lv_by_node[inst]:
2183                 continue
2184               if vol['name'] in lv_by_node[inst][node]:
2185                 val = inst.name
2186                 break
2187             else:
2188               val = '-'
2189           else:
2190             raise errors.ParameterError(field)
2191           node_output.append(str(val))
2192
2193         output.append(node_output)
2194
2195     return output
2196
2197
2198 class LUAddNode(LogicalUnit):
2199   """Logical unit for adding node to the cluster.
2200
2201   """
2202   HPATH = "node-add"
2203   HTYPE = constants.HTYPE_NODE
2204   _OP_REQP = ["node_name"]
2205
2206   def BuildHooksEnv(self):
2207     """Build hooks env.
2208
2209     This will run on all nodes before, and on all nodes + the new node after.
2210
2211     """
2212     env = {
2213       "OP_TARGET": self.op.node_name,
2214       "NODE_NAME": self.op.node_name,
2215       "NODE_PIP": self.op.primary_ip,
2216       "NODE_SIP": self.op.secondary_ip,
2217       }
2218     nodes_0 = self.cfg.GetNodeList()
2219     nodes_1 = nodes_0 + [self.op.node_name, ]
2220     return env, nodes_0, nodes_1
2221
2222   def CheckPrereq(self):
2223     """Check prerequisites.
2224
2225     This checks:
2226      - the new node is not already in the config
2227      - it is resolvable
2228      - its parameters (single/dual homed) matches the cluster
2229
2230     Any errors are signalled by raising errors.OpPrereqError.
2231
2232     """
2233     node_name = self.op.node_name
2234     cfg = self.cfg
2235
2236     dns_data = utils.HostInfo(node_name)
2237
2238     node = dns_data.name
2239     primary_ip = self.op.primary_ip = dns_data.ip
2240     secondary_ip = getattr(self.op, "secondary_ip", None)
2241     if secondary_ip is None:
2242       secondary_ip = primary_ip
2243     if not utils.IsValidIP(secondary_ip):
2244       raise errors.OpPrereqError("Invalid secondary IP given")
2245     self.op.secondary_ip = secondary_ip
2246
2247     node_list = cfg.GetNodeList()
2248     if not self.op.readd and node in node_list:
2249       raise errors.OpPrereqError("Node %s is already in the configuration" %
2250                                  node)
2251     elif self.op.readd and node not in node_list:
2252       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2253
2254     for existing_node_name in node_list:
2255       existing_node = cfg.GetNodeInfo(existing_node_name)
2256
2257       if self.op.readd and node == existing_node_name:
2258         if (existing_node.primary_ip != primary_ip or
2259             existing_node.secondary_ip != secondary_ip):
2260           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2261                                      " address configuration as before")
2262         continue
2263
2264       if (existing_node.primary_ip == primary_ip or
2265           existing_node.secondary_ip == primary_ip or
2266           existing_node.primary_ip == secondary_ip or
2267           existing_node.secondary_ip == secondary_ip):
2268         raise errors.OpPrereqError("New node ip address(es) conflict with"
2269                                    " existing node %s" % existing_node.name)
2270
2271     # check that the type of the node (single versus dual homed) is the
2272     # same as for the master
2273     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2274     master_singlehomed = myself.secondary_ip == myself.primary_ip
2275     newbie_singlehomed = secondary_ip == primary_ip
2276     if master_singlehomed != newbie_singlehomed:
2277       if master_singlehomed:
2278         raise errors.OpPrereqError("The master has no private ip but the"
2279                                    " new node has one")
2280       else:
2281         raise errors.OpPrereqError("The master has a private ip but the"
2282                                    " new node doesn't have one")
2283
2284     # checks reachablity
2285     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2286       raise errors.OpPrereqError("Node not reachable by ping")
2287
2288     if not newbie_singlehomed:
2289       # check reachability from my secondary ip to newbie's secondary ip
2290       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2291                            source=myself.secondary_ip):
2292         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2293                                    " based ping to noded port")
2294
2295     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2296     mc_now, _ = self.cfg.GetMasterCandidateStats()
2297     master_candidate = mc_now < cp_size
2298
2299     self.new_node = objects.Node(name=node,
2300                                  primary_ip=primary_ip,
2301                                  secondary_ip=secondary_ip,
2302                                  master_candidate=master_candidate,
2303                                  offline=False, drained=False)
2304
2305   def Exec(self, feedback_fn):
2306     """Adds the new node to the cluster.
2307
2308     """
2309     new_node = self.new_node
2310     node = new_node.name
2311
2312     # check connectivity
2313     result = self.rpc.call_version([node])[node]
2314     result.Raise("Can't get version information from node %s" % node)
2315     if constants.PROTOCOL_VERSION == result.payload:
2316       logging.info("Communication to node %s fine, sw version %s match",
2317                    node, result.payload)
2318     else:
2319       raise errors.OpExecError("Version mismatch master version %s,"
2320                                " node version %s" %
2321                                (constants.PROTOCOL_VERSION, result.payload))
2322
2323     # setup ssh on node
2324     logging.info("Copy ssh key to node %s", node)
2325     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2326     keyarray = []
2327     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2328                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2329                 priv_key, pub_key]
2330
2331     for i in keyfiles:
2332       f = open(i, 'r')
2333       try:
2334         keyarray.append(f.read())
2335       finally:
2336         f.close()
2337
2338     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2339                                     keyarray[2],
2340                                     keyarray[3], keyarray[4], keyarray[5])
2341     result.Raise("Cannot transfer ssh keys to the new node")
2342
2343     # Add node to our /etc/hosts, and add key to known_hosts
2344     if self.cfg.GetClusterInfo().modify_etc_hosts:
2345       utils.AddHostToEtcHosts(new_node.name)
2346
2347     if new_node.secondary_ip != new_node.primary_ip:
2348       result = self.rpc.call_node_has_ip_address(new_node.name,
2349                                                  new_node.secondary_ip)
2350       result.Raise("Failure checking secondary ip on node %s" % new_node.name,
2351                    prereq=True)
2352       if not result.payload:
2353         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2354                                  " you gave (%s). Please fix and re-run this"
2355                                  " command." % new_node.secondary_ip)
2356
2357     node_verify_list = [self.cfg.GetMasterNode()]
2358     node_verify_param = {
2359       'nodelist': [node],
2360       # TODO: do a node-net-test as well?
2361     }
2362
2363     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2364                                        self.cfg.GetClusterName())
2365     for verifier in node_verify_list:
2366       result[verifier].Raise("Cannot communicate with node %s" % verifier)
2367       nl_payload = result[verifier].payload['nodelist']
2368       if nl_payload:
2369         for failed in nl_payload:
2370           feedback_fn("ssh/hostname verification failed %s -> %s" %
2371                       (verifier, nl_payload[failed]))
2372         raise errors.OpExecError("ssh/hostname verification failed.")
2373
2374     if self.op.readd:
2375       _RedistributeAncillaryFiles(self)
2376       self.context.ReaddNode(new_node)
2377     else:
2378       _RedistributeAncillaryFiles(self, additional_nodes=[node])
2379       self.context.AddNode(new_node)
2380
2381
2382 class LUSetNodeParams(LogicalUnit):
2383   """Modifies the parameters of a node.
2384
2385   """
2386   HPATH = "node-modify"
2387   HTYPE = constants.HTYPE_NODE
2388   _OP_REQP = ["node_name"]
2389   REQ_BGL = False
2390
2391   def CheckArguments(self):
2392     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2393     if node_name is None:
2394       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2395     self.op.node_name = node_name
2396     _CheckBooleanOpField(self.op, 'master_candidate')
2397     _CheckBooleanOpField(self.op, 'offline')
2398     _CheckBooleanOpField(self.op, 'drained')
2399     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2400     if all_mods.count(None) == 3:
2401       raise errors.OpPrereqError("Please pass at least one modification")
2402     if all_mods.count(True) > 1:
2403       raise errors.OpPrereqError("Can't set the node into more than one"
2404                                  " state at the same time")
2405
2406   def ExpandNames(self):
2407     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2408
2409   def BuildHooksEnv(self):
2410     """Build hooks env.
2411
2412     This runs on the master node.
2413
2414     """
2415     env = {
2416       "OP_TARGET": self.op.node_name,
2417       "MASTER_CANDIDATE": str(self.op.master_candidate),
2418       "OFFLINE": str(self.op.offline),
2419       "DRAINED": str(self.op.drained),
2420       }
2421     nl = [self.cfg.GetMasterNode(),
2422           self.op.node_name]
2423     return env, nl, nl
2424
2425   def CheckPrereq(self):
2426     """Check prerequisites.
2427
2428     This only checks the instance list against the existing names.
2429
2430     """
2431     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2432
2433     if ((self.op.master_candidate == False or self.op.offline == True or
2434          self.op.drained == True) and node.master_candidate):
2435       # we will demote the node from master_candidate
2436       if self.op.node_name == self.cfg.GetMasterNode():
2437         raise errors.OpPrereqError("The master node has to be a"
2438                                    " master candidate, online and not drained")
2439       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2440       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2441       if num_candidates <= cp_size:
2442         msg = ("Not enough master candidates (desired"
2443                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2444         if self.op.force:
2445           self.LogWarning(msg)
2446         else:
2447           raise errors.OpPrereqError(msg)
2448
2449     if (self.op.master_candidate == True and
2450         ((node.offline and not self.op.offline == False) or
2451          (node.drained and not self.op.drained == False))):
2452       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2453                                  " to master_candidate" % node.name)
2454
2455     return
2456
2457   def Exec(self, feedback_fn):
2458     """Modifies a node.
2459
2460     """
2461     node = self.node
2462
2463     result = []
2464     changed_mc = False
2465
2466     if self.op.offline is not None:
2467       node.offline = self.op.offline
2468       result.append(("offline", str(self.op.offline)))
2469       if self.op.offline == True:
2470         if node.master_candidate:
2471           node.master_candidate = False
2472           changed_mc = True
2473           result.append(("master_candidate", "auto-demotion due to offline"))
2474         if node.drained:
2475           node.drained = False
2476           result.append(("drained", "clear drained status due to offline"))
2477
2478     if self.op.master_candidate is not None:
2479       node.master_candidate = self.op.master_candidate
2480       changed_mc = True
2481       result.append(("master_candidate", str(self.op.master_candidate)))
2482       if self.op.master_candidate == False:
2483         rrc = self.rpc.call_node_demote_from_mc(node.name)
2484         msg = rrc.fail_msg
2485         if msg:
2486           self.LogWarning("Node failed to demote itself: %s" % msg)
2487
2488     if self.op.drained is not None:
2489       node.drained = self.op.drained
2490       result.append(("drained", str(self.op.drained)))
2491       if self.op.drained == True:
2492         if node.master_candidate:
2493           node.master_candidate = False
2494           changed_mc = True
2495           result.append(("master_candidate", "auto-demotion due to drain"))
2496         if node.offline:
2497           node.offline = False
2498           result.append(("offline", "clear offline status due to drain"))
2499
2500     # this will trigger configuration file update, if needed
2501     self.cfg.Update(node)
2502     # this will trigger job queue propagation or cleanup
2503     if changed_mc:
2504       self.context.ReaddNode(node)
2505
2506     return result
2507
2508
2509 class LUPowercycleNode(NoHooksLU):
2510   """Powercycles a node.
2511
2512   """
2513   _OP_REQP = ["node_name", "force"]
2514   REQ_BGL = False
2515
2516   def CheckArguments(self):
2517     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2518     if node_name is None:
2519       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2520     self.op.node_name = node_name
2521     if node_name == self.cfg.GetMasterNode() and not self.op.force:
2522       raise errors.OpPrereqError("The node is the master and the force"
2523                                  " parameter was not set")
2524
2525   def ExpandNames(self):
2526     """Locking for PowercycleNode.
2527
2528     This is a last-resource option and shouldn't block on other
2529     jobs. Therefore, we grab no locks.
2530
2531     """
2532     self.needed_locks = {}
2533
2534   def CheckPrereq(self):
2535     """Check prerequisites.
2536
2537     This LU has no prereqs.
2538
2539     """
2540     pass
2541
2542   def Exec(self, feedback_fn):
2543     """Reboots a node.
2544
2545     """
2546     result = self.rpc.call_node_powercycle(self.op.node_name,
2547                                            self.cfg.GetHypervisorType())
2548     result.Raise("Failed to schedule the reboot")
2549     return result.payload
2550
2551
2552 class LUQueryClusterInfo(NoHooksLU):
2553   """Query cluster configuration.
2554
2555   """
2556   _OP_REQP = []
2557   REQ_BGL = False
2558
2559   def ExpandNames(self):
2560     self.needed_locks = {}
2561
2562   def CheckPrereq(self):
2563     """No prerequsites needed for this LU.
2564
2565     """
2566     pass
2567
2568   def Exec(self, feedback_fn):
2569     """Return cluster config.
2570
2571     """
2572     cluster = self.cfg.GetClusterInfo()
2573     result = {
2574       "software_version": constants.RELEASE_VERSION,
2575       "protocol_version": constants.PROTOCOL_VERSION,
2576       "config_version": constants.CONFIG_VERSION,
2577       "os_api_version": constants.OS_API_VERSION,
2578       "export_version": constants.EXPORT_VERSION,
2579       "architecture": (platform.architecture()[0], platform.machine()),
2580       "name": cluster.cluster_name,
2581       "master": cluster.master_node,
2582       "default_hypervisor": cluster.default_hypervisor,
2583       "enabled_hypervisors": cluster.enabled_hypervisors,
2584       "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2585                         for hypervisor in cluster.enabled_hypervisors]),
2586       "beparams": cluster.beparams,
2587       "nicparams": cluster.nicparams,
2588       "candidate_pool_size": cluster.candidate_pool_size,
2589       "master_netdev": cluster.master_netdev,
2590       "volume_group_name": cluster.volume_group_name,
2591       "file_storage_dir": cluster.file_storage_dir,
2592       }
2593
2594     return result
2595
2596
2597 class LUQueryConfigValues(NoHooksLU):
2598   """Return configuration values.
2599
2600   """
2601   _OP_REQP = []
2602   REQ_BGL = False
2603   _FIELDS_DYNAMIC = utils.FieldSet()
2604   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2605
2606   def ExpandNames(self):
2607     self.needed_locks = {}
2608
2609     _CheckOutputFields(static=self._FIELDS_STATIC,
2610                        dynamic=self._FIELDS_DYNAMIC,
2611                        selected=self.op.output_fields)
2612
2613   def CheckPrereq(self):
2614     """No prerequisites.
2615
2616     """
2617     pass
2618
2619   def Exec(self, feedback_fn):
2620     """Dump a representation of the cluster config to the standard output.
2621
2622     """
2623     values = []
2624     for field in self.op.output_fields:
2625       if field == "cluster_name":
2626         entry = self.cfg.GetClusterName()
2627       elif field == "master_node":
2628         entry = self.cfg.GetMasterNode()
2629       elif field == "drain_flag":
2630         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2631       else:
2632         raise errors.ParameterError(field)
2633       values.append(entry)
2634     return values
2635
2636
2637 class LUActivateInstanceDisks(NoHooksLU):
2638   """Bring up an instance's disks.
2639
2640   """
2641   _OP_REQP = ["instance_name"]
2642   REQ_BGL = False
2643
2644   def ExpandNames(self):
2645     self._ExpandAndLockInstance()
2646     self.needed_locks[locking.LEVEL_NODE] = []
2647     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2648
2649   def DeclareLocks(self, level):
2650     if level == locking.LEVEL_NODE:
2651       self._LockInstancesNodes()
2652
2653   def CheckPrereq(self):
2654     """Check prerequisites.
2655
2656     This checks that the instance is in the cluster.
2657
2658     """
2659     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2660     assert self.instance is not None, \
2661       "Cannot retrieve locked instance %s" % self.op.instance_name
2662     _CheckNodeOnline(self, self.instance.primary_node)
2663
2664   def Exec(self, feedback_fn):
2665     """Activate the disks.
2666
2667     """
2668     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2669     if not disks_ok:
2670       raise errors.OpExecError("Cannot activate block devices")
2671
2672     return disks_info
2673
2674
2675 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2676   """Prepare the block devices for an instance.
2677
2678   This sets up the block devices on all nodes.
2679
2680   @type lu: L{LogicalUnit}
2681   @param lu: the logical unit on whose behalf we execute
2682   @type instance: L{objects.Instance}
2683   @param instance: the instance for whose disks we assemble
2684   @type ignore_secondaries: boolean
2685   @param ignore_secondaries: if true, errors on secondary nodes
2686       won't result in an error return from the function
2687   @return: False if the operation failed, otherwise a list of
2688       (host, instance_visible_name, node_visible_name)
2689       with the mapping from node devices to instance devices
2690
2691   """
2692   device_info = []
2693   disks_ok = True
2694   iname = instance.name
2695   # With the two passes mechanism we try to reduce the window of
2696   # opportunity for the race condition of switching DRBD to primary
2697   # before handshaking occured, but we do not eliminate it
2698
2699   # The proper fix would be to wait (with some limits) until the
2700   # connection has been made and drbd transitions from WFConnection
2701   # into any other network-connected state (Connected, SyncTarget,
2702   # SyncSource, etc.)
2703
2704   # 1st pass, assemble on all nodes in secondary mode
2705   for inst_disk in instance.disks:
2706     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2707       lu.cfg.SetDiskID(node_disk, node)
2708       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2709       msg = result.fail_msg
2710       if msg:
2711         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2712                            " (is_primary=False, pass=1): %s",
2713                            inst_disk.iv_name, node, msg)
2714         if not ignore_secondaries:
2715           disks_ok = False
2716
2717   # FIXME: race condition on drbd migration to primary
2718
2719   # 2nd pass, do only the primary node
2720   for inst_disk in instance.disks:
2721     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2722       if node != instance.primary_node:
2723         continue
2724       lu.cfg.SetDiskID(node_disk, node)
2725       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2726       msg = result.fail_msg
2727       if msg:
2728         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2729                            " (is_primary=True, pass=2): %s",
2730                            inst_disk.iv_name, node, msg)
2731         disks_ok = False
2732     device_info.append((instance.primary_node, inst_disk.iv_name,
2733                         result.payload))
2734
2735   # leave the disks configured for the primary node
2736   # this is a workaround that would be fixed better by
2737   # improving the logical/physical id handling
2738   for disk in instance.disks:
2739     lu.cfg.SetDiskID(disk, instance.primary_node)
2740
2741   return disks_ok, device_info
2742
2743
2744 def _StartInstanceDisks(lu, instance, force):
2745   """Start the disks of an instance.
2746
2747   """
2748   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2749                                            ignore_secondaries=force)
2750   if not disks_ok:
2751     _ShutdownInstanceDisks(lu, instance)
2752     if force is not None and not force:
2753       lu.proc.LogWarning("", hint="If the message above refers to a"
2754                          " secondary node,"
2755                          " you can retry the operation using '--force'.")
2756     raise errors.OpExecError("Disk consistency error")
2757
2758
2759 class LUDeactivateInstanceDisks(NoHooksLU):
2760   """Shutdown an instance's disks.
2761
2762   """
2763   _OP_REQP = ["instance_name"]
2764   REQ_BGL = False
2765
2766   def ExpandNames(self):
2767     self._ExpandAndLockInstance()
2768     self.needed_locks[locking.LEVEL_NODE] = []
2769     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2770
2771   def DeclareLocks(self, level):
2772     if level == locking.LEVEL_NODE:
2773       self._LockInstancesNodes()
2774
2775   def CheckPrereq(self):
2776     """Check prerequisites.
2777
2778     This checks that the instance is in the cluster.
2779
2780     """
2781     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2782     assert self.instance is not None, \
2783       "Cannot retrieve locked instance %s" % self.op.instance_name
2784
2785   def Exec(self, feedback_fn):
2786     """Deactivate the disks
2787
2788     """
2789     instance = self.instance
2790     _SafeShutdownInstanceDisks(self, instance)
2791
2792
2793 def _SafeShutdownInstanceDisks(lu, instance):
2794   """Shutdown block devices of an instance.
2795
2796   This function checks if an instance is running, before calling
2797   _ShutdownInstanceDisks.
2798
2799   """
2800   pnode = instance.primary_node
2801   ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
2802   ins_l.Raise("Can't contact node %s" % pnode)
2803
2804   if instance.name in ins_l.payload:
2805     raise errors.OpExecError("Instance is running, can't shutdown"
2806                              " block devices.")
2807
2808   _ShutdownInstanceDisks(lu, instance)
2809
2810
2811 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2812   """Shutdown block devices of an instance.
2813
2814   This does the shutdown on all nodes of the instance.
2815
2816   If the ignore_primary is false, errors on the primary node are
2817   ignored.
2818
2819   """
2820   all_result = True
2821   for disk in instance.disks:
2822     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2823       lu.cfg.SetDiskID(top_disk, node)
2824       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2825       msg = result.fail_msg
2826       if msg:
2827         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2828                       disk.iv_name, node, msg)
2829         if not ignore_primary or node != instance.primary_node:
2830           all_result = False
2831   return all_result
2832
2833
2834 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2835   """Checks if a node has enough free memory.
2836
2837   This function check if a given node has the needed amount of free
2838   memory. In case the node has less memory or we cannot get the
2839   information from the node, this function raise an OpPrereqError
2840   exception.
2841
2842   @type lu: C{LogicalUnit}
2843   @param lu: a logical unit from which we get configuration data
2844   @type node: C{str}
2845   @param node: the node to check
2846   @type reason: C{str}
2847   @param reason: string to use in the error message
2848   @type requested: C{int}
2849   @param requested: the amount of memory in MiB to check for
2850   @type hypervisor_name: C{str}
2851   @param hypervisor_name: the hypervisor to ask for memory stats
2852   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2853       we cannot check the node
2854
2855   """
2856   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2857   nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
2858   free_mem = nodeinfo[node].payload.get('memory_free', None)
2859   if not isinstance(free_mem, int):
2860     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2861                                " was '%s'" % (node, free_mem))
2862   if requested > free_mem:
2863     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2864                                " needed %s MiB, available %s MiB" %
2865                                (node, reason, requested, free_mem))
2866
2867
2868 class LUStartupInstance(LogicalUnit):
2869   """Starts an instance.
2870
2871   """
2872   HPATH = "instance-start"
2873   HTYPE = constants.HTYPE_INSTANCE
2874   _OP_REQP = ["instance_name", "force"]
2875   REQ_BGL = False
2876
2877   def ExpandNames(self):
2878     self._ExpandAndLockInstance()
2879
2880   def BuildHooksEnv(self):
2881     """Build hooks env.
2882
2883     This runs on master, primary and secondary nodes of the instance.
2884
2885     """
2886     env = {
2887       "FORCE": self.op.force,
2888       }
2889     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2890     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2891     return env, nl, nl
2892
2893   def CheckPrereq(self):
2894     """Check prerequisites.
2895
2896     This checks that the instance is in the cluster.
2897
2898     """
2899     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2900     assert self.instance is not None, \
2901       "Cannot retrieve locked instance %s" % self.op.instance_name
2902
2903     # extra beparams
2904     self.beparams = getattr(self.op, "beparams", {})
2905     if self.beparams:
2906       if not isinstance(self.beparams, dict):
2907         raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2908                                    " dict" % (type(self.beparams), ))
2909       # fill the beparams dict
2910       utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2911       self.op.beparams = self.beparams
2912
2913     # extra hvparams
2914     self.hvparams = getattr(self.op, "hvparams", {})
2915     if self.hvparams:
2916       if not isinstance(self.hvparams, dict):
2917         raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2918                                    " dict" % (type(self.hvparams), ))
2919
2920       # check hypervisor parameter syntax (locally)
2921       cluster = self.cfg.GetClusterInfo()
2922       utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2923       filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
2924                                     instance.hvparams)
2925       filled_hvp.update(self.hvparams)
2926       hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2927       hv_type.CheckParameterSyntax(filled_hvp)
2928       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2929       self.op.hvparams = self.hvparams
2930
2931     _CheckNodeOnline(self, instance.primary_node)
2932
2933     bep = self.cfg.GetClusterInfo().FillBE(instance)
2934     # check bridges existance
2935     _CheckInstanceBridgesExist(self, instance)
2936
2937     remote_info = self.rpc.call_instance_info(instance.primary_node,
2938                                               instance.name,
2939                                               instance.hypervisor)
2940     remote_info.Raise("Error checking node %s" % instance.primary_node,
2941                       prereq=True)
2942     if not remote_info.payload: # not running already
2943       _CheckNodeFreeMemory(self, instance.primary_node,
2944                            "starting instance %s" % instance.name,
2945                            bep[constants.BE_MEMORY], instance.hypervisor)
2946
2947   def Exec(self, feedback_fn):
2948     """Start the instance.
2949
2950     """
2951     instance = self.instance
2952     force = self.op.force
2953
2954     self.cfg.MarkInstanceUp(instance.name)
2955
2956     node_current = instance.primary_node
2957
2958     _StartInstanceDisks(self, instance, force)
2959
2960     result = self.rpc.call_instance_start(node_current, instance,
2961                                           self.hvparams, self.beparams)
2962     msg = result.fail_msg
2963     if msg:
2964       _ShutdownInstanceDisks(self, instance)
2965       raise errors.OpExecError("Could not start instance: %s" % msg)
2966
2967
2968 class LURebootInstance(LogicalUnit):
2969   """Reboot an instance.
2970
2971   """
2972   HPATH = "instance-reboot"
2973   HTYPE = constants.HTYPE_INSTANCE
2974   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2975   REQ_BGL = False
2976
2977   def ExpandNames(self):
2978     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2979                                    constants.INSTANCE_REBOOT_HARD,
2980                                    constants.INSTANCE_REBOOT_FULL]:
2981       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2982                                   (constants.INSTANCE_REBOOT_SOFT,
2983                                    constants.INSTANCE_REBOOT_HARD,
2984                                    constants.INSTANCE_REBOOT_FULL))
2985     self._ExpandAndLockInstance()
2986
2987   def BuildHooksEnv(self):
2988     """Build hooks env.
2989
2990     This runs on master, primary and secondary nodes of the instance.
2991
2992     """
2993     env = {
2994       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2995       "REBOOT_TYPE": self.op.reboot_type,
2996       }
2997     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2998     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2999     return env, nl, nl
3000
3001   def CheckPrereq(self):
3002     """Check prerequisites.
3003
3004     This checks that the instance is in the cluster.
3005
3006     """
3007     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3008     assert self.instance is not None, \
3009       "Cannot retrieve locked instance %s" % self.op.instance_name
3010
3011     _CheckNodeOnline(self, instance.primary_node)
3012
3013     # check bridges existance
3014     _CheckInstanceBridgesExist(self, instance)
3015
3016   def Exec(self, feedback_fn):
3017     """Reboot the instance.
3018
3019     """
3020     instance = self.instance
3021     ignore_secondaries = self.op.ignore_secondaries
3022     reboot_type = self.op.reboot_type
3023
3024     node_current = instance.primary_node
3025
3026     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3027                        constants.INSTANCE_REBOOT_HARD]:
3028       for disk in instance.disks:
3029         self.cfg.SetDiskID(disk, node_current)
3030       result = self.rpc.call_instance_reboot(node_current, instance,
3031                                              reboot_type)
3032       result.Raise("Could not reboot instance")
3033     else:
3034       result = self.rpc.call_instance_shutdown(node_current, instance)
3035       result.Raise("Could not shutdown instance for full reboot")
3036       _ShutdownInstanceDisks(self, instance)
3037       _StartInstanceDisks(self, instance, ignore_secondaries)
3038       result = self.rpc.call_instance_start(node_current, instance, None, None)
3039       msg = result.fail_msg
3040       if msg:
3041         _ShutdownInstanceDisks(self, instance)
3042         raise errors.OpExecError("Could not start instance for"
3043                                  " full reboot: %s" % msg)
3044
3045     self.cfg.MarkInstanceUp(instance.name)
3046
3047
3048 class LUShutdownInstance(LogicalUnit):
3049   """Shutdown an instance.
3050
3051   """
3052   HPATH = "instance-stop"
3053   HTYPE = constants.HTYPE_INSTANCE
3054   _OP_REQP = ["instance_name"]
3055   REQ_BGL = False
3056
3057   def ExpandNames(self):
3058     self._ExpandAndLockInstance()
3059
3060   def BuildHooksEnv(self):
3061     """Build hooks env.
3062
3063     This runs on master, primary and secondary nodes of the instance.
3064
3065     """
3066     env = _BuildInstanceHookEnvByObject(self, self.instance)
3067     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3068     return env, nl, nl
3069
3070   def CheckPrereq(self):
3071     """Check prerequisites.
3072
3073     This checks that the instance is in the cluster.
3074
3075     """
3076     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3077     assert self.instance is not None, \
3078       "Cannot retrieve locked instance %s" % self.op.instance_name
3079     _CheckNodeOnline(self, self.instance.primary_node)
3080
3081   def Exec(self, feedback_fn):
3082     """Shutdown the instance.
3083
3084     """
3085     instance = self.instance
3086     node_current = instance.primary_node
3087     self.cfg.MarkInstanceDown(instance.name)
3088     result = self.rpc.call_instance_shutdown(node_current, instance)
3089     msg = result.fail_msg
3090     if msg:
3091       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3092
3093     _ShutdownInstanceDisks(self, instance)
3094
3095
3096 class LUReinstallInstance(LogicalUnit):
3097   """Reinstall an instance.
3098
3099   """
3100   HPATH = "instance-reinstall"
3101   HTYPE = constants.HTYPE_INSTANCE
3102   _OP_REQP = ["instance_name"]
3103   REQ_BGL = False
3104
3105   def ExpandNames(self):
3106     self._ExpandAndLockInstance()
3107
3108   def BuildHooksEnv(self):
3109     """Build hooks env.
3110
3111     This runs on master, primary and secondary nodes of the instance.
3112
3113     """
3114     env = _BuildInstanceHookEnvByObject(self, self.instance)
3115     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3116     return env, nl, nl
3117
3118   def CheckPrereq(self):
3119     """Check prerequisites.
3120
3121     This checks that the instance is in the cluster and is not running.
3122
3123     """
3124     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3125     assert instance is not None, \
3126       "Cannot retrieve locked instance %s" % self.op.instance_name
3127     _CheckNodeOnline(self, instance.primary_node)
3128
3129     if instance.disk_template == constants.DT_DISKLESS:
3130       raise errors.OpPrereqError("Instance '%s' has no disks" %
3131                                  self.op.instance_name)
3132     if instance.admin_up:
3133       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3134                                  self.op.instance_name)
3135     remote_info = self.rpc.call_instance_info(instance.primary_node,
3136                                               instance.name,
3137                                               instance.hypervisor)
3138     remote_info.Raise("Error checking node %s" % instance.primary_node,
3139                       prereq=True)
3140     if remote_info.payload:
3141       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3142                                  (self.op.instance_name,
3143                                   instance.primary_node))
3144
3145     self.op.os_type = getattr(self.op, "os_type", None)
3146     if self.op.os_type is not None:
3147       # OS verification
3148       pnode = self.cfg.GetNodeInfo(
3149         self.cfg.ExpandNodeName(instance.primary_node))
3150       if pnode is None:
3151         raise errors.OpPrereqError("Primary node '%s' is unknown" %
3152                                    self.op.pnode)
3153       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3154       result.Raise("OS '%s' not in supported OS list for primary node %s" %
3155                    (self.op.os_type, pnode.name), prereq=True)
3156
3157     self.instance = instance
3158
3159   def Exec(self, feedback_fn):
3160     """Reinstall the instance.
3161
3162     """
3163     inst = self.instance
3164
3165     if self.op.os_type is not None:
3166       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3167       inst.os = self.op.os_type
3168       self.cfg.Update(inst)
3169
3170     _StartInstanceDisks(self, inst, None)
3171     try:
3172       feedback_fn("Running the instance OS create scripts...")
3173       result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3174       result.Raise("Could not install OS for instance %s on node %s" %
3175                    (inst.name, inst.primary_node))
3176     finally:
3177       _ShutdownInstanceDisks(self, inst)
3178
3179
3180 class LURenameInstance(LogicalUnit):
3181   """Rename an instance.
3182
3183   """
3184   HPATH = "instance-rename"
3185   HTYPE = constants.HTYPE_INSTANCE
3186   _OP_REQP = ["instance_name", "new_name"]
3187
3188   def BuildHooksEnv(self):
3189     """Build hooks env.
3190
3191     This runs on master, primary and secondary nodes of the instance.
3192
3193     """
3194     env = _BuildInstanceHookEnvByObject(self, self.instance)
3195     env["INSTANCE_NEW_NAME"] = self.op.new_name
3196     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3197     return env, nl, nl
3198
3199   def CheckPrereq(self):
3200     """Check prerequisites.
3201
3202     This checks that the instance is in the cluster and is not running.
3203
3204     """
3205     instance = self.cfg.GetInstanceInfo(
3206       self.cfg.ExpandInstanceName(self.op.instance_name))
3207     if instance is None:
3208       raise errors.OpPrereqError("Instance '%s' not known" %
3209                                  self.op.instance_name)
3210     _CheckNodeOnline(self, instance.primary_node)
3211
3212     if instance.admin_up:
3213       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3214                                  self.op.instance_name)
3215     remote_info = self.rpc.call_instance_info(instance.primary_node,
3216                                               instance.name,
3217                                               instance.hypervisor)
3218     remote_info.Raise("Error checking node %s" % instance.primary_node,
3219                       prereq=True)
3220     if remote_info.payload:
3221       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3222                                  (self.op.instance_name,
3223                                   instance.primary_node))
3224     self.instance = instance
3225
3226     # new name verification
3227     name_info = utils.HostInfo(self.op.new_name)
3228
3229     self.op.new_name = new_name = name_info.name
3230     instance_list = self.cfg.GetInstanceList()
3231     if new_name in instance_list:
3232       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3233                                  new_name)
3234
3235     if not getattr(self.op, "ignore_ip", False):
3236       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3237         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3238                                    (name_info.ip, new_name))
3239
3240
3241   def Exec(self, feedback_fn):
3242     """Reinstall the instance.
3243
3244     """
3245     inst = self.instance
3246     old_name = inst.name
3247
3248     if inst.disk_template == constants.DT_FILE:
3249       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3250
3251     self.cfg.RenameInstance(inst.name, self.op.new_name)
3252     # Change the instance lock. This is definitely safe while we hold the BGL
3253     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3254     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3255
3256     # re-read the instance from the configuration after rename
3257     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3258
3259     if inst.disk_template == constants.DT_FILE:
3260       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3261       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3262                                                      old_file_storage_dir,
3263                                                      new_file_storage_dir)
3264       result.Raise("Could not rename on node %s directory '%s' to '%s'"
3265                    " (but the instance has been renamed in Ganeti)" %
3266                    (inst.primary_node, old_file_storage_dir,
3267                     new_file_storage_dir))
3268
3269     _StartInstanceDisks(self, inst, None)
3270     try:
3271       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3272                                                  old_name)
3273       msg = result.fail_msg
3274       if msg:
3275         msg = ("Could not run OS rename script for instance %s on node %s"
3276                " (but the instance has been renamed in Ganeti): %s" %
3277                (inst.name, inst.primary_node, msg))
3278         self.proc.LogWarning(msg)
3279     finally:
3280       _ShutdownInstanceDisks(self, inst)
3281
3282
3283 class LURemoveInstance(LogicalUnit):
3284   """Remove an instance.
3285
3286   """
3287   HPATH = "instance-remove"
3288   HTYPE = constants.HTYPE_INSTANCE
3289   _OP_REQP = ["instance_name", "ignore_failures"]
3290   REQ_BGL = False
3291
3292   def ExpandNames(self):
3293     self._ExpandAndLockInstance()
3294     self.needed_locks[locking.LEVEL_NODE] = []
3295     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3296
3297   def DeclareLocks(self, level):
3298     if level == locking.LEVEL_NODE:
3299       self._LockInstancesNodes()
3300
3301   def BuildHooksEnv(self):
3302     """Build hooks env.
3303
3304     This runs on master, primary and secondary nodes of the instance.
3305
3306     """
3307     env = _BuildInstanceHookEnvByObject(self, self.instance)
3308     nl = [self.cfg.GetMasterNode()]
3309     return env, nl, nl
3310
3311   def CheckPrereq(self):
3312     """Check prerequisites.
3313
3314     This checks that the instance is in the cluster.
3315
3316     """
3317     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3318     assert self.instance is not None, \
3319       "Cannot retrieve locked instance %s" % self.op.instance_name
3320
3321   def Exec(self, feedback_fn):
3322     """Remove the instance.
3323
3324     """
3325     instance = self.instance
3326     logging.info("Shutting down instance %s on node %s",
3327                  instance.name, instance.primary_node)
3328
3329     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3330     msg = result.fail_msg
3331     if msg:
3332       if self.op.ignore_failures:
3333         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3334       else:
3335         raise errors.OpExecError("Could not shutdown instance %s on"
3336                                  " node %s: %s" %
3337                                  (instance.name, instance.primary_node, msg))
3338
3339     logging.info("Removing block devices for instance %s", instance.name)
3340
3341     if not _RemoveDisks(self, instance):
3342       if self.op.ignore_failures:
3343         feedback_fn("Warning: can't remove instance's disks")
3344       else:
3345         raise errors.OpExecError("Can't remove instance's disks")
3346
3347     logging.info("Removing instance %s out of cluster config", instance.name)
3348
3349     self.cfg.RemoveInstance(instance.name)
3350     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3351
3352
3353 class LUQueryInstances(NoHooksLU):
3354   """Logical unit for querying instances.
3355
3356   """
3357   _OP_REQP = ["output_fields", "names", "use_locking"]
3358   REQ_BGL = False
3359   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3360                                     "admin_state",
3361                                     "disk_template", "ip", "mac", "bridge",
3362                                     "sda_size", "sdb_size", "vcpus", "tags",
3363                                     "network_port", "beparams",
3364                                     r"(disk)\.(size)/([0-9]+)",
3365                                     r"(disk)\.(sizes)", "disk_usage",
3366                                     r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3367                                     r"(nic)\.(macs|ips|bridges)",
3368                                     r"(disk|nic)\.(count)",
3369                                     "serial_no", "hypervisor", "hvparams",] +
3370                                   ["hv/%s" % name
3371                                    for name in constants.HVS_PARAMETERS] +
3372                                   ["be/%s" % name
3373                                    for name in constants.BES_PARAMETERS])
3374   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3375
3376
3377   def ExpandNames(self):
3378     _CheckOutputFields(static=self._FIELDS_STATIC,
3379                        dynamic=self._FIELDS_DYNAMIC,
3380                        selected=self.op.output_fields)
3381
3382     self.needed_locks = {}
3383     self.share_locks[locking.LEVEL_INSTANCE] = 1
3384     self.share_locks[locking.LEVEL_NODE] = 1
3385
3386     if self.op.names:
3387       self.wanted = _GetWantedInstances(self, self.op.names)
3388     else:
3389       self.wanted = locking.ALL_SET
3390
3391     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3392     self.do_locking = self.do_node_query and self.op.use_locking
3393     if self.do_locking:
3394       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3395       self.needed_locks[locking.LEVEL_NODE] = []
3396       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3397
3398   def DeclareLocks(self, level):
3399     if level == locking.LEVEL_NODE and self.do_locking:
3400       self._LockInstancesNodes()
3401
3402   def CheckPrereq(self):
3403     """Check prerequisites.
3404
3405     """
3406     pass
3407
3408   def Exec(self, feedback_fn):
3409     """Computes the list of nodes and their attributes.
3410
3411     """
3412     all_info = self.cfg.GetAllInstancesInfo()
3413     if self.wanted == locking.ALL_SET:
3414       # caller didn't specify instance names, so ordering is not important
3415       if self.do_locking:
3416         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3417       else:
3418         instance_names = all_info.keys()
3419       instance_names = utils.NiceSort(instance_names)
3420     else:
3421       # caller did specify names, so we must keep the ordering
3422       if self.do_locking:
3423         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3424       else:
3425         tgt_set = all_info.keys()
3426       missing = set(self.wanted).difference(tgt_set)
3427       if missing:
3428         raise errors.OpExecError("Some instances were removed before"
3429                                  " retrieving their data: %s" % missing)
3430       instance_names = self.wanted
3431
3432     instance_list = [all_info[iname] for iname in instance_names]
3433
3434     # begin data gathering
3435
3436     nodes = frozenset([inst.primary_node for inst in instance_list])
3437     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3438
3439     bad_nodes = []
3440     off_nodes = []
3441     if self.do_node_query:
3442       live_data = {}
3443       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3444       for name in nodes:
3445         result = node_data[name]
3446         if result.offline:
3447           # offline nodes will be in both lists
3448           off_nodes.append(name)
3449         if result.failed or result.fail_msg:
3450           bad_nodes.append(name)
3451         else:
3452           if result.payload:
3453             live_data.update(result.payload)
3454           # else no instance is alive
3455     else:
3456       live_data = dict([(name, {}) for name in instance_names])
3457
3458     # end data gathering
3459
3460     HVPREFIX = "hv/"
3461     BEPREFIX = "be/"
3462     output = []
3463     for instance in instance_list:
3464       iout = []
3465       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3466       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3467       for field in self.op.output_fields:
3468         st_match = self._FIELDS_STATIC.Matches(field)
3469         if field == "name":
3470           val = instance.name
3471         elif field == "os":
3472           val = instance.os
3473         elif field == "pnode":
3474           val = instance.primary_node
3475         elif field == "snodes":
3476           val = list(instance.secondary_nodes)
3477         elif field == "admin_state":
3478           val = instance.admin_up
3479         elif field == "oper_state":
3480           if instance.primary_node in bad_nodes:
3481             val = None
3482           else:
3483             val = bool(live_data.get(instance.name))
3484         elif field == "status":
3485           if instance.primary_node in off_nodes:
3486             val = "ERROR_nodeoffline"
3487           elif instance.primary_node in bad_nodes:
3488             val = "ERROR_nodedown"
3489           else:
3490             running = bool(live_data.get(instance.name))
3491             if running:
3492               if instance.admin_up:
3493                 val = "running"
3494               else:
3495                 val = "ERROR_up"
3496             else:
3497               if instance.admin_up:
3498                 val = "ERROR_down"
3499               else:
3500                 val = "ADMIN_down"
3501         elif field == "oper_ram":
3502           if instance.primary_node in bad_nodes:
3503             val = None
3504           elif instance.name in live_data:
3505             val = live_data[instance.name].get("memory", "?")
3506           else:
3507             val = "-"
3508         elif field == "disk_template":
3509           val = instance.disk_template
3510         elif field == "ip":
3511           if instance.nics:
3512             val = instance.nics[0].ip
3513           else:
3514             val = None
3515         elif field == "bridge":
3516           if instance.nics:
3517             val = instance.nics[0].bridge
3518           else:
3519             val = None
3520         elif field == "mac":
3521           if instance.nics:
3522             val = instance.nics[0].mac
3523           else:
3524             val = None
3525         elif field == "sda_size" or field == "sdb_size":
3526           idx = ord(field[2]) - ord('a')
3527           try:
3528             val = instance.FindDisk(idx).size
3529           except errors.OpPrereqError:
3530             val = None
3531         elif field == "disk_usage": # total disk usage per node
3532           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3533           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3534         elif field == "tags":
3535           val = list(instance.GetTags())
3536         elif field == "serial_no":
3537           val = instance.serial_no
3538         elif field == "network_port":
3539           val = instance.network_port
3540         elif field == "hypervisor":
3541           val = instance.hypervisor
3542         elif field == "hvparams":
3543           val = i_hv
3544         elif (field.startswith(HVPREFIX) and
3545               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3546           val = i_hv.get(field[len(HVPREFIX):], None)
3547         elif field == "beparams":
3548           val = i_be
3549         elif (field.startswith(BEPREFIX) and
3550               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3551           val = i_be.get(field[len(BEPREFIX):], None)
3552         elif st_match and st_match.groups():
3553           # matches a variable list
3554           st_groups = st_match.groups()
3555           if st_groups and st_groups[0] == "disk":
3556             if st_groups[1] == "count":
3557               val = len(instance.disks)
3558             elif st_groups[1] == "sizes":
3559               val = [disk.size for disk in instance.disks]
3560             elif st_groups[1] == "size":
3561               try:
3562                 val = instance.FindDisk(st_groups[2]).size
3563               except errors.OpPrereqError:
3564                 val = None
3565             else:
3566               assert False, "Unhandled disk parameter"
3567           elif st_groups[0] == "nic":
3568             if st_groups[1] == "count":
3569               val = len(instance.nics)
3570             elif st_groups[1] == "macs":
3571               val = [nic.mac for nic in instance.nics]
3572             elif st_groups[1] == "ips":
3573               val = [nic.ip for nic in instance.nics]
3574             elif st_groups[1] == "bridges":
3575               val = [nic.bridge for nic in instance.nics]
3576             else:
3577               # index-based item
3578               nic_idx = int(st_groups[2])
3579               if nic_idx >= len(instance.nics):
3580                 val = None
3581               else:
3582                 if st_groups[1] == "mac":
3583                   val = instance.nics[nic_idx].mac
3584                 elif st_groups[1] == "ip":
3585                   val = instance.nics[nic_idx].ip
3586                 elif st_groups[1] == "bridge":
3587                   val = instance.nics[nic_idx].bridge
3588                 else:
3589                   assert False, "Unhandled NIC parameter"
3590           else:
3591             assert False, "Unhandled variable parameter"
3592         else:
3593           raise errors.ParameterError(field)
3594         iout.append(val)
3595       output.append(iout)
3596
3597     return output
3598
3599
3600 class LUFailoverInstance(LogicalUnit):
3601   """Failover an instance.
3602
3603   """
3604   HPATH = "instance-failover"
3605   HTYPE = constants.HTYPE_INSTANCE
3606   _OP_REQP = ["instance_name", "ignore_consistency"]
3607   REQ_BGL = False
3608
3609   def ExpandNames(self):
3610     self._ExpandAndLockInstance()
3611     self.needed_locks[locking.LEVEL_NODE] = []
3612     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3613
3614   def DeclareLocks(self, level):
3615     if level == locking.LEVEL_NODE:
3616       self._LockInstancesNodes()
3617
3618   def BuildHooksEnv(self):
3619     """Build hooks env.
3620
3621     This runs on master, primary and secondary nodes of the instance.
3622
3623     """
3624     env = {
3625       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3626       }
3627     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3628     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3629     return env, nl, nl
3630
3631   def CheckPrereq(self):
3632     """Check prerequisites.
3633
3634     This checks that the instance is in the cluster.
3635
3636     """
3637     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3638     assert self.instance is not None, \
3639       "Cannot retrieve locked instance %s" % self.op.instance_name
3640
3641     bep = self.cfg.GetClusterInfo().FillBE(instance)
3642     if instance.disk_template not in constants.DTS_NET_MIRROR:
3643       raise errors.OpPrereqError("Instance's disk layout is not"
3644                                  " network mirrored, cannot failover.")
3645
3646     secondary_nodes = instance.secondary_nodes
3647     if not secondary_nodes:
3648       raise errors.ProgrammerError("no secondary node but using "
3649                                    "a mirrored disk template")
3650
3651     target_node = secondary_nodes[0]
3652     _CheckNodeOnline(self, target_node)
3653     _CheckNodeNotDrained(self, target_node)
3654     if instance.admin_up:
3655       # check memory requirements on the secondary node
3656       _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3657                            instance.name, bep[constants.BE_MEMORY],
3658                            instance.hypervisor)
3659     else:
3660       self.LogInfo("Not checking memory on the secondary node as"
3661                    " instance will not be started")
3662
3663     # check bridge existance
3664     _CheckInstanceBridgesExist(self, instance, node=target_node)
3665
3666   def Exec(self, feedback_fn):
3667     """Failover an instance.
3668
3669     The failover is done by shutting it down on its present node and
3670     starting it on the secondary.
3671
3672     """
3673     instance = self.instance
3674
3675     source_node = instance.primary_node
3676     target_node = instance.secondary_nodes[0]
3677
3678     feedback_fn("* checking disk consistency between source and target")
3679     for dev in instance.disks:
3680       # for drbd, these are drbd over lvm
3681       if not _CheckDiskConsistency(self, dev, target_node, False):
3682         if instance.admin_up and not self.op.ignore_consistency:
3683           raise errors.OpExecError("Disk %s is degraded on target node,"
3684                                    " aborting failover." % dev.iv_name)
3685
3686     feedback_fn("* shutting down instance on source node")
3687     logging.info("Shutting down instance %s on node %s",
3688                  instance.name, source_node)
3689
3690     result = self.rpc.call_instance_shutdown(source_node, instance)
3691     msg = result.fail_msg
3692     if msg:
3693       if self.op.ignore_consistency:
3694         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3695                              " Proceeding anyway. Please make sure node"
3696                              " %s is down. Error details: %s",
3697                              instance.name, source_node, source_node, msg)
3698       else:
3699         raise errors.OpExecError("Could not shutdown instance %s on"
3700                                  " node %s: %s" %
3701                                  (instance.name, source_node, msg))
3702
3703     feedback_fn("* deactivating the instance's disks on source node")
3704     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3705       raise errors.OpExecError("Can't shut down the instance's disks.")
3706
3707     instance.primary_node = target_node
3708     # distribute new instance config to the other nodes
3709     self.cfg.Update(instance)
3710
3711     # Only start the instance if it's marked as up
3712     if instance.admin_up:
3713       feedback_fn("* activating the instance's disks on target node")
3714       logging.info("Starting instance %s on node %s",
3715                    instance.name, target_node)
3716
3717       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3718                                                ignore_secondaries=True)
3719       if not disks_ok:
3720         _ShutdownInstanceDisks(self, instance)
3721         raise errors.OpExecError("Can't activate the instance's disks")
3722
3723       feedback_fn("* starting the instance on the target node")
3724       result = self.rpc.call_instance_start(target_node, instance, None, None)
3725       msg = result.fail_msg
3726       if msg:
3727         _ShutdownInstanceDisks(self, instance)
3728         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3729                                  (instance.name, target_node, msg))
3730
3731
3732 class LUMigrateInstance(LogicalUnit):
3733   """Migrate an instance.
3734
3735   This is migration without shutting down, compared to the failover,
3736   which is done with shutdown.
3737
3738   """
3739   HPATH = "instance-migrate"
3740   HTYPE = constants.HTYPE_INSTANCE
3741   _OP_REQP = ["instance_name", "live", "cleanup"]
3742
3743   REQ_BGL = False
3744
3745   def ExpandNames(self):
3746     self._ExpandAndLockInstance()
3747     self.needed_locks[locking.LEVEL_NODE] = []
3748     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3749
3750   def DeclareLocks(self, level):
3751     if level == locking.LEVEL_NODE:
3752       self._LockInstancesNodes()
3753
3754   def BuildHooksEnv(self):
3755     """Build hooks env.
3756
3757     This runs on master, primary and secondary nodes of the instance.
3758
3759     """
3760     env = _BuildInstanceHookEnvByObject(self, self.instance)
3761     env["MIGRATE_LIVE"] = self.op.live
3762     env["MIGRATE_CLEANUP"] = self.op.cleanup
3763     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3764     return env, nl, nl
3765
3766   def CheckPrereq(self):
3767     """Check prerequisites.
3768
3769     This checks that the instance is in the cluster.
3770
3771     """
3772     instance = self.cfg.GetInstanceInfo(
3773       self.cfg.ExpandInstanceName(self.op.instance_name))
3774     if instance is None:
3775       raise errors.OpPrereqError("Instance '%s' not known" %
3776                                  self.op.instance_name)
3777
3778     if instance.disk_template != constants.DT_DRBD8:
3779       raise errors.OpPrereqError("Instance's disk layout is not"
3780                                  " drbd8, cannot migrate.")
3781
3782     secondary_nodes = instance.secondary_nodes
3783     if not secondary_nodes:
3784       raise errors.ConfigurationError("No secondary node but using"
3785                                       " drbd8 disk template")
3786
3787     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3788
3789     target_node = secondary_nodes[0]
3790     # check memory requirements on the secondary node
3791     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3792                          instance.name, i_be[constants.BE_MEMORY],
3793                          instance.hypervisor)
3794
3795     # check bridge existance
3796     _CheckInstanceBridgesExist(self, instance, node=target_node)
3797
3798     if not self.op.cleanup:
3799       _CheckNodeNotDrained(self, target_node)
3800       result = self.rpc.call_instance_migratable(instance.primary_node,
3801                                                  instance)
3802       result.Raise("Can't migrate, please use failover", prereq=True)
3803
3804     self.instance = instance
3805
3806   def _WaitUntilSync(self):
3807     """Poll with custom rpc for disk sync.
3808
3809     This uses our own step-based rpc call.
3810
3811     """
3812     self.feedback_fn("* wait until resync is done")
3813     all_done = False
3814     while not all_done:
3815       all_done = True
3816       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3817                                             self.nodes_ip,
3818                                             self.instance.disks)
3819       min_percent = 100
3820       for node, nres in result.items():
3821         nres.Raise("Cannot resync disks on node %s" % node)
3822         node_done, node_percent = nres.payload
3823         all_done = all_done and node_done
3824         if node_percent is not None:
3825           min_percent = min(min_percent, node_percent)
3826       if not all_done:
3827         if min_percent < 100:
3828           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3829         time.sleep(2)
3830
3831   def _EnsureSecondary(self, node):
3832     """Demote a node to secondary.
3833
3834     """
3835     self.feedback_fn("* switching node %s to secondary mode" % node)
3836
3837     for dev in self.instance.disks:
3838       self.cfg.SetDiskID(dev, node)
3839
3840     result = self.rpc.call_blockdev_close(node, self.instance.name,
3841                                           self.instance.disks)
3842     result.Raise("Cannot change disk to secondary on node %s" % node)
3843
3844   def _GoStandalone(self):
3845     """Disconnect from the network.
3846
3847     """
3848     self.feedback_fn("* changing into standalone mode")
3849     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3850                                                self.instance.disks)
3851     for node, nres in result.items():
3852       nres.Raise("Cannot disconnect disks node %s" % node)
3853
3854   def _GoReconnect(self, multimaster):
3855     """Reconnect to the network.
3856
3857     """
3858     if multimaster:
3859       msg = "dual-master"
3860     else:
3861       msg = "single-master"
3862     self.feedback_fn("* changing disks into %s mode" % msg)
3863     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3864                                            self.instance.disks,
3865                                            self.instance.name, multimaster)
3866     for node, nres in result.items():
3867       nres.Raise("Cannot change disks config on node %s" % node)
3868
3869   def _ExecCleanup(self):
3870     """Try to cleanup after a failed migration.
3871
3872     The cleanup is done by:
3873       - check that the instance is running only on one node
3874         (and update the config if needed)
3875       - change disks on its secondary node to secondary
3876       - wait until disks are fully synchronized
3877       - disconnect from the network
3878       - change disks into single-master mode
3879       - wait again until disks are fully synchronized
3880
3881     """
3882     instance = self.instance
3883     target_node = self.target_node
3884     source_node = self.source_node
3885
3886     # check running on only one node
3887     self.feedback_fn("* checking where the instance actually runs"
3888                      " (if this hangs, the hypervisor might be in"
3889                      " a bad state)")
3890     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3891     for node, result in ins_l.items():
3892       result.Raise("Can't contact node %s" % node)
3893
3894     runningon_source = instance.name in ins_l[source_node].payload
3895     runningon_target = instance.name in ins_l[target_node].payload
3896
3897     if runningon_source and runningon_target:
3898       raise errors.OpExecError("Instance seems to be running on two nodes,"
3899                                " or the hypervisor is confused. You will have"
3900                                " to ensure manually that it runs only on one"
3901                                " and restart this operation.")
3902
3903     if not (runningon_source or runningon_target):
3904       raise errors.OpExecError("Instance does not seem to be running at all."
3905                                " In this case, it's safer to repair by"
3906                                " running 'gnt-instance stop' to ensure disk"
3907                                " shutdown, and then restarting it.")
3908
3909     if runningon_target:
3910       # the migration has actually succeeded, we need to update the config
3911       self.feedback_fn("* instance running on secondary node (%s),"
3912                        " updating config" % target_node)
3913       instance.primary_node = target_node
3914       self.cfg.Update(instance)
3915       demoted_node = source_node
3916     else:
3917       self.feedback_fn("* instance confirmed to be running on its"
3918                        " primary node (%s)" % source_node)
3919       demoted_node = target_node
3920
3921     self._EnsureSecondary(demoted_node)
3922     try:
3923       self._WaitUntilSync()
3924     except errors.OpExecError:
3925       # we ignore here errors, since if the device is standalone, it
3926       # won't be able to sync
3927       pass
3928     self._GoStandalone()
3929     self._GoReconnect(False)
3930     self._WaitUntilSync()
3931
3932     self.feedback_fn("* done")
3933
3934   def _RevertDiskStatus(self):
3935     """Try to revert the disk status after a failed migration.
3936
3937     """
3938     target_node = self.target_node
3939     try:
3940       self._EnsureSecondary(target_node)
3941       self._GoStandalone()
3942       self._GoReconnect(False)
3943       self._WaitUntilSync()
3944     except errors.OpExecError, err:
3945       self.LogWarning("Migration failed and I can't reconnect the"
3946                       " drives: error '%s'\n"
3947                       "Please look and recover the instance status" %
3948                       str(err))
3949
3950   def _AbortMigration(self):
3951     """Call the hypervisor code to abort a started migration.
3952
3953     """
3954     instance = self.instance
3955     target_node = self.target_node
3956     migration_info = self.migration_info
3957
3958     abort_result = self.rpc.call_finalize_migration(target_node,
3959                                                     instance,
3960                                                     migration_info,
3961                                                     False)
3962     abort_msg = abort_result.fail_msg
3963     if abort_msg:
3964       logging.error("Aborting migration failed on target node %s: %s" %
3965                     (target_node, abort_msg))
3966       # Don't raise an exception here, as we stil have to try to revert the
3967       # disk status, even if this step failed.
3968
3969   def _ExecMigration(self):
3970     """Migrate an instance.
3971
3972     The migrate is done by:
3973       - change the disks into dual-master mode
3974       - wait until disks are fully synchronized again
3975       - migrate the instance
3976       - change disks on the new secondary node (the old primary) to secondary
3977       - wait until disks are fully synchronized
3978       - change disks into single-master mode
3979
3980     """
3981     instance = self.instance
3982     target_node = self.target_node
3983     source_node = self.source_node
3984
3985     self.feedback_fn("* checking disk consistency between source and target")
3986     for dev in instance.disks:
3987       if not _CheckDiskConsistency(self, dev, target_node, False):
3988         raise errors.OpExecError("Disk %s is degraded or not fully"
3989                                  " synchronized on target node,"
3990                                  " aborting migrate." % dev.iv_name)
3991
3992     # First get the migration information from the remote node
3993     result = self.rpc.call_migration_info(source_node, instance)
3994     msg = result.fail_msg
3995     if msg:
3996       log_err = ("Failed fetching source migration information from %s: %s" %
3997                  (source_node, msg))
3998       logging.error(log_err)
3999       raise errors.OpExecError(log_err)
4000
4001     self.migration_info = migration_info = result.payload
4002
4003     # Then switch the disks to master/master mode
4004     self._EnsureSecondary(target_node)
4005     self._GoStandalone()
4006     self._GoReconnect(True)
4007     self._WaitUntilSync()
4008
4009     self.feedback_fn("* preparing %s to accept the instance" % target_node)
4010     result = self.rpc.call_accept_instance(target_node,
4011                                            instance,
4012                                            migration_info,
4013                                            self.nodes_ip[target_node])
4014
4015     msg = result.fail_msg
4016     if msg:
4017       logging.error("Instance pre-migration failed, trying to revert"
4018                     " disk status: %s", msg)
4019       self._AbortMigration()
4020       self._RevertDiskStatus()
4021       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4022                                (instance.name, msg))
4023
4024     self.feedback_fn("* migrating instance to %s" % target_node)
4025     time.sleep(10)
4026     result = self.rpc.call_instance_migrate(source_node, instance,
4027                                             self.nodes_ip[target_node],
4028                                             self.op.live)
4029     msg = result.fail_msg
4030     if msg:
4031       logging.error("Instance migration failed, trying to revert"
4032                     " disk status: %s", msg)
4033       self._AbortMigration()
4034       self._RevertDiskStatus()
4035       raise errors.OpExecError("Could not migrate instance %s: %s" %
4036                                (instance.name, msg))
4037     time.sleep(10)
4038
4039     instance.primary_node = target_node
4040     # distribute new instance config to the other nodes
4041     self.cfg.Update(instance)
4042
4043     result = self.rpc.call_finalize_migration(target_node,
4044                                               instance,
4045                                               migration_info,
4046                                               True)
4047     msg = result.fail_msg
4048     if msg:
4049       logging.error("Instance migration succeeded, but finalization failed:"
4050                     " %s" % msg)
4051       raise errors.OpExecError("Could not finalize instance migration: %s" %
4052                                msg)
4053
4054     self._EnsureSecondary(source_node)
4055     self._WaitUntilSync()
4056     self._GoStandalone()
4057     self._GoReconnect(False)
4058     self._WaitUntilSync()
4059
4060     self.feedback_fn("* done")
4061
4062   def Exec(self, feedback_fn):
4063     """Perform the migration.
4064
4065     """
4066     self.feedback_fn = feedback_fn
4067
4068     self.source_node = self.instance.primary_node
4069     self.target_node = self.instance.secondary_nodes[0]
4070     self.all_nodes = [self.source_node, self.target_node]
4071     self.nodes_ip = {
4072       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4073       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4074       }
4075     if self.op.cleanup:
4076       return self._ExecCleanup()
4077     else:
4078       return self._ExecMigration()
4079
4080
4081 def _CreateBlockDev(lu, node, instance, device, force_create,
4082                     info, force_open):
4083   """Create a tree of block devices on a given node.
4084
4085   If this device type has to be created on secondaries, create it and
4086   all its children.
4087
4088   If not, just recurse to children keeping the same 'force' value.
4089
4090   @param lu: the lu on whose behalf we execute
4091   @param node: the node on which to create the device
4092   @type instance: L{objects.Instance}
4093   @param instance: the instance which owns the device
4094   @type device: L{objects.Disk}
4095   @param device: the device to create
4096   @type force_create: boolean
4097   @param force_create: whether to force creation of this device; this
4098       will be change to True whenever we find a device which has
4099       CreateOnSecondary() attribute
4100   @param info: the extra 'metadata' we should attach to the device
4101       (this will be represented as a LVM tag)
4102   @type force_open: boolean
4103   @param force_open: this parameter will be passes to the
4104       L{backend.BlockdevCreate} function where it specifies
4105       whether we run on primary or not, and it affects both
4106       the child assembly and the device own Open() execution
4107
4108   """
4109   if device.CreateOnSecondary():
4110     force_create = True
4111
4112   if device.children:
4113     for child in device.children:
4114       _CreateBlockDev(lu, node, instance, child, force_create,
4115                       info, force_open)
4116
4117   if not force_create:
4118     return
4119
4120   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4121
4122
4123 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4124   """Create a single block device on a given node.
4125
4126   This will not recurse over children of the device, so they must be
4127   created in advance.
4128
4129   @param lu: the lu on whose behalf we execute
4130   @param node: the node on which to create the device
4131   @type instance: L{objects.Instance}
4132   @param instance: the instance which owns the device
4133   @type device: L{objects.Disk}
4134   @param device: the device to create
4135   @param info: the extra 'metadata' we should attach to the device
4136       (this will be represented as a LVM tag)
4137   @type force_open: boolean
4138   @param force_open: this parameter will be passes to the
4139       L{backend.BlockdevCreate} function where it specifies
4140       whether we run on primary or not, and it affects both
4141       the child assembly and the device own Open() execution
4142
4143   """
4144   lu.cfg.SetDiskID(device, node)
4145   result = lu.rpc.call_blockdev_create(node, device, device.size,
4146                                        instance.name, force_open, info)
4147   result.Raise("Can't create block device %s on"
4148                " node %s for instance %s" % (device, node, instance.name))
4149   if device.physical_id is None:
4150     device.physical_id = result.payload
4151
4152
4153 def _GenerateUniqueNames(lu, exts):
4154   """Generate a suitable LV name.
4155
4156   This will generate a logical volume name for the given instance.
4157
4158   """
4159   results = []
4160   for val in exts:
4161     new_id = lu.cfg.GenerateUniqueID()
4162     results.append("%s%s" % (new_id, val))
4163   return results
4164
4165
4166 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4167                          p_minor, s_minor):
4168   """Generate a drbd8 device complete with its children.
4169
4170   """
4171   port = lu.cfg.AllocatePort()
4172   vgname = lu.cfg.GetVGName()
4173   shared_secret = lu.cfg.GenerateDRBDSecret()
4174   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4175                           logical_id=(vgname, names[0]))
4176   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4177                           logical_id=(vgname, names[1]))
4178   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4179                           logical_id=(primary, secondary, port,
4180                                       p_minor, s_minor,
4181                                       shared_secret),
4182                           children=[dev_data, dev_meta],
4183                           iv_name=iv_name)
4184   return drbd_dev
4185
4186
4187 def _GenerateDiskTemplate(lu, template_name,
4188                           instance_name, primary_node,
4189                           secondary_nodes, disk_info,
4190                           file_storage_dir, file_driver,
4191                           base_index):
4192   """Generate the entire disk layout for a given template type.
4193
4194   """
4195   #TODO: compute space requirements
4196
4197   vgname = lu.cfg.GetVGName()
4198   disk_count = len(disk_info)
4199   disks = []
4200   if template_name == constants.DT_DISKLESS:
4201     pass
4202   elif template_name == constants.DT_PLAIN:
4203     if len(secondary_nodes) != 0:
4204       raise errors.ProgrammerError("Wrong template configuration")
4205
4206     names = _GenerateUniqueNames(lu, [".disk%d" % i
4207                                       for i in range(disk_count)])
4208     for idx, disk in enumerate(disk_info):
4209       disk_index = idx + base_index
4210       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4211                               logical_id=(vgname, names[idx]),
4212                               iv_name="disk/%d" % disk_index,
4213                               mode=disk["mode"])
4214       disks.append(disk_dev)
4215   elif template_name == constants.DT_DRBD8:
4216     if len(secondary_nodes) != 1:
4217       raise errors.ProgrammerError("Wrong template configuration")
4218     remote_node = secondary_nodes[0]
4219     minors = lu.cfg.AllocateDRBDMinor(
4220       [primary_node, remote_node] * len(disk_info), instance_name)
4221
4222     names = []
4223     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4224                                                for i in range(disk_count)]):
4225       names.append(lv_prefix + "_data")
4226       names.append(lv_prefix + "_meta")
4227     for idx, disk in enumerate(disk_info):
4228       disk_index = idx + base_index
4229       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4230                                       disk["size"], names[idx*2:idx*2+2],
4231                                       "disk/%d" % disk_index,
4232                                       minors[idx*2], minors[idx*2+1])
4233       disk_dev.mode = disk["mode"]
4234       disks.append(disk_dev)
4235   elif template_name == constants.DT_FILE:
4236     if len(secondary_nodes) != 0:
4237       raise errors.ProgrammerError("Wrong template configuration")
4238
4239     for idx, disk in enumerate(disk_info):
4240       disk_index = idx + base_index
4241       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4242                               iv_name="disk/%d" % disk_index,
4243                               logical_id=(file_driver,
4244                                           "%s/disk%d" % (file_storage_dir,
4245                                                          disk_index)),
4246                               mode=disk["mode"])
4247       disks.append(disk_dev)
4248   else:
4249     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4250   return disks
4251
4252
4253 def _GetInstanceInfoText(instance):
4254   """Compute that text that should be added to the disk's metadata.
4255
4256   """
4257   return "originstname+%s" % instance.name
4258
4259
4260 def _CreateDisks(lu, instance):
4261   """Create all disks for an instance.
4262
4263   This abstracts away some work from AddInstance.
4264
4265   @type lu: L{LogicalUnit}
4266   @param lu: the logical unit on whose behalf we execute
4267   @type instance: L{objects.Instance}
4268   @param instance: the instance whose disks we should create
4269   @rtype: boolean
4270   @return: the success of the creation
4271
4272   """
4273   info = _GetInstanceInfoText(instance)
4274   pnode = instance.primary_node
4275
4276   if instance.disk_template == constants.DT_FILE:
4277     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4278     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4279
4280     result.Raise("Failed to create directory '%s' on"
4281                  " node %s: %s" % (file_storage_dir, pnode))
4282
4283   # Note: this needs to be kept in sync with adding of disks in
4284   # LUSetInstanceParams
4285   for device in instance.disks:
4286     logging.info("Creating volume %s for instance %s",
4287                  device.iv_name, instance.name)
4288     #HARDCODE
4289     for node in instance.all_nodes:
4290       f_create = node == pnode
4291       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4292
4293
4294 def _RemoveDisks(lu, instance):
4295   """Remove all disks for an instance.
4296
4297   This abstracts away some work from `AddInstance()` and
4298   `RemoveInstance()`. Note that in case some of the devices couldn't
4299   be removed, the removal will continue with the other ones (compare
4300   with `_CreateDisks()`).
4301
4302   @type lu: L{LogicalUnit}
4303   @param lu: the logical unit on whose behalf we execute
4304   @type instance: L{objects.Instance}
4305   @param instance: the instance whose disks we should remove
4306   @rtype: boolean
4307   @return: the success of the removal
4308
4309   """
4310   logging.info("Removing block devices for instance %s", instance.name)
4311
4312   all_result = True
4313   for device in instance.disks:
4314     for node, disk in device.ComputeNodeTree(instance.primary_node):
4315       lu.cfg.SetDiskID(disk, node)
4316       msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
4317       if msg:
4318         lu.LogWarning("Could not remove block device %s on node %s,"
4319                       " continuing anyway: %s", device.iv_name, node, msg)
4320         all_result = False
4321
4322   if instance.disk_template == constants.DT_FILE:
4323     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4324     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4325                                                  file_storage_dir)
4326     msg = result.fail_msg
4327     if msg:
4328       lu.LogWarning("Could not remove directory '%s' on node %s: %s",
4329                     file_storage_dir, instance.primary_node, msg)
4330       all_result = False
4331
4332   return all_result
4333
4334
4335 def _ComputeDiskSize(disk_template, disks):
4336   """Compute disk size requirements in the volume group
4337
4338   """
4339   # Required free disk space as a function of disk and swap space
4340   req_size_dict = {
4341     constants.DT_DISKLESS: None,
4342     constants.DT_PLAIN: sum(d["size"] for d in disks),
4343     # 128 MB are added for drbd metadata for each disk
4344     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4345     constants.DT_FILE: None,
4346   }
4347
4348   if disk_template not in req_size_dict:
4349     raise errors.ProgrammerError("Disk template '%s' size requirement"
4350                                  " is unknown" %  disk_template)
4351
4352   return req_size_dict[disk_template]
4353
4354
4355 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4356   """Hypervisor parameter validation.
4357
4358   This function abstract the hypervisor parameter validation to be
4359   used in both instance create and instance modify.
4360
4361   @type lu: L{LogicalUnit}
4362   @param lu: the logical unit for which we check
4363   @type nodenames: list
4364   @param nodenames: the list of nodes on which we should check
4365   @type hvname: string
4366   @param hvname: the name of the hypervisor we should use
4367   @type hvparams: dict
4368   @param hvparams: the parameters which we need to check
4369   @raise errors.OpPrereqError: if the parameters are not valid
4370
4371   """
4372   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4373                                                   hvname,
4374                                                   hvparams)
4375   for node in nodenames:
4376     info = hvinfo[node]
4377     if info.offline:
4378       continue
4379     info.Raise("Hypervisor parameter validation failed on node %s" % node)
4380
4381
4382 class LUCreateInstance(LogicalUnit):
4383   """Create an instance.
4384
4385   """
4386   HPATH = "instance-add"
4387   HTYPE = constants.HTYPE_INSTANCE
4388   _OP_REQP = ["instance_name", "disks", "disk_template",
4389               "mode", "start",
4390               "wait_for_sync", "ip_check", "nics",
4391               "hvparams", "beparams"]
4392   REQ_BGL = False
4393
4394   def _ExpandNode(self, node):
4395     """Expands and checks one node name.
4396
4397     """
4398     node_full = self.cfg.ExpandNodeName(node)
4399     if node_full is None:
4400       raise errors.OpPrereqError("Unknown node %s" % node)
4401     return node_full
4402
4403   def ExpandNames(self):
4404     """ExpandNames for CreateInstance.
4405
4406     Figure out the right locks for instance creation.
4407
4408     """
4409     self.needed_locks = {}
4410
4411     # set optional parameters to none if they don't exist
4412     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4413       if not hasattr(self.op, attr):
4414         setattr(self.op, attr, None)
4415
4416     # cheap checks, mostly valid constants given
4417
4418     # verify creation mode
4419     if self.op.mode not in (constants.INSTANCE_CREATE,
4420                             constants.INSTANCE_IMPORT):
4421       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4422                                  self.op.mode)
4423
4424     # disk template and mirror node verification
4425     if self.op.disk_template not in constants.DISK_TEMPLATES:
4426       raise errors.OpPrereqError("Invalid disk template name")
4427
4428     if self.op.hypervisor is None:
4429       self.op.hypervisor = self.cfg.GetHypervisorType()
4430
4431     cluster = self.cfg.GetClusterInfo()
4432     enabled_hvs = cluster.enabled_hypervisors
4433     if self.op.hypervisor not in enabled_hvs:
4434       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4435                                  " cluster (%s)" % (self.op.hypervisor,
4436                                   ",".join(enabled_hvs)))
4437
4438     # check hypervisor parameter syntax (locally)
4439     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4440     filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
4441                                   self.op.hvparams)
4442     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4443     hv_type.CheckParameterSyntax(filled_hvp)
4444     self.hv_full = filled_hvp
4445
4446     # fill and remember the beparams dict
4447     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4448     self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
4449                                     self.op.beparams)
4450
4451     #### instance parameters check
4452
4453     # instance name verification
4454     hostname1 = utils.HostInfo(self.op.instance_name)
4455     self.op.instance_name = instance_name = hostname1.name
4456
4457     # this is just a preventive check, but someone might still add this
4458     # instance in the meantime, and creation will fail at lock-add time
4459     if instance_name in self.cfg.GetInstanceList():
4460       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4461                                  instance_name)
4462
4463     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4464
4465     # NIC buildup
4466     self.nics = []
4467     for idx, nic in enumerate(self.op.nics):
4468       nic_mode_req = nic.get("mode", None)
4469       nic_mode = nic_mode_req
4470       if nic_mode is None:
4471         nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
4472
4473       # in routed mode, for the first nic, the default ip is 'auto'
4474       if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
4475         default_ip_mode = constants.VALUE_AUTO
4476       else:
4477         default_ip_mode = constants.VALUE_NONE
4478
4479       # ip validity checks
4480       ip = nic.get("ip", default_ip_mode)
4481       if ip is None or ip.lower() == constants.VALUE_NONE:
4482         nic_ip = None
4483       elif ip.lower() == constants.VALUE_AUTO:
4484         nic_ip = hostname1.ip
4485       else:
4486         if not utils.IsValidIP(ip):
4487           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4488                                      " like a valid IP" % ip)
4489         nic_ip = ip
4490
4491       # TODO: check the ip for uniqueness !!
4492       if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
4493         raise errors.OpPrereqError("Routed nic mode requires an ip address")
4494
4495       # MAC address verification
4496       mac = nic.get("mac", constants.VALUE_AUTO)
4497       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4498         if not utils.IsValidMac(mac.lower()):
4499           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4500                                      mac)
4501       # bridge verification
4502       bridge = nic.get("bridge", None)
4503       link = nic.get("link", None)
4504       if bridge and link:
4505         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
4506       elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
4507         raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
4508       elif bridge:
4509         link = bridge
4510
4511       nicparams = {}
4512       if nic_mode_req:
4513         nicparams[constants.NIC_MODE] = nic_mode_req
4514       if link:
4515         nicparams[constants.NIC_LINK] = link
4516
4517       check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4518                                       nicparams)
4519       objects.NIC.CheckParameterSyntax(check_params)
4520       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
4521
4522     # disk checks/pre-build
4523     self.disks = []
4524     for disk in self.op.disks:
4525       mode = disk.get("mode", constants.DISK_RDWR)
4526       if mode not in constants.DISK_ACCESS_SET:
4527         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4528                                    mode)
4529       size = disk.get("size", None)
4530       if size is None:
4531         raise errors.OpPrereqError("Missing disk size")
4532       try:
4533         size = int(size)
4534       except ValueError:
4535         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4536       self.disks.append({"size": size, "mode": mode})
4537
4538     # used in CheckPrereq for ip ping check
4539     self.check_ip = hostname1.ip
4540
4541     # file storage checks
4542     if (self.op.file_driver and
4543         not self.op.file_driver in constants.FILE_DRIVER):
4544       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4545                                  self.op.file_driver)
4546
4547     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4548       raise errors.OpPrereqError("File storage directory path not absolute")
4549
4550     ### Node/iallocator related checks
4551     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4552       raise errors.OpPrereqError("One and only one of iallocator and primary"
4553                                  " node must be given")
4554
4555     if self.op.iallocator:
4556       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4557     else:
4558       self.op.pnode = self._ExpandNode(self.op.pnode)
4559       nodelist = [self.op.pnode]
4560       if self.op.snode is not None:
4561         self.op.snode = self._ExpandNode(self.op.snode)
4562         nodelist.append(self.op.snode)
4563       self.needed_locks[locking.LEVEL_NODE] = nodelist
4564
4565     # in case of import lock the source node too
4566     if self.op.mode == constants.INSTANCE_IMPORT:
4567       src_node = getattr(self.op, "src_node", None)
4568       src_path = getattr(self.op, "src_path", None)
4569
4570       if src_path is None:
4571         self.op.src_path = src_path = self.op.instance_name
4572
4573       if src_node is None:
4574         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4575         self.op.src_node = None
4576         if os.path.isabs(src_path):
4577           raise errors.OpPrereqError("Importing an instance from an absolute"
4578                                      " path requires a source node option.")
4579       else:
4580         self.op.src_node = src_node = self._ExpandNode(src_node)
4581         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4582           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4583         if not os.path.isabs(src_path):
4584           self.op.src_path = src_path = \
4585             os.path.join(constants.EXPORT_DIR, src_path)
4586
4587     else: # INSTANCE_CREATE
4588       if getattr(self.op, "os_type", None) is None:
4589         raise errors.OpPrereqError("No guest OS specified")
4590
4591   def _RunAllocator(self):
4592     """Run the allocator based on input opcode.
4593
4594     """
4595     nics = [n.ToDict() for n in self.nics]
4596     ial = IAllocator(self,
4597                      mode=constants.IALLOCATOR_MODE_ALLOC,
4598                      name=self.op.instance_name,
4599                      disk_template=self.op.disk_template,
4600                      tags=[],
4601                      os=self.op.os_type,
4602                      vcpus=self.be_full[constants.BE_VCPUS],
4603                      mem_size=self.be_full[constants.BE_MEMORY],
4604                      disks=self.disks,
4605                      nics=nics,
4606                      hypervisor=self.op.hypervisor,
4607                      )
4608
4609     ial.Run(self.op.iallocator)
4610
4611     if not ial.success:
4612       raise errors.OpPrereqError("Can't compute nodes using"
4613                                  " iallocator '%s': %s" % (self.op.iallocator,
4614                                                            ial.info))
4615     if len(ial.nodes) != ial.required_nodes:
4616       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4617                                  " of nodes (%s), required %s" %
4618                                  (self.op.iallocator, len(ial.nodes),
4619                                   ial.required_nodes))
4620     self.op.pnode = ial.nodes[0]
4621     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4622                  self.op.instance_name, self.op.iallocator,
4623                  ", ".join(ial.nodes))
4624     if ial.required_nodes == 2:
4625       self.op.snode = ial.nodes[1]
4626
4627   def BuildHooksEnv(self):
4628     """Build hooks env.
4629
4630     This runs on master, primary and secondary nodes of the instance.
4631
4632     """
4633     env = {
4634       "ADD_MODE": self.op.mode,
4635       }
4636     if self.op.mode == constants.INSTANCE_IMPORT:
4637       env["SRC_NODE"] = self.op.src_node
4638       env["SRC_PATH"] = self.op.src_path
4639       env["SRC_IMAGES"] = self.src_images
4640
4641     env.update(_BuildInstanceHookEnv(
4642       name=self.op.instance_name,
4643       primary_node=self.op.pnode,
4644       secondary_nodes=self.secondaries,
4645       status=self.op.start,
4646       os_type=self.op.os_type,
4647       memory=self.be_full[constants.BE_MEMORY],
4648       vcpus=self.be_full[constants.BE_VCPUS],
4649       nics=_PreBuildNICHooksList(self, self.nics),
4650       disk_template=self.op.disk_template,
4651       disks=[(d["size"], d["mode"]) for d in self.disks],
4652       bep=self.be_full,
4653       hvp=self.hv_full,
4654       hypervisor=self.op.hypervisor,
4655     ))
4656
4657     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4658           self.secondaries)
4659     return env, nl, nl
4660
4661
4662   def CheckPrereq(self):
4663     """Check prerequisites.
4664
4665     """
4666     if (not self.cfg.GetVGName() and
4667         self.op.disk_template not in constants.DTS_NOT_LVM):
4668       raise errors.OpPrereqError("Cluster does not support lvm-based"
4669                                  " instances")
4670
4671     if self.op.mode == constants.INSTANCE_IMPORT:
4672       src_node = self.op.src_node
4673       src_path = self.op.src_path
4674
4675       if src_node is None:
4676         locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
4677         exp_list = self.rpc.call_export_list(locked_nodes)
4678         found = False
4679         for node in exp_list:
4680           if exp_list[node].fail_msg:
4681             continue
4682           if src_path in exp_list[node].payload:
4683             found = True
4684             self.op.src_node = src_node = node
4685             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4686                                                        src_path)
4687             break
4688         if not found:
4689           raise errors.OpPrereqError("No export found for relative path %s" %
4690                                       src_path)
4691
4692       _CheckNodeOnline(self, src_node)
4693       result = self.rpc.call_export_info(src_node, src_path)
4694       result.Raise("No export or invalid export found in dir %s" % src_path)
4695
4696       export_info = objects.SerializableConfigParser.Loads(str(result.payload))
4697       if not export_info.has_section(constants.INISECT_EXP):
4698         raise errors.ProgrammerError("Corrupted export config")
4699
4700       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4701       if (int(ei_version) != constants.EXPORT_VERSION):
4702         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4703                                    (ei_version, constants.EXPORT_VERSION))
4704
4705       # Check that the new instance doesn't have less disks than the export
4706       instance_disks = len(self.disks)
4707       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4708       if instance_disks < export_disks:
4709         raise errors.OpPrereqError("Not enough disks to import."
4710                                    " (instance: %d, export: %d)" %
4711                                    (instance_disks, export_disks))
4712
4713       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4714       disk_images = []
4715       for idx in range(export_disks):
4716         option = 'disk%d_dump' % idx
4717         if export_info.has_option(constants.INISECT_INS, option):
4718           # FIXME: are the old os-es, disk sizes, etc. useful?
4719           export_name = export_info.get(constants.INISECT_INS, option)
4720           image = os.path.join(src_path, export_name)
4721           disk_images.append(image)
4722         else:
4723           disk_images.append(False)
4724
4725       self.src_images = disk_images
4726
4727       old_name = export_info.get(constants.INISECT_INS, 'name')
4728       # FIXME: int() here could throw a ValueError on broken exports
4729       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4730       if self.op.instance_name == old_name:
4731         for idx, nic in enumerate(self.nics):
4732           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4733             nic_mac_ini = 'nic%d_mac' % idx
4734             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4735
4736     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4737     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4738     if self.op.start and not self.op.ip_check:
4739       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4740                                  " adding an instance in start mode")
4741
4742     if self.op.ip_check:
4743       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4744         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4745                                    (self.check_ip, self.op.instance_name))
4746
4747     #### mac address generation
4748     # By generating here the mac address both the allocator and the hooks get
4749     # the real final mac address rather than the 'auto' or 'generate' value.
4750     # There is a race condition between the generation and the instance object
4751     # creation, which means that we know the mac is valid now, but we're not
4752     # sure it will be when we actually add the instance. If things go bad
4753     # adding the instance will abort because of a duplicate mac, and the
4754     # creation job will fail.
4755     for nic in self.nics:
4756       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4757         nic.mac = self.cfg.GenerateMAC()
4758
4759     #### allocator run
4760
4761     if self.op.iallocator is not None:
4762       self._RunAllocator()
4763
4764     #### node related checks
4765
4766     # check primary node
4767     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4768     assert self.pnode is not None, \
4769       "Cannot retrieve locked node %s" % self.op.pnode
4770     if pnode.offline:
4771       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4772                                  pnode.name)
4773     if pnode.drained:
4774       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4775                                  pnode.name)
4776
4777     self.secondaries = []
4778
4779     # mirror node verification
4780     if self.op.disk_template in constants.DTS_NET_MIRROR:
4781       if self.op.snode is None:
4782         raise errors.OpPrereqError("The networked disk templates need"
4783                                    " a mirror node")
4784       if self.op.snode == pnode.name:
4785         raise errors.OpPrereqError("The secondary node cannot be"
4786                                    " the primary node.")
4787       _CheckNodeOnline(self, self.op.snode)
4788       _CheckNodeNotDrained(self, self.op.snode)
4789       self.secondaries.append(self.op.snode)
4790
4791     nodenames = [pnode.name] + self.secondaries
4792
4793     req_size = _ComputeDiskSize(self.op.disk_template,
4794                                 self.disks)
4795
4796     # Check lv size requirements
4797     if req_size is not None:
4798       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4799                                          self.op.hypervisor)
4800       for node in nodenames:
4801         info = nodeinfo[node]
4802         info.Raise("Cannot get current information from node %s" % node)
4803         info = info.payload
4804         vg_free = info.get('vg_free', None)
4805         if not isinstance(vg_free, int):
4806           raise errors.OpPrereqError("Can't compute free disk space on"
4807                                      " node %s" % node)
4808         if req_size > vg_free:
4809           raise errors.OpPrereqError("Not enough disk space on target node %s."
4810                                      " %d MB available, %d MB required" %
4811                                      (node, vg_free, req_size))
4812
4813     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4814
4815     # os verification
4816     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4817     result.Raise("OS '%s' not in supported os list for primary node %s" %
4818                  (self.op.os_type, pnode.name), prereq=True)
4819
4820     _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
4821
4822     # memory check on primary node
4823     if self.op.start:
4824       _CheckNodeFreeMemory(self, self.pnode.name,
4825                            "creating instance %s" % self.op.instance_name,
4826                            self.be_full[constants.BE_MEMORY],
4827                            self.op.hypervisor)
4828
4829   def Exec(self, feedback_fn):
4830     """Create and add the instance to the cluster.
4831
4832     """
4833     instance = self.op.instance_name
4834     pnode_name = self.pnode.name
4835
4836     ht_kind = self.op.hypervisor
4837     if ht_kind in constants.HTS_REQ_PORT:
4838       network_port = self.cfg.AllocatePort()
4839     else:
4840       network_port = None
4841
4842     ##if self.op.vnc_bind_address is None:
4843     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4844
4845     # this is needed because os.path.join does not accept None arguments
4846     if self.op.file_storage_dir is None:
4847       string_file_storage_dir = ""
4848     else:
4849       string_file_storage_dir = self.op.file_storage_dir
4850
4851     # build the full file storage dir path
4852     file_storage_dir = os.path.normpath(os.path.join(
4853                                         self.cfg.GetFileStorageDir(),
4854                                         string_file_storage_dir, instance))
4855
4856
4857     disks = _GenerateDiskTemplate(self,
4858                                   self.op.disk_template,
4859                                   instance, pnode_name,
4860                                   self.secondaries,
4861                                   self.disks,
4862                                   file_storage_dir,
4863                                   self.op.file_driver,
4864                                   0)
4865
4866     iobj = objects.Instance(name=instance, os=self.op.os_type,
4867                             primary_node=pnode_name,
4868                             nics=self.nics, disks=disks,
4869                             disk_template=self.op.disk_template,
4870                             admin_up=False,
4871                             network_port=network_port,
4872                             beparams=self.op.beparams,
4873                             hvparams=self.op.hvparams,
4874                             hypervisor=self.op.hypervisor,
4875                             )
4876
4877     feedback_fn("* creating instance disks...")
4878     try:
4879       _CreateDisks(self, iobj)
4880     except errors.OpExecError:
4881       self.LogWarning("Device creation failed, reverting...")
4882       try:
4883         _RemoveDisks(self, iobj)
4884       finally:
4885         self.cfg.ReleaseDRBDMinors(instance)
4886         raise
4887
4888     feedback_fn("adding instance %s to cluster config" % instance)
4889
4890     self.cfg.AddInstance(iobj)
4891     # Declare that we don't want to remove the instance lock anymore, as we've
4892     # added the instance to the config
4893     del self.remove_locks[locking.LEVEL_INSTANCE]
4894     # Unlock all the nodes
4895     if self.op.mode == constants.INSTANCE_IMPORT:
4896       nodes_keep = [self.op.src_node]
4897       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4898                        if node != self.op.src_node]
4899       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4900       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4901     else:
4902       self.context.glm.release(locking.LEVEL_NODE)
4903       del self.acquired_locks[locking.LEVEL_NODE]
4904
4905     if self.op.wait_for_sync:
4906       disk_abort = not _WaitForSync(self, iobj)
4907     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4908       # make sure the disks are not degraded (still sync-ing is ok)
4909       time.sleep(15)
4910       feedback_fn("* checking mirrors status")
4911       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4912     else:
4913       disk_abort = False
4914
4915     if disk_abort:
4916       _RemoveDisks(self, iobj)
4917       self.cfg.RemoveInstance(iobj.name)
4918       # Make sure the instance lock gets removed
4919       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4920       raise errors.OpExecError("There are some degraded disks for"
4921                                " this instance")
4922
4923     feedback_fn("creating os for instance %s on node %s" %
4924                 (instance, pnode_name))
4925
4926     if iobj.disk_template != constants.DT_DISKLESS:
4927       if self.op.mode == constants.INSTANCE_CREATE:
4928         feedback_fn("* running the instance OS create scripts...")
4929         result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
4930         result.Raise("Could not add os for instance %s"
4931                      " on node %s" % (instance, pnode_name))
4932
4933       elif self.op.mode == constants.INSTANCE_IMPORT:
4934         feedback_fn("* running the instance OS import scripts...")
4935         src_node = self.op.src_node
4936         src_images = self.src_images
4937         cluster_name = self.cfg.GetClusterName()
4938         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4939                                                          src_node, src_images,
4940                                                          cluster_name)
4941         msg = import_result.fail_msg
4942         if msg:
4943           self.LogWarning("Error while importing the disk images for instance"
4944                           " %s on node %s: %s" % (instance, pnode_name, msg))
4945       else:
4946         # also checked in the prereq part
4947         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4948                                      % self.op.mode)
4949
4950     if self.op.start:
4951       iobj.admin_up = True
4952       self.cfg.Update(iobj)
4953       logging.info("Starting instance %s on node %s", instance, pnode_name)
4954       feedback_fn("* starting instance...")
4955       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4956       result.Raise("Could not start instance")
4957
4958
4959 class LUConnectConsole(NoHooksLU):
4960   """Connect to an instance's console.
4961
4962   This is somewhat special in that it returns the command line that
4963   you need to run on the master node in order to connect to the
4964   console.
4965
4966   """
4967   _OP_REQP = ["instance_name"]
4968   REQ_BGL = False
4969
4970   def ExpandNames(self):
4971     self._ExpandAndLockInstance()
4972
4973   def CheckPrereq(self):
4974     """Check prerequisites.
4975
4976     This checks that the instance is in the cluster.
4977
4978     """
4979     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4980     assert self.instance is not None, \
4981       "Cannot retrieve locked instance %s" % self.op.instance_name
4982     _CheckNodeOnline(self, self.instance.primary_node)
4983
4984   def Exec(self, feedback_fn):
4985     """Connect to the console of an instance
4986
4987     """
4988     instance = self.instance
4989     node = instance.primary_node
4990
4991     node_insts = self.rpc.call_instance_list([node],
4992                                              [instance.hypervisor])[node]
4993     node_insts.Raise("Can't get node information from %s" % node)
4994
4995     if instance.name not in node_insts.payload:
4996       raise errors.OpExecError("Instance %s is not running." % instance.name)
4997
4998     logging.debug("Connecting to console of %s on %s", instance.name, node)
4999
5000     hyper = hypervisor.GetHypervisor(instance.hypervisor)
5001     cluster = self.cfg.GetClusterInfo()
5002     # beparams and hvparams are passed separately, to avoid editing the
5003     # instance and then saving the defaults in the instance itself.
5004     hvparams = cluster.FillHV(instance)
5005     beparams = cluster.FillBE(instance)
5006     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5007
5008     # build ssh cmdline
5009     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
5010
5011
5012 class LUReplaceDisks(LogicalUnit):
5013   """Replace the disks of an instance.
5014
5015   """
5016   HPATH = "mirrors-replace"
5017   HTYPE = constants.HTYPE_INSTANCE
5018   _OP_REQP = ["instance_name", "mode", "disks"]
5019   REQ_BGL = False
5020
5021   def CheckArguments(self):
5022     if not hasattr(self.op, "remote_node"):
5023       self.op.remote_node = None
5024     if not hasattr(self.op, "iallocator"):
5025       self.op.iallocator = None
5026
5027     # check for valid parameter combination
5028     cnt = [self.op.remote_node, self.op.iallocator].count(None)
5029     if self.op.mode == constants.REPLACE_DISK_CHG:
5030       if cnt == 2:
5031         raise errors.OpPrereqError("When changing the secondary either an"
5032                                    " iallocator script must be used or the"
5033                                    " new node given")
5034       elif cnt == 0:
5035         raise errors.OpPrereqError("Give either the iallocator or the new"
5036                                    " secondary, not both")
5037     else: # not replacing the secondary
5038       if cnt != 2:
5039         raise errors.OpPrereqError("The iallocator and new node options can"
5040                                    " be used only when changing the"
5041                                    " secondary node")
5042
5043   def ExpandNames(self):
5044     self._ExpandAndLockInstance()
5045
5046     if self.op.iallocator is not None:
5047       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5048     elif self.op.remote_node is not None:
5049       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5050       if remote_node is None:
5051         raise errors.OpPrereqError("Node '%s' not known" %
5052                                    self.op.remote_node)
5053       self.op.remote_node = remote_node
5054       # Warning: do not remove the locking of the new secondary here
5055       # unless DRBD8.AddChildren is changed to work in parallel;
5056       # currently it doesn't since parallel invocations of
5057       # FindUnusedMinor will conflict
5058       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5059       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5060     else:
5061       self.needed_locks[locking.LEVEL_NODE] = []
5062       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5063
5064   def DeclareLocks(self, level):
5065     # If we're not already locking all nodes in the set we have to declare the
5066     # instance's primary/secondary nodes.
5067     if (level == locking.LEVEL_NODE and
5068         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5069       self._LockInstancesNodes()
5070
5071   def _RunAllocator(self):
5072     """Compute a new secondary node using an IAllocator.
5073
5074     """
5075     ial = IAllocator(self,
5076                      mode=constants.IALLOCATOR_MODE_RELOC,
5077                      name=self.op.instance_name,
5078                      relocate_from=[self.sec_node])
5079
5080     ial.Run(self.op.iallocator)
5081
5082     if not ial.success:
5083       raise errors.OpPrereqError("Can't compute nodes using"
5084                                  " iallocator '%s': %s" % (self.op.iallocator,
5085                                                            ial.info))
5086     if len(ial.nodes) != ial.required_nodes:
5087       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5088                                  " of nodes (%s), required %s" %
5089                                  (len(ial.nodes), ial.required_nodes))
5090     self.op.remote_node = ial.nodes[0]
5091     self.LogInfo("Selected new secondary for the instance: %s",
5092                  self.op.remote_node)
5093
5094   def BuildHooksEnv(self):
5095     """Build hooks env.
5096
5097     This runs on the master, the primary and all the secondaries.
5098
5099     """
5100     env = {
5101       "MODE": self.op.mode,
5102       "NEW_SECONDARY": self.op.remote_node,
5103       "OLD_SECONDARY": self.instance.secondary_nodes[0],
5104       }
5105     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5106     nl = [
5107       self.cfg.GetMasterNode(),
5108       self.instance.primary_node,
5109       ]
5110     if self.op.remote_node is not None:
5111       nl.append(self.op.remote_node)
5112     return env, nl, nl
5113
5114   def CheckPrereq(self):
5115     """Check prerequisites.
5116
5117     This checks that the instance is in the cluster.
5118
5119     """
5120     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5121     assert instance is not None, \
5122       "Cannot retrieve locked instance %s" % self.op.instance_name
5123     self.instance = instance
5124
5125     if instance.disk_template != constants.DT_DRBD8:
5126       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5127                                  " instances")
5128
5129     if len(instance.secondary_nodes) != 1:
5130       raise errors.OpPrereqError("The instance has a strange layout,"
5131                                  " expected one secondary but found %d" %
5132                                  len(instance.secondary_nodes))
5133
5134     self.sec_node = instance.secondary_nodes[0]
5135
5136     if self.op.iallocator is not None:
5137       self._RunAllocator()
5138
5139     remote_node = self.op.remote_node
5140     if remote_node is not None:
5141       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5142       assert self.remote_node_info is not None, \
5143         "Cannot retrieve locked node %s" % remote_node
5144     else:
5145       self.remote_node_info = None
5146     if remote_node == instance.primary_node:
5147       raise errors.OpPrereqError("The specified node is the primary node of"
5148                                  " the instance.")
5149     elif remote_node == self.sec_node:
5150       raise errors.OpPrereqError("The specified node is already the"
5151                                  " secondary node of the instance.")
5152
5153     if self.op.mode == constants.REPLACE_DISK_PRI:
5154       n1 = self.tgt_node = instance.primary_node
5155       n2 = self.oth_node = self.sec_node
5156     elif self.op.mode == constants.REPLACE_DISK_SEC:
5157       n1 = self.tgt_node = self.sec_node
5158       n2 = self.oth_node = instance.primary_node
5159     elif self.op.mode == constants.REPLACE_DISK_CHG:
5160       n1 = self.new_node = remote_node
5161       n2 = self.oth_node = instance.primary_node
5162       self.tgt_node = self.sec_node
5163       _CheckNodeNotDrained(self, remote_node)
5164     else:
5165       raise errors.ProgrammerError("Unhandled disk replace mode")
5166
5167     _CheckNodeOnline(self, n1)
5168     _CheckNodeOnline(self, n2)
5169
5170     if not self.op.disks:
5171       self.op.disks = range(len(instance.disks))
5172
5173     for disk_idx in self.op.disks:
5174       instance.FindDisk(disk_idx)
5175
5176   def _ExecD8DiskOnly(self, feedback_fn):
5177     """Replace a disk on the primary or secondary for dbrd8.
5178
5179     The algorithm for replace is quite complicated:
5180
5181       1. for each disk to be replaced:
5182
5183         1. create new LVs on the target node with unique names
5184         1. detach old LVs from the drbd device
5185         1. rename old LVs to name_replaced.<time_t>
5186         1. rename new LVs to old LVs
5187         1. attach the new LVs (with the old names now) to the drbd device
5188
5189       1. wait for sync across all devices
5190
5191       1. for each modified disk:
5192
5193         1. remove old LVs (which have the name name_replaces.<time_t>)
5194
5195     Failures are not very well handled.
5196
5197     """
5198     steps_total = 6
5199     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5200     instance = self.instance
5201     iv_names = {}
5202     vgname = self.cfg.GetVGName()
5203     # start of work
5204     cfg = self.cfg
5205     tgt_node = self.tgt_node
5206     oth_node = self.oth_node
5207
5208     # Step: check device activation
5209     self.proc.LogStep(1, steps_total, "check device existence")
5210     info("checking volume groups")
5211     my_vg = cfg.GetVGName()
5212     results = self.rpc.call_vg_list([oth_node, tgt_node])
5213     if not results:
5214       raise errors.OpExecError("Can't list volume groups on the nodes")
5215     for node in oth_node, tgt_node:
5216       res = results[node]
5217       res.Raise("Error checking node %s" % node)
5218       if my_vg not in res.payload:
5219         raise errors.OpExecError("Volume group '%s' not found on %s" %
5220                                  (my_vg, node))
5221     for idx, dev in enumerate(instance.disks):
5222       if idx not in self.op.disks:
5223         continue
5224       for node in tgt_node, oth_node:
5225         info("checking disk/%d on %s" % (idx, node))
5226         cfg.SetDiskID(dev, node)
5227         result = self.rpc.call_blockdev_find(node, dev)
5228         msg = result.fail_msg
5229         if not msg and not result.payload:
5230           msg = "disk not found"
5231         if msg:
5232           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5233                                    (idx, node, msg))
5234
5235     # Step: check other node consistency
5236     self.proc.LogStep(2, steps_total, "check peer consistency")
5237     for idx, dev in enumerate(instance.disks):
5238       if idx not in self.op.disks:
5239         continue
5240       info("checking disk/%d consistency on %s" % (idx, oth_node))
5241       if not _CheckDiskConsistency(self, dev, oth_node,
5242                                    oth_node==instance.primary_node):
5243         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5244                                  " to replace disks on this node (%s)" %
5245                                  (oth_node, tgt_node))
5246
5247     # Step: create new storage
5248     self.proc.LogStep(3, steps_total, "allocate new storage")
5249     for idx, dev in enumerate(instance.disks):
5250       if idx not in self.op.disks:
5251         continue
5252       size = dev.size
5253       cfg.SetDiskID(dev, tgt_node)
5254       lv_names = [".disk%d_%s" % (idx, suf)
5255                   for suf in ["data", "meta"]]
5256       names = _GenerateUniqueNames(self, lv_names)
5257       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5258                              logical_id=(vgname, names[0]))
5259       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5260                              logical_id=(vgname, names[1]))
5261       new_lvs = [lv_data, lv_meta]
5262       old_lvs = dev.children
5263       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5264       info("creating new local storage on %s for %s" %
5265            (tgt_node, dev.iv_name))
5266       # we pass force_create=True to force the LVM creation
5267       for new_lv in new_lvs:
5268         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5269                         _GetInstanceInfoText(instance), False)
5270
5271     # Step: for each lv, detach+rename*2+attach
5272     self.proc.LogStep(4, steps_total, "change drbd configuration")
5273     for dev, old_lvs, new_lvs in iv_names.itervalues():
5274       info("detaching %s drbd from local storage" % dev.iv_name)
5275       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5276       result.Raise("Can't detach drbd from local storage on node"
5277                    " %s for device %s" % (tgt_node, dev.iv_name))
5278       #dev.children = []
5279       #cfg.Update(instance)
5280
5281       # ok, we created the new LVs, so now we know we have the needed
5282       # storage; as such, we proceed on the target node to rename
5283       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5284       # using the assumption that logical_id == physical_id (which in
5285       # turn is the unique_id on that node)
5286
5287       # FIXME(iustin): use a better name for the replaced LVs
5288       temp_suffix = int(time.time())
5289       ren_fn = lambda d, suff: (d.physical_id[0],
5290                                 d.physical_id[1] + "_replaced-%s" % suff)
5291       # build the rename list based on what LVs exist on the node
5292       rlist = []
5293       for to_ren in old_lvs:
5294         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5295         if not result.fail_msg and result.payload:
5296           # device exists
5297           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5298
5299       info("renaming the old LVs on the target node")
5300       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5301       result.Raise("Can't rename old LVs on node %s" % tgt_node)
5302       # now we rename the new LVs to the old LVs
5303       info("renaming the new LVs on the target node")
5304       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5305       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5306       result.Raise("Can't rename new LVs on node %s" % tgt_node)
5307
5308       for old, new in zip(old_lvs, new_lvs):
5309         new.logical_id = old.logical_id
5310         cfg.SetDiskID(new, tgt_node)
5311
5312       for disk in old_lvs:
5313         disk.logical_id = ren_fn(disk, temp_suffix)
5314         cfg.SetDiskID(disk, tgt_node)
5315
5316       # now that the new lvs have the old name, we can add them to the device
5317       info("adding new mirror component on %s" % tgt_node)
5318       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5319       msg = result.fail_msg
5320       if msg:
5321         for new_lv in new_lvs:
5322           msg2 = self.rpc.call_blockdev_remove(tgt_node, new_lv).fail_msg
5323           if msg2:
5324             warning("Can't rollback device %s: %s", dev, msg2,
5325                     hint="cleanup manually the unused logical volumes")
5326         raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
5327
5328       dev.children = new_lvs
5329       cfg.Update(instance)
5330
5331     # Step: wait for sync
5332
5333     # this can fail as the old devices are degraded and _WaitForSync
5334     # does a combined result over all disks, so we don't check its
5335     # return value
5336     self.proc.LogStep(5, steps_total, "sync devices")
5337     _WaitForSync(self, instance, unlock=True)
5338
5339     # so check manually all the devices
5340     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5341       cfg.SetDiskID(dev, instance.primary_node)
5342       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5343       msg = result.fail_msg
5344       if not msg and not result.payload:
5345         msg = "disk not found"
5346       if msg:
5347         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5348                                  (name, msg))
5349       if result.payload[5]:
5350         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5351
5352     # Step: remove old storage
5353     self.proc.LogStep(6, steps_total, "removing old storage")
5354     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5355       info("remove logical volumes for %s" % name)
5356       for lv in old_lvs:
5357         cfg.SetDiskID(lv, tgt_node)
5358         msg = self.rpc.call_blockdev_remove(tgt_node, lv).fail_msg
5359         if msg:
5360           warning("Can't remove old LV: %s" % msg,
5361                   hint="manually remove unused LVs")
5362           continue
5363
5364   def _ExecD8Secondary(self, feedback_fn):
5365     """Replace the secondary node for drbd8.
5366
5367     The algorithm for replace is quite complicated:
5368       - for all disks of the instance:
5369         - create new LVs on the new node with same names
5370         - shutdown the drbd device on the old secondary
5371         - disconnect the drbd network on the primary
5372         - create the drbd device on the new secondary
5373         - network attach the drbd on the primary, using an artifice:
5374           the drbd code for Attach() will connect to the network if it
5375           finds a device which is connected to the good local disks but
5376           not network enabled
5377       - wait for sync across all devices
5378       - remove all disks from the old secondary
5379
5380     Failures are not very well handled.
5381
5382     """
5383     steps_total = 6
5384     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5385     instance = self.instance
5386     iv_names = {}
5387     # start of work
5388     cfg = self.cfg
5389     old_node = self.tgt_node
5390     new_node = self.new_node
5391     pri_node = instance.primary_node
5392     nodes_ip = {
5393       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5394       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5395       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5396       }
5397
5398     # Step: check device activation
5399     self.proc.LogStep(1, steps_total, "check device existence")
5400     info("checking volume groups")
5401     my_vg = cfg.GetVGName()
5402     results = self.rpc.call_vg_list([pri_node, new_node])
5403     for node in pri_node, new_node:
5404       res = results[node]
5405       res.Raise("Error checking node %s" % node)
5406       if my_vg not in res.payload:
5407         raise errors.OpExecError("Volume group '%s' not found on %s" %
5408                                  (my_vg, node))
5409     for idx, dev in enumerate(instance.disks):
5410       if idx not in self.op.disks:
5411         continue
5412       info("checking disk/%d on %s" % (idx, pri_node))
5413       cfg.SetDiskID(dev, pri_node)
5414       result = self.rpc.call_blockdev_find(pri_node, dev)
5415       msg = result.fail_msg
5416       if not msg and not result.payload:
5417         msg = "disk not found"
5418       if msg:
5419         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5420                                  (idx, pri_node, msg))
5421
5422     # Step: check other node consistency
5423     self.proc.LogStep(2, steps_total, "check peer consistency")
5424     for idx, dev in enumerate(instance.disks):
5425       if idx not in self.op.disks:
5426         continue
5427       info("checking disk/%d consistency on %s" % (idx, pri_node))
5428       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5429         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5430                                  " unsafe to replace the secondary" %
5431                                  pri_node)
5432
5433     # Step: create new storage
5434     self.proc.LogStep(3, steps_total, "allocate new storage")
5435     for idx, dev in enumerate(instance.disks):
5436       info("adding new local storage on %s for disk/%d" %
5437            (new_node, idx))
5438       # we pass force_create=True to force LVM creation
5439       for new_lv in dev.children:
5440         _CreateBlockDev(self, new_node, instance, new_lv, True,
5441                         _GetInstanceInfoText(instance), False)
5442
5443     # Step 4: dbrd minors and drbd setups changes
5444     # after this, we must manually remove the drbd minors on both the
5445     # error and the success paths
5446     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5447                                    instance.name)
5448     logging.debug("Allocated minors %s" % (minors,))
5449     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5450     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5451       size = dev.size
5452       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5453       # create new devices on new_node; note that we create two IDs:
5454       # one without port, so the drbd will be activated without
5455       # networking information on the new node at this stage, and one
5456       # with network, for the latter activation in step 4
5457       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5458       if pri_node == o_node1:
5459         p_minor = o_minor1
5460       else:
5461         p_minor = o_minor2
5462
5463       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5464       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5465
5466       iv_names[idx] = (dev, dev.children, new_net_id)
5467       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5468                     new_net_id)
5469       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5470                               logical_id=new_alone_id,
5471                               children=dev.children,
5472                               size=dev.size)
5473       try:
5474         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5475                               _GetInstanceInfoText(instance), False)
5476       except errors.GenericError:
5477         self.cfg.ReleaseDRBDMinors(instance.name)
5478         raise
5479
5480     for idx, dev in enumerate(instance.disks):
5481       # we have new devices, shutdown the drbd on the old secondary
5482       info("shutting down drbd for disk/%d on old node" % idx)
5483       cfg.SetDiskID(dev, old_node)
5484       msg = self.rpc.call_blockdev_shutdown(old_node, dev).fail_msg
5485       if msg:
5486         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5487                 (idx, msg),
5488                 hint="Please cleanup this device manually as soon as possible")
5489
5490     info("detaching primary drbds from the network (=> standalone)")
5491     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5492                                                instance.disks)[pri_node]
5493
5494     msg = result.fail_msg
5495     if msg:
5496       # detaches didn't succeed (unlikely)
5497       self.cfg.ReleaseDRBDMinors(instance.name)
5498       raise errors.OpExecError("Can't detach the disks from the network on"
5499                                " old node: %s" % (msg,))
5500
5501     # if we managed to detach at least one, we update all the disks of
5502     # the instance to point to the new secondary
5503     info("updating instance configuration")
5504     for dev, _, new_logical_id in iv_names.itervalues():
5505       dev.logical_id = new_logical_id
5506       cfg.SetDiskID(dev, pri_node)
5507     cfg.Update(instance)
5508
5509     # and now perform the drbd attach
5510     info("attaching primary drbds to new secondary (standalone => connected)")
5511     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5512                                            instance.disks, instance.name,
5513                                            False)
5514     for to_node, to_result in result.items():
5515       msg = to_result.fail_msg
5516       if msg:
5517         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5518                 hint="please do a gnt-instance info to see the"
5519                 " status of disks")
5520
5521     # this can fail as the old devices are degraded and _WaitForSync
5522     # does a combined result over all disks, so we don't check its
5523     # return value
5524     self.proc.LogStep(5, steps_total, "sync devices")
5525     _WaitForSync(self, instance, unlock=True)
5526
5527     # so check manually all the devices
5528     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5529       cfg.SetDiskID(dev, pri_node)
5530       result = self.rpc.call_blockdev_find(pri_node, dev)
5531       msg = result.fail_msg
5532       if not msg and not result.payload:
5533         msg = "disk not found"
5534       if msg:
5535         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5536                                  (idx, msg))
5537       if result.payload[5]:
5538         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5539
5540     self.proc.LogStep(6, steps_total, "removing old storage")
5541     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5542       info("remove logical volumes for disk/%d" % idx)
5543       for lv in old_lvs:
5544         cfg.SetDiskID(lv, old_node)
5545         msg = self.rpc.call_blockdev_remove(old_node, lv).fail_msg
5546         if msg:
5547           warning("Can't remove LV on old secondary: %s", msg,
5548                   hint="Cleanup stale volumes by hand")
5549
5550   def Exec(self, feedback_fn):
5551     """Execute disk replacement.
5552
5553     This dispatches the disk replacement to the appropriate handler.
5554
5555     """
5556     instance = self.instance
5557
5558     # Activate the instance disks if we're replacing them on a down instance
5559     if not instance.admin_up:
5560       _StartInstanceDisks(self, instance, True)
5561
5562     if self.op.mode == constants.REPLACE_DISK_CHG:
5563       fn = self._ExecD8Secondary
5564     else:
5565       fn = self._ExecD8DiskOnly
5566
5567     ret = fn(feedback_fn)
5568
5569     # Deactivate the instance disks if we're replacing them on a down instance
5570     if not instance.admin_up:
5571       _SafeShutdownInstanceDisks(self, instance)
5572
5573     return ret
5574
5575
5576 class LUGrowDisk(LogicalUnit):
5577   """Grow a disk of an instance.
5578
5579   """
5580   HPATH = "disk-grow"
5581   HTYPE = constants.HTYPE_INSTANCE
5582   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5583   REQ_BGL = False
5584
5585   def ExpandNames(self):
5586     self._ExpandAndLockInstance()
5587     self.needed_locks[locking.LEVEL_NODE] = []
5588     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5589
5590   def DeclareLocks(self, level):
5591     if level == locking.LEVEL_NODE:
5592       self._LockInstancesNodes()
5593
5594   def BuildHooksEnv(self):
5595     """Build hooks env.
5596
5597     This runs on the master, the primary and all the secondaries.
5598
5599     """
5600     env = {
5601       "DISK": self.op.disk,
5602       "AMOUNT": self.op.amount,
5603       }
5604     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5605     nl = [
5606       self.cfg.GetMasterNode(),
5607       self.instance.primary_node,
5608       ]
5609     return env, nl, nl
5610
5611   def CheckPrereq(self):
5612     """Check prerequisites.
5613
5614     This checks that the instance is in the cluster.
5615
5616     """
5617     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5618     assert instance is not None, \
5619       "Cannot retrieve locked instance %s" % self.op.instance_name
5620     nodenames = list(instance.all_nodes)
5621     for node in nodenames:
5622       _CheckNodeOnline(self, node)
5623
5624
5625     self.instance = instance
5626
5627     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5628       raise errors.OpPrereqError("Instance's disk layout does not support"
5629                                  " growing.")
5630
5631     self.disk = instance.FindDisk(self.op.disk)
5632
5633     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5634                                        instance.hypervisor)
5635     for node in nodenames:
5636       info = nodeinfo[node]
5637       info.Raise("Cannot get current information from node %s" % node)
5638       vg_free = info.payload.get('vg_free', None)
5639       if not isinstance(vg_free, int):
5640         raise errors.OpPrereqError("Can't compute free disk space on"
5641                                    " node %s" % node)
5642       if self.op.amount > vg_free:
5643         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5644                                    " %d MiB available, %d MiB required" %
5645                                    (node, vg_free, self.op.amount))
5646
5647   def Exec(self, feedback_fn):
5648     """Execute disk grow.
5649
5650     """
5651     instance = self.instance
5652     disk = self.disk
5653     for node in instance.all_nodes:
5654       self.cfg.SetDiskID(disk, node)
5655       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5656       result.Raise("Grow request failed to node %s" % node)
5657     disk.RecordGrow(self.op.amount)
5658     self.cfg.Update(instance)
5659     if self.op.wait_for_sync:
5660       disk_abort = not _WaitForSync(self, instance)
5661       if disk_abort:
5662         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5663                              " status.\nPlease check the instance.")
5664
5665
5666 class LUQueryInstanceData(NoHooksLU):
5667   """Query runtime instance data.
5668
5669   """
5670   _OP_REQP = ["instances", "static"]
5671   REQ_BGL = False
5672
5673   def ExpandNames(self):
5674     self.needed_locks = {}
5675     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5676
5677     if not isinstance(self.op.instances, list):
5678       raise errors.OpPrereqError("Invalid argument type 'instances'")
5679
5680     if self.op.instances:
5681       self.wanted_names = []
5682       for name in self.op.instances:
5683         full_name = self.cfg.ExpandInstanceName(name)
5684         if full_name is None:
5685           raise errors.OpPrereqError("Instance '%s' not known" % name)
5686         self.wanted_names.append(full_name)
5687       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5688     else:
5689       self.wanted_names = None
5690       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5691
5692     self.needed_locks[locking.LEVEL_NODE] = []
5693     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5694
5695   def DeclareLocks(self, level):
5696     if level == locking.LEVEL_NODE:
5697       self._LockInstancesNodes()
5698
5699   def CheckPrereq(self):
5700     """Check prerequisites.
5701
5702     This only checks the optional instance list against the existing names.
5703
5704     """
5705     if self.wanted_names is None:
5706       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5707
5708     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5709                              in self.wanted_names]
5710     return
5711
5712   def _ComputeDiskStatus(self, instance, snode, dev):
5713     """Compute block device status.
5714
5715     """
5716     static = self.op.static
5717     if not static:
5718       self.cfg.SetDiskID(dev, instance.primary_node)
5719       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5720       if dev_pstatus.offline:
5721         dev_pstatus = None
5722       else:
5723         dev_pstatus.Raise("Can't compute disk status for %s" % instance.name)
5724         dev_pstatus = dev_pstatus.payload
5725     else:
5726       dev_pstatus = None
5727
5728     if dev.dev_type in constants.LDS_DRBD:
5729       # we change the snode then (otherwise we use the one passed in)
5730       if dev.logical_id[0] == instance.primary_node:
5731         snode = dev.logical_id[1]
5732       else:
5733         snode = dev.logical_id[0]
5734
5735     if snode and not static:
5736       self.cfg.SetDiskID(dev, snode)
5737       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5738       if dev_sstatus.offline:
5739         dev_sstatus = None
5740       else:
5741         dev_sstatus.Raise("Can't compute disk status for %s" % instance.name)
5742         dev_sstatus = dev_sstatus.payload
5743     else:
5744       dev_sstatus = None
5745
5746     if dev.children:
5747       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5748                       for child in dev.children]
5749     else:
5750       dev_children = []
5751
5752     data = {
5753       "iv_name": dev.iv_name,
5754       "dev_type": dev.dev_type,
5755       "logical_id": dev.logical_id,
5756       "physical_id": dev.physical_id,
5757       "pstatus": dev_pstatus,
5758       "sstatus": dev_sstatus,
5759       "children": dev_children,
5760       "mode": dev.mode,
5761       }
5762
5763     return data
5764
5765   def Exec(self, feedback_fn):
5766     """Gather and return data"""
5767     result = {}
5768
5769     cluster = self.cfg.GetClusterInfo()
5770
5771     for instance in self.wanted_instances:
5772       if not self.op.static:
5773         remote_info = self.rpc.call_instance_info(instance.primary_node,
5774                                                   instance.name,
5775                                                   instance.hypervisor)
5776         remote_info.Raise("Error checking node %s" % instance.primary_node)
5777         remote_info = remote_info.payload
5778         if remote_info and "state" in remote_info:
5779           remote_state = "up"
5780         else:
5781           remote_state = "down"
5782       else:
5783         remote_state = None
5784       if instance.admin_up:
5785         config_state = "up"
5786       else:
5787         config_state = "down"
5788
5789       disks = [self._ComputeDiskStatus(instance, None, device)
5790                for device in instance.disks]
5791
5792       idict = {
5793         "name": instance.name,
5794         "config_state": config_state,
5795         "run_state": remote_state,
5796         "pnode": instance.primary_node,
5797         "snodes": instance.secondary_nodes,
5798         "os": instance.os,
5799         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5800         "disks": disks,
5801         "hypervisor": instance.hypervisor,
5802         "network_port": instance.network_port,
5803         "hv_instance": instance.hvparams,
5804         "hv_actual": cluster.FillHV(instance),
5805         "be_instance": instance.beparams,
5806         "be_actual": cluster.FillBE(instance),
5807         }
5808
5809       result[instance.name] = idict
5810
5811     return result
5812
5813
5814 class LUSetInstanceParams(LogicalUnit):
5815   """Modifies an instances's parameters.
5816
5817   """
5818   HPATH = "instance-modify"
5819   HTYPE = constants.HTYPE_INSTANCE
5820   _OP_REQP = ["instance_name"]
5821   REQ_BGL = False
5822
5823   def CheckArguments(self):
5824     if not hasattr(self.op, 'nics'):
5825       self.op.nics = []
5826     if not hasattr(self.op, 'disks'):
5827       self.op.disks = []
5828     if not hasattr(self.op, 'beparams'):
5829       self.op.beparams = {}
5830     if not hasattr(self.op, 'hvparams'):
5831       self.op.hvparams = {}
5832     self.op.force = getattr(self.op, "force", False)
5833     if not (self.op.nics or self.op.disks or
5834             self.op.hvparams or self.op.beparams):
5835       raise errors.OpPrereqError("No changes submitted")
5836
5837     # Disk validation
5838     disk_addremove = 0
5839     for disk_op, disk_dict in self.op.disks:
5840       if disk_op == constants.DDM_REMOVE:
5841         disk_addremove += 1
5842         continue
5843       elif disk_op == constants.DDM_ADD:
5844         disk_addremove += 1
5845       else:
5846         if not isinstance(disk_op, int):
5847           raise errors.OpPrereqError("Invalid disk index")
5848       if disk_op == constants.DDM_ADD:
5849         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5850         if mode not in constants.DISK_ACCESS_SET:
5851           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5852         size = disk_dict.get('size', None)
5853         if size is None:
5854           raise errors.OpPrereqError("Required disk parameter size missing")
5855         try:
5856           size = int(size)
5857         except ValueError, err:
5858           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5859                                      str(err))
5860         disk_dict['size'] = size
5861       else:
5862         # modification of disk
5863         if 'size' in disk_dict:
5864           raise errors.OpPrereqError("Disk size change not possible, use"
5865                                      " grow-disk")
5866
5867     if disk_addremove > 1:
5868       raise errors.OpPrereqError("Only one disk add or remove operation"
5869                                  " supported at a time")
5870
5871     # NIC validation
5872     nic_addremove = 0
5873     for nic_op, nic_dict in self.op.nics:
5874       if nic_op == constants.DDM_REMOVE:
5875         nic_addremove += 1
5876         continue
5877       elif nic_op == constants.DDM_ADD:
5878         nic_addremove += 1
5879       else:
5880         if not isinstance(nic_op, int):
5881           raise errors.OpPrereqError("Invalid nic index")
5882
5883       # nic_dict should be a dict
5884       nic_ip = nic_dict.get('ip', None)
5885       if nic_ip is not None:
5886         if nic_ip.lower() == constants.VALUE_NONE:
5887           nic_dict['ip'] = None
5888         else:
5889           if not utils.IsValidIP(nic_ip):
5890             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5891
5892       nic_bridge = nic_dict.get('bridge', None)
5893       nic_link = nic_dict.get('link', None)
5894       if nic_bridge and nic_link:
5895         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
5896       elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
5897         nic_dict['bridge'] = None
5898       elif nic_link and nic_link.lower() == constants.VALUE_NONE:
5899         nic_dict['link'] = None
5900
5901       if nic_op == constants.DDM_ADD:
5902         nic_mac = nic_dict.get('mac', None)
5903         if nic_mac is None:
5904           nic_dict['mac'] = constants.VALUE_AUTO
5905
5906       if 'mac' in nic_dict:
5907         nic_mac = nic_dict['mac']
5908         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5909           if not utils.IsValidMac(nic_mac):
5910             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5911         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5912           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5913                                      " modifying an existing nic")
5914
5915     if nic_addremove > 1:
5916       raise errors.OpPrereqError("Only one NIC add or remove operation"
5917                                  " supported at a time")
5918
5919   def ExpandNames(self):
5920     self._ExpandAndLockInstance()
5921     self.needed_locks[locking.LEVEL_NODE] = []
5922     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5923
5924   def DeclareLocks(self, level):
5925     if level == locking.LEVEL_NODE:
5926       self._LockInstancesNodes()
5927
5928   def BuildHooksEnv(self):
5929     """Build hooks env.
5930
5931     This runs on the master, primary and secondaries.
5932
5933     """
5934     args = dict()
5935     if constants.BE_MEMORY in self.be_new:
5936       args['memory'] = self.be_new[constants.BE_MEMORY]
5937     if constants.BE_VCPUS in self.be_new:
5938       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5939     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5940     # information at all.
5941     if self.op.nics:
5942       args['nics'] = []
5943       nic_override = dict(self.op.nics)
5944       c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
5945       for idx, nic in enumerate(self.instance.nics):
5946         if idx in nic_override:
5947           this_nic_override = nic_override[idx]
5948         else:
5949           this_nic_override = {}
5950         if 'ip' in this_nic_override:
5951           ip = this_nic_override['ip']
5952         else:
5953           ip = nic.ip
5954         if 'mac' in this_nic_override:
5955           mac = this_nic_override['mac']
5956         else:
5957           mac = nic.mac
5958         if idx in self.nic_pnew:
5959           nicparams = self.nic_pnew[idx]
5960         else:
5961           nicparams = objects.FillDict(c_nicparams, nic.nicparams)
5962         mode = nicparams[constants.NIC_MODE]
5963         link = nicparams[constants.NIC_LINK]
5964         args['nics'].append((ip, mac, mode, link))
5965       if constants.DDM_ADD in nic_override:
5966         ip = nic_override[constants.DDM_ADD].get('ip', None)
5967         mac = nic_override[constants.DDM_ADD]['mac']
5968         nicparams = self.nic_pnew[constants.DDM_ADD]
5969         mode = nicparams[constants.NIC_MODE]
5970         link = nicparams[constants.NIC_LINK]
5971         args['nics'].append((ip, mac, mode, link))
5972       elif constants.DDM_REMOVE in nic_override:
5973         del args['nics'][-1]
5974
5975     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5976     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5977     return env, nl, nl
5978
5979   def _GetUpdatedParams(self, old_params, update_dict,
5980                         default_values, parameter_types):
5981     """Return the new params dict for the given params.
5982
5983     @type old_params: dict
5984     @type old_params: old parameters
5985     @type update_dict: dict
5986     @type update_dict: dict containing new parameter values,
5987                        or constants.VALUE_DEFAULT to reset the
5988                        parameter to its default value
5989     @type default_values: dict
5990     @param default_values: default values for the filled parameters
5991     @type parameter_types: dict
5992     @param parameter_types: dict mapping target dict keys to types
5993                             in constants.ENFORCEABLE_TYPES
5994     @rtype: (dict, dict)
5995     @return: (new_parameters, filled_parameters)
5996
5997     """
5998     params_copy = copy.deepcopy(old_params)
5999     for key, val in update_dict.iteritems():
6000       if val == constants.VALUE_DEFAULT:
6001         try:
6002           del params_copy[key]
6003         except KeyError:
6004           pass
6005       else:
6006         params_copy[key] = val
6007     utils.ForceDictType(params_copy, parameter_types)
6008     params_filled = objects.FillDict(default_values, params_copy)
6009     return (params_copy, params_filled)
6010
6011   def CheckPrereq(self):
6012     """Check prerequisites.
6013
6014     This only checks the instance list against the existing names.
6015
6016     """
6017     force = self.force = self.op.force
6018
6019     # checking the new params on the primary/secondary nodes
6020
6021     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6022     cluster = self.cluster = self.cfg.GetClusterInfo()
6023     assert self.instance is not None, \
6024       "Cannot retrieve locked instance %s" % self.op.instance_name
6025     pnode = instance.primary_node
6026     nodelist = list(instance.all_nodes)
6027
6028     # hvparams processing
6029     if self.op.hvparams:
6030       i_hvdict, hv_new = self._GetUpdatedParams(
6031                              instance.hvparams, self.op.hvparams,
6032                              cluster.hvparams[instance.hypervisor],
6033                              constants.HVS_PARAMETER_TYPES)
6034       # local check
6035       hypervisor.GetHypervisor(
6036         instance.hypervisor).CheckParameterSyntax(hv_new)
6037       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6038       self.hv_new = hv_new # the new actual values
6039       self.hv_inst = i_hvdict # the new dict (without defaults)
6040     else:
6041       self.hv_new = self.hv_inst = {}
6042
6043     # beparams processing
6044     if self.op.beparams:
6045       i_bedict, be_new = self._GetUpdatedParams(
6046                              instance.beparams, self.op.beparams,
6047                              cluster.beparams[constants.PP_DEFAULT],
6048                              constants.BES_PARAMETER_TYPES)
6049       self.be_new = be_new # the new actual values
6050       self.be_inst = i_bedict # the new dict (without defaults)
6051     else:
6052       self.be_new = self.be_inst = {}
6053
6054     self.warn = []
6055
6056     if constants.BE_MEMORY in self.op.beparams and not self.force:
6057       mem_check_list = [pnode]
6058       if be_new[constants.BE_AUTO_BALANCE]:
6059         # either we changed auto_balance to yes or it was from before
6060         mem_check_list.extend(instance.secondary_nodes)
6061       instance_info = self.rpc.call_instance_info(pnode, instance.name,
6062                                                   instance.hypervisor)
6063       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6064                                          instance.hypervisor)
6065       pninfo = nodeinfo[pnode]
6066       msg = pninfo.fail_msg
6067       if msg:
6068         # Assume the primary node is unreachable and go ahead
6069         self.warn.append("Can't get info from primary node %s: %s" %
6070                          (pnode,  msg))
6071       elif not isinstance(pninfo.payload.get('memory_free', None), int):
6072         self.warn.append("Node data from primary node %s doesn't contain"
6073                          " free memory information" % pnode)
6074       elif instance_info.fail_msg:
6075         self.warn.append("Can't get instance runtime information: %s" %
6076                         instance_info.fail_msg)
6077       else:
6078         if instance_info.payload:
6079           current_mem = int(instance_info.payload['memory'])
6080         else:
6081           # Assume instance not running
6082           # (there is a slight race condition here, but it's not very probable,
6083           # and we have no other way to check)
6084           current_mem = 0
6085         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6086                     pninfo.payload['memory_free'])
6087         if miss_mem > 0:
6088           raise errors.OpPrereqError("This change will prevent the instance"
6089                                      " from starting, due to %d MB of memory"
6090                                      " missing on its primary node" % miss_mem)
6091
6092       if be_new[constants.BE_AUTO_BALANCE]:
6093         for node, nres in nodeinfo.items():
6094           if node not in instance.secondary_nodes:
6095             continue
6096           msg = nres.fail_msg
6097           if msg:
6098             self.warn.append("Can't get info from secondary node %s: %s" %
6099                              (node, msg))
6100           elif not isinstance(nres.payload.get('memory_free', None), int):
6101             self.warn.append("Secondary node %s didn't return free"
6102                              " memory information" % node)
6103           elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
6104             self.warn.append("Not enough memory to failover instance to"
6105                              " secondary node %s" % node)
6106
6107     # NIC processing
6108     self.nic_pnew = {}
6109     self.nic_pinst = {}
6110     for nic_op, nic_dict in self.op.nics:
6111       if nic_op == constants.DDM_REMOVE:
6112         if not instance.nics:
6113           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6114         continue
6115       if nic_op != constants.DDM_ADD:
6116         # an existing nic
6117         if nic_op < 0 or nic_op >= len(instance.nics):
6118           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6119                                      " are 0 to %d" %
6120                                      (nic_op, len(instance.nics)))
6121         old_nic_params = instance.nics[nic_op].nicparams
6122         old_nic_ip = instance.nics[nic_op].ip
6123       else:
6124         old_nic_params = {}
6125         old_nic_ip = None
6126
6127       update_params_dict = dict([(key, nic_dict[key])
6128                                  for key in constants.NICS_PARAMETERS
6129                                  if key in nic_dict])
6130
6131       if 'bridge' in nic_dict:
6132         update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
6133
6134       new_nic_params, new_filled_nic_params = \
6135           self._GetUpdatedParams(old_nic_params, update_params_dict,
6136                                  cluster.nicparams[constants.PP_DEFAULT],
6137                                  constants.NICS_PARAMETER_TYPES)
6138       objects.NIC.CheckParameterSyntax(new_filled_nic_params)
6139       self.nic_pinst[nic_op] = new_nic_params
6140       self.nic_pnew[nic_op] = new_filled_nic_params
6141       new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
6142
6143       if new_nic_mode == constants.NIC_MODE_BRIDGED:
6144         nic_bridge = new_filled_nic_params[constants.NIC_LINK]
6145         msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
6146         if msg:
6147           msg = "Error checking bridges on node %s: %s" % (pnode, msg)
6148           if self.force:
6149             self.warn.append(msg)
6150           else:
6151             raise errors.OpPrereqError(msg)
6152       if new_nic_mode == constants.NIC_MODE_ROUTED:
6153         if 'ip' in nic_dict:
6154           nic_ip = nic_dict['ip']
6155         else:
6156           nic_ip = old_nic_ip
6157         if nic_ip is None:
6158           raise errors.OpPrereqError('Cannot set the nic ip to None'
6159                                      ' on a routed nic')
6160       if 'mac' in nic_dict:
6161         nic_mac = nic_dict['mac']
6162         if nic_mac is None:
6163           raise errors.OpPrereqError('Cannot set the nic mac to None')
6164         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6165           # otherwise generate the mac
6166           nic_dict['mac'] = self.cfg.GenerateMAC()
6167         else:
6168           # or validate/reserve the current one
6169           if self.cfg.IsMacInUse(nic_mac):
6170             raise errors.OpPrereqError("MAC address %s already in use"
6171                                        " in cluster" % nic_mac)
6172
6173     # DISK processing
6174     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6175       raise errors.OpPrereqError("Disk operations not supported for"
6176                                  " diskless instances")
6177     for disk_op, disk_dict in self.op.disks:
6178       if disk_op == constants.DDM_REMOVE:
6179         if len(instance.disks) == 1:
6180           raise errors.OpPrereqError("Cannot remove the last disk of"
6181                                      " an instance")
6182         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6183         ins_l = ins_l[pnode]
6184         msg = ins_l.fail_msg
6185         if msg:
6186           raise errors.OpPrereqError("Can't contact node %s: %s" %
6187                                      (pnode, msg))
6188         if instance.name in ins_l.payload:
6189           raise errors.OpPrereqError("Instance is running, can't remove"
6190                                      " disks.")
6191
6192       if (disk_op == constants.DDM_ADD and
6193           len(instance.nics) >= constants.MAX_DISKS):
6194         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6195                                    " add more" % constants.MAX_DISKS)
6196       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6197         # an existing disk
6198         if disk_op < 0 or disk_op >= len(instance.disks):
6199           raise errors.OpPrereqError("Invalid disk index %s, valid values"
6200                                      " are 0 to %d" %
6201                                      (disk_op, len(instance.disks)))
6202
6203     return
6204
6205   def Exec(self, feedback_fn):
6206     """Modifies an instance.
6207
6208     All parameters take effect only at the next restart of the instance.
6209
6210     """
6211     # Process here the warnings from CheckPrereq, as we don't have a
6212     # feedback_fn there.
6213     for warn in self.warn:
6214       feedback_fn("WARNING: %s" % warn)
6215
6216     result = []
6217     instance = self.instance
6218     cluster = self.cluster
6219     # disk changes
6220     for disk_op, disk_dict in self.op.disks:
6221       if disk_op == constants.DDM_REMOVE:
6222         # remove the last disk
6223         device = instance.disks.pop()
6224         device_idx = len(instance.disks)
6225         for node, disk in device.ComputeNodeTree(instance.primary_node):
6226           self.cfg.SetDiskID(disk, node)
6227           msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
6228           if msg:
6229             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6230                             " continuing anyway", device_idx, node, msg)
6231         result.append(("disk/%d" % device_idx, "remove"))
6232       elif disk_op == constants.DDM_ADD:
6233         # add a new disk
6234         if instance.disk_template == constants.DT_FILE:
6235           file_driver, file_path = instance.disks[0].logical_id
6236           file_path = os.path.dirname(file_path)
6237         else:
6238           file_driver = file_path = None
6239         disk_idx_base = len(instance.disks)
6240         new_disk = _GenerateDiskTemplate(self,
6241                                          instance.disk_template,
6242                                          instance.name, instance.primary_node,
6243                                          instance.secondary_nodes,
6244                                          [disk_dict],
6245                                          file_path,
6246                                          file_driver,
6247                                          disk_idx_base)[0]
6248         instance.disks.append(new_disk)
6249         info = _GetInstanceInfoText(instance)
6250
6251         logging.info("Creating volume %s for instance %s",
6252                      new_disk.iv_name, instance.name)
6253         # Note: this needs to be kept in sync with _CreateDisks
6254         #HARDCODE
6255         for node in instance.all_nodes:
6256           f_create = node == instance.primary_node
6257           try:
6258             _CreateBlockDev(self, node, instance, new_disk,
6259                             f_create, info, f_create)
6260           except errors.OpExecError, err:
6261             self.LogWarning("Failed to create volume %s (%s) on"
6262                             " node %s: %s",
6263                             new_disk.iv_name, new_disk, node, err)
6264         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6265                        (new_disk.size, new_disk.mode)))
6266       else:
6267         # change a given disk
6268         instance.disks[disk_op].mode = disk_dict['mode']
6269         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6270     # NIC changes
6271     for nic_op, nic_dict in self.op.nics:
6272       if nic_op == constants.DDM_REMOVE:
6273         # remove the last nic
6274         del instance.nics[-1]
6275         result.append(("nic.%d" % len(instance.nics), "remove"))
6276       elif nic_op == constants.DDM_ADD:
6277         # mac and bridge should be set, by now
6278         mac = nic_dict['mac']
6279         ip = nic_dict.get('ip', None)
6280         nicparams = self.nic_pinst[constants.DDM_ADD]
6281         new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
6282         instance.nics.append(new_nic)
6283         result.append(("nic.%d" % (len(instance.nics) - 1),
6284                        "add:mac=%s,ip=%s,mode=%s,link=%s" %
6285                        (new_nic.mac, new_nic.ip,
6286                         self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
6287                         self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
6288                        )))
6289       else:
6290         for key in 'mac', 'ip':
6291           if key in nic_dict:
6292             setattr(instance.nics[nic_op], key, nic_dict[key])
6293         if nic_op in self.nic_pnew:
6294           instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
6295         for key, val in nic_dict.iteritems():
6296           result.append(("nic.%s/%d" % (key, nic_op), val))
6297
6298     # hvparams changes
6299     if self.op.hvparams:
6300       instance.hvparams = self.hv_inst
6301       for key, val in self.op.hvparams.iteritems():
6302         result.append(("hv/%s" % key, val))
6303
6304     # beparams changes
6305     if self.op.beparams:
6306       instance.beparams = self.be_inst
6307       for key, val in self.op.beparams.iteritems():
6308         result.append(("be/%s" % key, val))
6309
6310     self.cfg.Update(instance)
6311
6312     return result
6313
6314
6315 class LUQueryExports(NoHooksLU):
6316   """Query the exports list
6317
6318   """
6319   _OP_REQP = ['nodes']
6320   REQ_BGL = False
6321
6322   def ExpandNames(self):
6323     self.needed_locks = {}
6324     self.share_locks[locking.LEVEL_NODE] = 1
6325     if not self.op.nodes:
6326       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6327     else:
6328       self.needed_locks[locking.LEVEL_NODE] = \
6329         _GetWantedNodes(self, self.op.nodes)
6330
6331   def CheckPrereq(self):
6332     """Check prerequisites.
6333
6334     """
6335     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6336
6337   def Exec(self, feedback_fn):
6338     """Compute the list of all the exported system images.
6339
6340     @rtype: dict
6341     @return: a dictionary with the structure node->(export-list)
6342         where export-list is a list of the instances exported on
6343         that node.
6344
6345     """
6346     rpcresult = self.rpc.call_export_list(self.nodes)
6347     result = {}
6348     for node in rpcresult:
6349       if rpcresult[node].fail_msg:
6350         result[node] = False
6351       else:
6352         result[node] = rpcresult[node].payload
6353
6354     return result
6355
6356
6357 class LUExportInstance(LogicalUnit):
6358   """Export an instance to an image in the cluster.
6359
6360   """
6361   HPATH = "instance-export"
6362   HTYPE = constants.HTYPE_INSTANCE
6363   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6364   REQ_BGL = False
6365
6366   def ExpandNames(self):
6367     self._ExpandAndLockInstance()
6368     # FIXME: lock only instance primary and destination node
6369     #
6370     # Sad but true, for now we have do lock all nodes, as we don't know where
6371     # the previous export might be, and and in this LU we search for it and
6372     # remove it from its current node. In the future we could fix this by:
6373     #  - making a tasklet to search (share-lock all), then create the new one,
6374     #    then one to remove, after
6375     #  - removing the removal operation altoghether
6376     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6377
6378   def DeclareLocks(self, level):
6379     """Last minute lock declaration."""
6380     # All nodes are locked anyway, so nothing to do here.
6381
6382   def BuildHooksEnv(self):
6383     """Build hooks env.
6384
6385     This will run on the master, primary node and target node.
6386
6387     """
6388     env = {
6389       "EXPORT_NODE": self.op.target_node,
6390       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6391       }
6392     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6393     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6394           self.op.target_node]
6395     return env, nl, nl
6396
6397   def CheckPrereq(self):
6398     """Check prerequisites.
6399
6400     This checks that the instance and node names are valid.
6401
6402     """
6403     instance_name = self.op.instance_name
6404     self.instance = self.cfg.GetInstanceInfo(instance_name)
6405     assert self.instance is not None, \
6406           "Cannot retrieve locked instance %s" % self.op.instance_name
6407     _CheckNodeOnline(self, self.instance.primary_node)
6408
6409     self.dst_node = self.cfg.GetNodeInfo(
6410       self.cfg.ExpandNodeName(self.op.target_node))
6411
6412     if self.dst_node is None:
6413       # This is wrong node name, not a non-locked node
6414       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6415     _CheckNodeOnline(self, self.dst_node.name)
6416     _CheckNodeNotDrained(self, self.dst_node.name)
6417
6418     # instance disk type verification
6419     for disk in self.instance.disks:
6420       if disk.dev_type == constants.LD_FILE:
6421         raise errors.OpPrereqError("Export not supported for instances with"
6422                                    " file-based disks")
6423
6424   def Exec(self, feedback_fn):
6425     """Export an instance to an image in the cluster.
6426
6427     """
6428     instance = self.instance
6429     dst_node = self.dst_node
6430     src_node = instance.primary_node
6431     if self.op.shutdown:
6432       # shutdown the instance, but not the disks
6433       result = self.rpc.call_instance_shutdown(src_node, instance)
6434       result.Raise("Could not shutdown instance %s on"
6435                    " node %s" % (instance.name, src_node))
6436
6437     vgname = self.cfg.GetVGName()
6438
6439     snap_disks = []
6440
6441     # set the disks ID correctly since call_instance_start needs the
6442     # correct drbd minor to create the symlinks
6443     for disk in instance.disks:
6444       self.cfg.SetDiskID(disk, src_node)
6445
6446     try:
6447       for idx, disk in enumerate(instance.disks):
6448         # result.payload will be a snapshot of an lvm leaf of the one we passed
6449         result = self.rpc.call_blockdev_snapshot(src_node, disk)
6450         msg = result.fail_msg
6451         if msg:
6452           self.LogWarning("Could not snapshot disk/%s on node %s: %s",
6453                           idx, src_node, msg)
6454           snap_disks.append(False)
6455         else:
6456           disk_id = (vgname, result.payload)
6457           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6458                                  logical_id=disk_id, physical_id=disk_id,
6459                                  iv_name=disk.iv_name)
6460           snap_disks.append(new_dev)
6461
6462     finally:
6463       if self.op.shutdown and instance.admin_up:
6464         result = self.rpc.call_instance_start(src_node, instance, None, None)
6465         msg = result.fail_msg
6466         if msg:
6467           _ShutdownInstanceDisks(self, instance)
6468           raise errors.OpExecError("Could not start instance: %s" % msg)
6469
6470     # TODO: check for size
6471
6472     cluster_name = self.cfg.GetClusterName()
6473     for idx, dev in enumerate(snap_disks):
6474       if dev:
6475         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6476                                                instance, cluster_name, idx)
6477         msg = result.fail_msg
6478         if msg:
6479           self.LogWarning("Could not export disk/%s from node %s to"
6480                           " node %s: %s", idx, src_node, dst_node.name, msg)
6481         msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
6482         if msg:
6483           self.LogWarning("Could not remove snapshot for disk/%d from node"
6484                           " %s: %s", idx, src_node, msg)
6485
6486     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6487     msg = result.fail_msg
6488     if msg:
6489       self.LogWarning("Could not finalize export for instance %s"
6490                       " on node %s: %s", instance.name, dst_node.name, msg)
6491
6492     nodelist = self.cfg.GetNodeList()
6493     nodelist.remove(dst_node.name)
6494
6495     # on one-node clusters nodelist will be empty after the removal
6496     # if we proceed the backup would be removed because OpQueryExports
6497     # substitutes an empty list with the full cluster node list.
6498     iname = instance.name
6499     if nodelist:
6500       exportlist = self.rpc.call_export_list(nodelist)
6501       for node in exportlist:
6502         if exportlist[node].fail_msg:
6503           continue
6504         if iname in exportlist[node].payload:
6505           msg = self.rpc.call_export_remove(node, iname).fail_msg
6506           if msg:
6507             self.LogWarning("Could not remove older export for instance %s"
6508                             " on node %s: %s", iname, node, msg)
6509
6510
6511 class LURemoveExport(NoHooksLU):
6512   """Remove exports related to the named instance.
6513
6514   """
6515   _OP_REQP = ["instance_name"]
6516   REQ_BGL = False
6517
6518   def ExpandNames(self):
6519     self.needed_locks = {}
6520     # We need all nodes to be locked in order for RemoveExport to work, but we
6521     # don't need to lock the instance itself, as nothing will happen to it (and
6522     # we can remove exports also for a removed instance)
6523     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6524
6525   def CheckPrereq(self):
6526     """Check prerequisites.
6527     """
6528     pass
6529
6530   def Exec(self, feedback_fn):
6531     """Remove any export.
6532
6533     """
6534     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6535     # If the instance was not found we'll try with the name that was passed in.
6536     # This will only work if it was an FQDN, though.
6537     fqdn_warn = False
6538     if not instance_name:
6539       fqdn_warn = True
6540       instance_name = self.op.instance_name
6541
6542     locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
6543     exportlist = self.rpc.call_export_list(locked_nodes)
6544     found = False
6545     for node in exportlist:
6546       msg = exportlist[node].fail_msg
6547       if msg:
6548         self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
6549         continue
6550       if instance_name in exportlist[node].payload:
6551         found = True
6552         result = self.rpc.call_export_remove(node, instance_name)
6553         msg = result.fail_msg
6554         if msg:
6555           logging.error("Could not remove export for instance %s"
6556                         " on node %s: %s", instance_name, node, msg)
6557
6558     if fqdn_warn and not found:
6559       feedback_fn("Export not found. If trying to remove an export belonging"
6560                   " to a deleted instance please use its Fully Qualified"
6561                   " Domain Name.")
6562
6563
6564 class TagsLU(NoHooksLU):
6565   """Generic tags LU.
6566
6567   This is an abstract class which is the parent of all the other tags LUs.
6568
6569   """
6570
6571   def ExpandNames(self):
6572     self.needed_locks = {}
6573     if self.op.kind == constants.TAG_NODE:
6574       name = self.cfg.ExpandNodeName(self.op.name)
6575       if name is None:
6576         raise errors.OpPrereqError("Invalid node name (%s)" %
6577                                    (self.op.name,))
6578       self.op.name = name
6579       self.needed_locks[locking.LEVEL_NODE] = name
6580     elif self.op.kind == constants.TAG_INSTANCE:
6581       name = self.cfg.ExpandInstanceName(self.op.name)
6582       if name is None:
6583         raise errors.OpPrereqError("Invalid instance name (%s)" %
6584                                    (self.op.name,))
6585       self.op.name = name
6586       self.needed_locks[locking.LEVEL_INSTANCE] = name
6587
6588   def CheckPrereq(self):
6589     """Check prerequisites.
6590
6591     """
6592     if self.op.kind == constants.TAG_CLUSTER:
6593       self.target = self.cfg.GetClusterInfo()
6594     elif self.op.kind == constants.TAG_NODE:
6595       self.target = self.cfg.GetNodeInfo(self.op.name)
6596     elif self.op.kind == constants.TAG_INSTANCE:
6597       self.target = self.cfg.GetInstanceInfo(self.op.name)
6598     else:
6599       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6600                                  str(self.op.kind))
6601
6602
6603 class LUGetTags(TagsLU):
6604   """Returns the tags of a given object.
6605
6606   """
6607   _OP_REQP = ["kind", "name"]
6608   REQ_BGL = False
6609
6610   def Exec(self, feedback_fn):
6611     """Returns the tag list.
6612
6613     """
6614     return list(self.target.GetTags())
6615
6616
6617 class LUSearchTags(NoHooksLU):
6618   """Searches the tags for a given pattern.
6619
6620   """
6621   _OP_REQP = ["pattern"]
6622   REQ_BGL = False
6623
6624   def ExpandNames(self):
6625     self.needed_locks = {}
6626
6627   def CheckPrereq(self):
6628     """Check prerequisites.
6629
6630     This checks the pattern passed for validity by compiling it.
6631
6632     """
6633     try:
6634       self.re = re.compile(self.op.pattern)
6635     except re.error, err:
6636       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6637                                  (self.op.pattern, err))
6638
6639   def Exec(self, feedback_fn):
6640     """Returns the tag list.
6641
6642     """
6643     cfg = self.cfg
6644     tgts = [("/cluster", cfg.GetClusterInfo())]
6645     ilist = cfg.GetAllInstancesInfo().values()
6646     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6647     nlist = cfg.GetAllNodesInfo().values()
6648     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6649     results = []
6650     for path, target in tgts:
6651       for tag in target.GetTags():
6652         if self.re.search(tag):
6653           results.append((path, tag))
6654     return results
6655
6656
6657 class LUAddTags(TagsLU):
6658   """Sets a tag on a given object.
6659
6660   """
6661   _OP_REQP = ["kind", "name", "tags"]
6662   REQ_BGL = False
6663
6664   def CheckPrereq(self):
6665     """Check prerequisites.
6666
6667     This checks the type and length of the tag name and value.
6668
6669     """
6670     TagsLU.CheckPrereq(self)
6671     for tag in self.op.tags:
6672       objects.TaggableObject.ValidateTag(tag)
6673
6674   def Exec(self, feedback_fn):
6675     """Sets the tag.
6676
6677     """
6678     try:
6679       for tag in self.op.tags:
6680         self.target.AddTag(tag)
6681     except errors.TagError, err:
6682       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6683     try:
6684       self.cfg.Update(self.target)
6685     except errors.ConfigurationError:
6686       raise errors.OpRetryError("There has been a modification to the"
6687                                 " config file and the operation has been"
6688                                 " aborted. Please retry.")
6689
6690
6691 class LUDelTags(TagsLU):
6692   """Delete a list of tags from a given object.
6693
6694   """
6695   _OP_REQP = ["kind", "name", "tags"]
6696   REQ_BGL = False
6697
6698   def CheckPrereq(self):
6699     """Check prerequisites.
6700
6701     This checks that we have the given tag.
6702
6703     """
6704     TagsLU.CheckPrereq(self)
6705     for tag in self.op.tags:
6706       objects.TaggableObject.ValidateTag(tag)
6707     del_tags = frozenset(self.op.tags)
6708     cur_tags = self.target.GetTags()
6709     if not del_tags <= cur_tags:
6710       diff_tags = del_tags - cur_tags
6711       diff_names = ["'%s'" % tag for tag in diff_tags]
6712       diff_names.sort()
6713       raise errors.OpPrereqError("Tag(s) %s not found" %
6714                                  (",".join(diff_names)))
6715
6716   def Exec(self, feedback_fn):
6717     """Remove the tag from the object.
6718
6719     """
6720     for tag in self.op.tags:
6721       self.target.RemoveTag(tag)
6722     try:
6723       self.cfg.Update(self.target)
6724     except errors.ConfigurationError:
6725       raise errors.OpRetryError("There has been a modification to the"
6726                                 " config file and the operation has been"
6727                                 " aborted. Please retry.")
6728
6729
6730 class LUTestDelay(NoHooksLU):
6731   """Sleep for a specified amount of time.
6732
6733   This LU sleeps on the master and/or nodes for a specified amount of
6734   time.
6735
6736   """
6737   _OP_REQP = ["duration", "on_master", "on_nodes"]
6738   REQ_BGL = False
6739
6740   def ExpandNames(self):
6741     """Expand names and set required locks.
6742
6743     This expands the node list, if any.
6744
6745     """
6746     self.needed_locks = {}
6747     if self.op.on_nodes:
6748       # _GetWantedNodes can be used here, but is not always appropriate to use
6749       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6750       # more information.
6751       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6752       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6753
6754   def CheckPrereq(self):
6755     """Check prerequisites.
6756
6757     """
6758
6759   def Exec(self, feedback_fn):
6760     """Do the actual sleep.
6761
6762     """
6763     if self.op.on_master:
6764       if not utils.TestDelay(self.op.duration):
6765         raise errors.OpExecError("Error during master delay test")
6766     if self.op.on_nodes:
6767       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6768       for node, node_result in result.items():
6769         node_result.Raise("Failure during rpc call to node %s" % node)
6770
6771
6772 class IAllocator(object):
6773   """IAllocator framework.
6774
6775   An IAllocator instance has three sets of attributes:
6776     - cfg that is needed to query the cluster
6777     - input data (all members of the _KEYS class attribute are required)
6778     - four buffer attributes (in|out_data|text), that represent the
6779       input (to the external script) in text and data structure format,
6780       and the output from it, again in two formats
6781     - the result variables from the script (success, info, nodes) for
6782       easy usage
6783
6784   """
6785   _ALLO_KEYS = [
6786     "mem_size", "disks", "disk_template",
6787     "os", "tags", "nics", "vcpus", "hypervisor",
6788     ]
6789   _RELO_KEYS = [
6790     "relocate_from",
6791     ]
6792
6793   def __init__(self, lu, mode, name, **kwargs):
6794     self.lu = lu
6795     # init buffer variables
6796     self.in_text = self.out_text = self.in_data = self.out_data = None
6797     # init all input fields so that pylint is happy
6798     self.mode = mode
6799     self.name = name
6800     self.mem_size = self.disks = self.disk_template = None
6801     self.os = self.tags = self.nics = self.vcpus = None
6802     self.hypervisor = None
6803     self.relocate_from = None
6804     # computed fields
6805     self.required_nodes = None
6806     # init result fields
6807     self.success = self.info = self.nodes = None
6808     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6809       keyset = self._ALLO_KEYS
6810     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6811       keyset = self._RELO_KEYS
6812     else:
6813       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6814                                    " IAllocator" % self.mode)
6815     for key in kwargs:
6816       if key not in keyset:
6817         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6818                                      " IAllocator" % key)
6819       setattr(self, key, kwargs[key])
6820     for key in keyset:
6821       if key not in kwargs:
6822         raise errors.ProgrammerError("Missing input parameter '%s' to"
6823                                      " IAllocator" % key)
6824     self._BuildInputData()
6825
6826   def _ComputeClusterData(self):
6827     """Compute the generic allocator input data.
6828
6829     This is the data that is independent of the actual operation.
6830
6831     """
6832     cfg = self.lu.cfg
6833     cluster_info = cfg.GetClusterInfo()
6834     # cluster data
6835     data = {
6836       "version": constants.IALLOCATOR_VERSION,
6837       "cluster_name": cfg.GetClusterName(),
6838       "cluster_tags": list(cluster_info.GetTags()),
6839       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6840       # we don't have job IDs
6841       }
6842     iinfo = cfg.GetAllInstancesInfo().values()
6843     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6844
6845     # node data
6846     node_results = {}
6847     node_list = cfg.GetNodeList()
6848
6849     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6850       hypervisor_name = self.hypervisor
6851     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6852       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6853
6854     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6855                                            hypervisor_name)
6856     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6857                        cluster_info.enabled_hypervisors)
6858     for nname, nresult in node_data.items():
6859       # first fill in static (config-based) values
6860       ninfo = cfg.GetNodeInfo(nname)
6861       pnr = {
6862         "tags": list(ninfo.GetTags()),
6863         "primary_ip": ninfo.primary_ip,
6864         "secondary_ip": ninfo.secondary_ip,
6865         "offline": ninfo.offline,
6866         "drained": ninfo.drained,
6867         "master_candidate": ninfo.master_candidate,
6868         }
6869
6870       if not ninfo.offline:
6871         nresult.Raise("Can't get data for node %s" % nname)
6872         node_iinfo[nname].Raise("Can't get node instance info from node %s" %
6873                                 nname)
6874         remote_info = nresult.payload
6875         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6876                      'vg_size', 'vg_free', 'cpu_total']:
6877           if attr not in remote_info:
6878             raise errors.OpExecError("Node '%s' didn't return attribute"
6879                                      " '%s'" % (nname, attr))
6880           if not isinstance(remote_info[attr], int):
6881             raise errors.OpExecError("Node '%s' returned invalid value"
6882                                      " for '%s': %s" %
6883                                      (nname, attr, remote_info[attr]))
6884         # compute memory used by primary instances
6885         i_p_mem = i_p_up_mem = 0
6886         for iinfo, beinfo in i_list:
6887           if iinfo.primary_node == nname:
6888             i_p_mem += beinfo[constants.BE_MEMORY]
6889             if iinfo.name not in node_iinfo[nname].payload:
6890               i_used_mem = 0
6891             else:
6892               i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
6893             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6894             remote_info['memory_free'] -= max(0, i_mem_diff)
6895
6896             if iinfo.admin_up:
6897               i_p_up_mem += beinfo[constants.BE_MEMORY]
6898
6899         # compute memory used by instances
6900         pnr_dyn = {
6901           "total_memory": remote_info['memory_total'],
6902           "reserved_memory": remote_info['memory_dom0'],
6903           "free_memory": remote_info['memory_free'],
6904           "total_disk": remote_info['vg_size'],
6905           "free_disk": remote_info['vg_free'],
6906           "total_cpus": remote_info['cpu_total'],
6907           "i_pri_memory": i_p_mem,
6908           "i_pri_up_memory": i_p_up_mem,
6909           }
6910         pnr.update(pnr_dyn)
6911
6912       node_results[nname] = pnr
6913     data["nodes"] = node_results
6914
6915     # instance data
6916     instance_data = {}
6917     for iinfo, beinfo in i_list:
6918       nic_data = []
6919       for nic in iinfo.nics:
6920         filled_params = objects.FillDict(
6921             cluster_info.nicparams[constants.PP_DEFAULT],
6922             nic.nicparams)
6923         nic_dict = {"mac": nic.mac,
6924                     "ip": nic.ip,
6925                     "mode": filled_params[constants.NIC_MODE],
6926                     "link": filled_params[constants.NIC_LINK],
6927                    }
6928         if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
6929           nic_dict["bridge"] = filled_params[constants.NIC_LINK]
6930         nic_data.append(nic_dict)
6931       pir = {
6932         "tags": list(iinfo.GetTags()),
6933         "admin_up": iinfo.admin_up,
6934         "vcpus": beinfo[constants.BE_VCPUS],
6935         "memory": beinfo[constants.BE_MEMORY],
6936         "os": iinfo.os,
6937         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6938         "nics": nic_data,
6939         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6940         "disk_template": iinfo.disk_template,
6941         "hypervisor": iinfo.hypervisor,
6942         }
6943       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
6944                                                  pir["disks"])
6945       instance_data[iinfo.name] = pir
6946
6947     data["instances"] = instance_data
6948
6949     self.in_data = data
6950
6951   def _AddNewInstance(self):
6952     """Add new instance data to allocator structure.
6953
6954     This in combination with _AllocatorGetClusterData will create the
6955     correct structure needed as input for the allocator.
6956
6957     The checks for the completeness of the opcode must have already been
6958     done.
6959
6960     """
6961     data = self.in_data
6962
6963     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6964
6965     if self.disk_template in constants.DTS_NET_MIRROR:
6966       self.required_nodes = 2
6967     else:
6968       self.required_nodes = 1
6969     request = {
6970       "type": "allocate",
6971       "name": self.name,
6972       "disk_template": self.disk_template,
6973       "tags": self.tags,
6974       "os": self.os,
6975       "vcpus": self.vcpus,
6976       "memory": self.mem_size,
6977       "disks": self.disks,
6978       "disk_space_total": disk_space,
6979       "nics": self.nics,
6980       "required_nodes": self.required_nodes,
6981       }
6982     data["request"] = request
6983
6984   def _AddRelocateInstance(self):
6985     """Add relocate instance data to allocator structure.
6986
6987     This in combination with _IAllocatorGetClusterData will create the
6988     correct structure needed as input for the allocator.
6989
6990     The checks for the completeness of the opcode must have already been
6991     done.
6992
6993     """
6994     instance = self.lu.cfg.GetInstanceInfo(self.name)
6995     if instance is None:
6996       raise errors.ProgrammerError("Unknown instance '%s' passed to"
6997                                    " IAllocator" % self.name)
6998
6999     if instance.disk_template not in constants.DTS_NET_MIRROR:
7000       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
7001
7002     if len(instance.secondary_nodes) != 1:
7003       raise errors.OpPrereqError("Instance has not exactly one secondary node")
7004
7005     self.required_nodes = 1
7006     disk_sizes = [{'size': disk.size} for disk in instance.disks]
7007     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
7008
7009     request = {
7010       "type": "relocate",
7011       "name": self.name,
7012       "disk_space_total": disk_space,
7013       "required_nodes": self.required_nodes,
7014       "relocate_from": self.relocate_from,
7015       }
7016     self.in_data["request"] = request
7017
7018   def _BuildInputData(self):
7019     """Build input data structures.
7020
7021     """
7022     self._ComputeClusterData()
7023
7024     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7025       self._AddNewInstance()
7026     else:
7027       self._AddRelocateInstance()
7028
7029     self.in_text = serializer.Dump(self.in_data)
7030
7031   def Run(self, name, validate=True, call_fn=None):
7032     """Run an instance allocator and return the results.
7033
7034     """
7035     if call_fn is None:
7036       call_fn = self.lu.rpc.call_iallocator_runner
7037     data = self.in_text
7038
7039     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
7040     result.Raise("Failure while running the iallocator script")
7041
7042     self.out_text = result.payload
7043     if validate:
7044       self._ValidateResult()
7045
7046   def _ValidateResult(self):
7047     """Process the allocator results.
7048
7049     This will process and if successful save the result in
7050     self.out_data and the other parameters.
7051
7052     """
7053     try:
7054       rdict = serializer.Load(self.out_text)
7055     except Exception, err:
7056       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
7057
7058     if not isinstance(rdict, dict):
7059       raise errors.OpExecError("Can't parse iallocator results: not a dict")
7060
7061     for key in "success", "info", "nodes":
7062       if key not in rdict:
7063         raise errors.OpExecError("Can't parse iallocator results:"
7064                                  " missing key '%s'" % key)
7065       setattr(self, key, rdict[key])
7066
7067     if not isinstance(rdict["nodes"], list):
7068       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
7069                                " is not a list")
7070     self.out_data = rdict
7071
7072
7073 class LUTestAllocator(NoHooksLU):
7074   """Run allocator tests.
7075
7076   This LU runs the allocator tests
7077
7078   """
7079   _OP_REQP = ["direction", "mode", "name"]
7080
7081   def CheckPrereq(self):
7082     """Check prerequisites.
7083
7084     This checks the opcode parameters depending on the director and mode test.
7085
7086     """
7087     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7088       for attr in ["name", "mem_size", "disks", "disk_template",
7089                    "os", "tags", "nics", "vcpus"]:
7090         if not hasattr(self.op, attr):
7091           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7092                                      attr)
7093       iname = self.cfg.ExpandInstanceName(self.op.name)
7094       if iname is not None:
7095         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7096                                    iname)
7097       if not isinstance(self.op.nics, list):
7098         raise errors.OpPrereqError("Invalid parameter 'nics'")
7099       for row in self.op.nics:
7100         if (not isinstance(row, dict) or
7101             "mac" not in row or
7102             "ip" not in row or
7103             "bridge" not in row):
7104           raise errors.OpPrereqError("Invalid contents of the"
7105                                      " 'nics' parameter")
7106       if not isinstance(self.op.disks, list):
7107         raise errors.OpPrereqError("Invalid parameter 'disks'")
7108       for row in self.op.disks:
7109         if (not isinstance(row, dict) or
7110             "size" not in row or
7111             not isinstance(row["size"], int) or
7112             "mode" not in row or
7113             row["mode"] not in ['r', 'w']):
7114           raise errors.OpPrereqError("Invalid contents of the"
7115                                      " 'disks' parameter")
7116       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7117         self.op.hypervisor = self.cfg.GetHypervisorType()
7118     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7119       if not hasattr(self.op, "name"):
7120         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7121       fname = self.cfg.ExpandInstanceName(self.op.name)
7122       if fname is None:
7123         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7124                                    self.op.name)
7125       self.op.name = fname
7126       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7127     else:
7128       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7129                                  self.op.mode)
7130
7131     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7132       if not hasattr(self.op, "allocator") or self.op.allocator is None:
7133         raise errors.OpPrereqError("Missing allocator name")
7134     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7135       raise errors.OpPrereqError("Wrong allocator test '%s'" %
7136                                  self.op.direction)
7137
7138   def Exec(self, feedback_fn):
7139     """Run the allocator test.
7140
7141     """
7142     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7143       ial = IAllocator(self,
7144                        mode=self.op.mode,
7145                        name=self.op.name,
7146                        mem_size=self.op.mem_size,
7147                        disks=self.op.disks,
7148                        disk_template=self.op.disk_template,
7149                        os=self.op.os,
7150                        tags=self.op.tags,
7151                        nics=self.op.nics,
7152                        vcpus=self.op.vcpus,
7153                        hypervisor=self.op.hypervisor,
7154                        )
7155     else:
7156       ial = IAllocator(self,
7157                        mode=self.op.mode,
7158                        name=self.op.name,
7159                        relocate_from=list(self.relocate_from),
7160                        )
7161
7162     if self.op.direction == constants.IALLOCATOR_DIR_IN:
7163       result = ial.in_text
7164     else:
7165       ial.Run(self.op.allocator, validate=False)
7166       result = ial.out_text
7167     return result