Merge branch 'next' into branch-2.1
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import tempfile
30 import re
31 import platform
32 import logging
33 import copy
34 import random
35
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45 from ganeti import ssconf
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59
60   Note that all commands require root permissions.
61
62   """
63   HPATH = None
64   HTYPE = None
65   _OP_REQP = []
66   REQ_BGL = True
67
68   def __init__(self, processor, op, context, rpc):
69     """Constructor for LogicalUnit.
70
71     This needs to be overriden in derived classes in order to check op
72     validity.
73
74     """
75     self.proc = processor
76     self.op = op
77     self.cfg = context.cfg
78     self.context = context
79     self.rpc = rpc
80     # Dicts used to declare locking needs to mcpu
81     self.needed_locks = None
82     self.acquired_locks = {}
83     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
84     self.add_locks = {}
85     self.remove_locks = {}
86     # Used to force good behavior when calling helper functions
87     self.recalculate_locks = {}
88     self.__ssh = None
89     # logging
90     self.LogWarning = processor.LogWarning
91     self.LogInfo = processor.LogInfo
92
93     for attr_name in self._OP_REQP:
94       attr_val = getattr(op, attr_name, None)
95       if attr_val is None:
96         raise errors.OpPrereqError("Required parameter '%s' missing" %
97                                    attr_name)
98     self.CheckArguments()
99
100   def __GetSSH(self):
101     """Returns the SshRunner object
102
103     """
104     if not self.__ssh:
105       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
106     return self.__ssh
107
108   ssh = property(fget=__GetSSH)
109
110   def CheckArguments(self):
111     """Check syntactic validity for the opcode arguments.
112
113     This method is for doing a simple syntactic check and ensure
114     validity of opcode parameters, without any cluster-related
115     checks. While the same can be accomplished in ExpandNames and/or
116     CheckPrereq, doing these separate is better because:
117
118       - ExpandNames is left as as purely a lock-related function
119       - CheckPrereq is run after we have aquired locks (and possible
120         waited for them)
121
122     The function is allowed to change the self.op attribute so that
123     later methods can no longer worry about missing parameters.
124
125     """
126     pass
127
128   def ExpandNames(self):
129     """Expand names for this LU.
130
131     This method is called before starting to execute the opcode, and it should
132     update all the parameters of the opcode to their canonical form (e.g. a
133     short node name must be fully expanded after this method has successfully
134     completed). This way locking, hooks, logging, ecc. can work correctly.
135
136     LUs which implement this method must also populate the self.needed_locks
137     member, as a dict with lock levels as keys, and a list of needed lock names
138     as values. Rules:
139
140       - use an empty dict if you don't need any lock
141       - if you don't need any lock at a particular level omit that level
142       - don't put anything for the BGL level
143       - if you want all locks at a level use locking.ALL_SET as a value
144
145     If you need to share locks (rather than acquire them exclusively) at one
146     level you can modify self.share_locks, setting a true value (usually 1) for
147     that level. By default locks are not shared.
148
149     Examples::
150
151       # Acquire all nodes and one instance
152       self.needed_locks = {
153         locking.LEVEL_NODE: locking.ALL_SET,
154         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
155       }
156       # Acquire just two nodes
157       self.needed_locks = {
158         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
159       }
160       # Acquire no locks
161       self.needed_locks = {} # No, you can't leave it to the default value None
162
163     """
164     # The implementation of this method is mandatory only if the new LU is
165     # concurrent, so that old LUs don't need to be changed all at the same
166     # time.
167     if self.REQ_BGL:
168       self.needed_locks = {} # Exclusive LUs don't need locks.
169     else:
170       raise NotImplementedError
171
172   def DeclareLocks(self, level):
173     """Declare LU locking needs for a level
174
175     While most LUs can just declare their locking needs at ExpandNames time,
176     sometimes there's the need to calculate some locks after having acquired
177     the ones before. This function is called just before acquiring locks at a
178     particular level, but after acquiring the ones at lower levels, and permits
179     such calculations. It can be used to modify self.needed_locks, and by
180     default it does nothing.
181
182     This function is only called if you have something already set in
183     self.needed_locks for the level.
184
185     @param level: Locking level which is going to be locked
186     @type level: member of ganeti.locking.LEVELS
187
188     """
189
190   def CheckPrereq(self):
191     """Check prerequisites for this LU.
192
193     This method should check that the prerequisites for the execution
194     of this LU are fulfilled. It can do internode communication, but
195     it should be idempotent - no cluster or system changes are
196     allowed.
197
198     The method should raise errors.OpPrereqError in case something is
199     not fulfilled. Its return value is ignored.
200
201     This method should also update all the parameters of the opcode to
202     their canonical form if it hasn't been done by ExpandNames before.
203
204     """
205     raise NotImplementedError
206
207   def Exec(self, feedback_fn):
208     """Execute the LU.
209
210     This method should implement the actual work. It should raise
211     errors.OpExecError for failures that are somewhat dealt with in
212     code, or expected.
213
214     """
215     raise NotImplementedError
216
217   def BuildHooksEnv(self):
218     """Build hooks environment for this LU.
219
220     This method should return a three-node tuple consisting of: a dict
221     containing the environment that will be used for running the
222     specific hook for this LU, a list of node names on which the hook
223     should run before the execution, and a list of node names on which
224     the hook should run after the execution.
225
226     The keys of the dict must not have 'GANETI_' prefixed as this will
227     be handled in the hooks runner. Also note additional keys will be
228     added by the hooks runner. If the LU doesn't define any
229     environment, an empty dict (and not None) should be returned.
230
231     No nodes should be returned as an empty list (and not None).
232
233     Note that if the HPATH for a LU class is None, this function will
234     not be called.
235
236     """
237     raise NotImplementedError
238
239   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240     """Notify the LU about the results of its hooks.
241
242     This method is called every time a hooks phase is executed, and notifies
243     the Logical Unit about the hooks' result. The LU can then use it to alter
244     its result based on the hooks.  By default the method does nothing and the
245     previous result is passed back unchanged but any LU can define it if it
246     wants to use the local cluster hook-scripts somehow.
247
248     @param phase: one of L{constants.HOOKS_PHASE_POST} or
249         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250     @param hook_results: the results of the multi-node hooks rpc call
251     @param feedback_fn: function used send feedback back to the caller
252     @param lu_result: the previous Exec result this LU had, or None
253         in the PRE phase
254     @return: the new Exec result, based on the previous result
255         and hook results
256
257     """
258     return lu_result
259
260   def _ExpandAndLockInstance(self):
261     """Helper function to expand and lock an instance.
262
263     Many LUs that work on an instance take its name in self.op.instance_name
264     and need to expand it and then declare the expanded name for locking. This
265     function does it, and then updates self.op.instance_name to the expanded
266     name. It also initializes needed_locks as a dict, if this hasn't been done
267     before.
268
269     """
270     if self.needed_locks is None:
271       self.needed_locks = {}
272     else:
273       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274         "_ExpandAndLockInstance called with instance-level locks set"
275     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276     if expanded_name is None:
277       raise errors.OpPrereqError("Instance '%s' not known" %
278                                   self.op.instance_name)
279     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280     self.op.instance_name = expanded_name
281
282   def _LockInstancesNodes(self, primary_only=False):
283     """Helper function to declare instances' nodes for locking.
284
285     This function should be called after locking one or more instances to lock
286     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287     with all primary or secondary nodes for instances already locked and
288     present in self.needed_locks[locking.LEVEL_INSTANCE].
289
290     It should be called from DeclareLocks, and for safety only works if
291     self.recalculate_locks[locking.LEVEL_NODE] is set.
292
293     In the future it may grow parameters to just lock some instance's nodes, or
294     to just lock primaries or secondary nodes, if needed.
295
296     If should be called in DeclareLocks in a way similar to::
297
298       if level == locking.LEVEL_NODE:
299         self._LockInstancesNodes()
300
301     @type primary_only: boolean
302     @param primary_only: only lock primary nodes of locked instances
303
304     """
305     assert locking.LEVEL_NODE in self.recalculate_locks, \
306       "_LockInstancesNodes helper function called with no nodes to recalculate"
307
308     # TODO: check if we're really been called with the instance locks held
309
310     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311     # future we might want to have different behaviors depending on the value
312     # of self.recalculate_locks[locking.LEVEL_NODE]
313     wanted_nodes = []
314     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315       instance = self.context.cfg.GetInstanceInfo(instance_name)
316       wanted_nodes.append(instance.primary_node)
317       if not primary_only:
318         wanted_nodes.extend(instance.secondary_nodes)
319
320     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
324
325     del self.recalculate_locks[locking.LEVEL_NODE]
326
327
328 class NoHooksLU(LogicalUnit):
329   """Simple LU which runs no hooks.
330
331   This LU is intended as a parent for other LogicalUnits which will
332   run no hooks, in order to reduce duplicate code.
333
334   """
335   HPATH = None
336   HTYPE = None
337
338
339 def _GetWantedNodes(lu, nodes):
340   """Returns list of checked and expanded node names.
341
342   @type lu: L{LogicalUnit}
343   @param lu: the logical unit on whose behalf we execute
344   @type nodes: list
345   @param nodes: list of node names or None for all nodes
346   @rtype: list
347   @return: the list of nodes, sorted
348   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
349
350   """
351   if not isinstance(nodes, list):
352     raise errors.OpPrereqError("Invalid argument type 'nodes'")
353
354   if not nodes:
355     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356       " non-empty list of nodes whose name is to be expanded.")
357
358   wanted = []
359   for name in nodes:
360     node = lu.cfg.ExpandNodeName(name)
361     if node is None:
362       raise errors.OpPrereqError("No such node name '%s'" % name)
363     wanted.append(node)
364
365   return utils.NiceSort(wanted)
366
367
368 def _GetWantedInstances(lu, instances):
369   """Returns list of checked and expanded instance names.
370
371   @type lu: L{LogicalUnit}
372   @param lu: the logical unit on whose behalf we execute
373   @type instances: list
374   @param instances: list of instance names or None for all instances
375   @rtype: list
376   @return: the list of instances, sorted
377   @raise errors.OpPrereqError: if the instances parameter is wrong type
378   @raise errors.OpPrereqError: if any of the passed instances is not found
379
380   """
381   if not isinstance(instances, list):
382     raise errors.OpPrereqError("Invalid argument type 'instances'")
383
384   if instances:
385     wanted = []
386
387     for name in instances:
388       instance = lu.cfg.ExpandInstanceName(name)
389       if instance is None:
390         raise errors.OpPrereqError("No such instance name '%s'" % name)
391       wanted.append(instance)
392
393   else:
394     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
395   return wanted
396
397
398 def _CheckOutputFields(static, dynamic, selected):
399   """Checks whether all selected fields are valid.
400
401   @type static: L{utils.FieldSet}
402   @param static: static fields set
403   @type dynamic: L{utils.FieldSet}
404   @param dynamic: dynamic fields set
405
406   """
407   f = utils.FieldSet()
408   f.Extend(static)
409   f.Extend(dynamic)
410
411   delta = f.NonMatching(selected)
412   if delta:
413     raise errors.OpPrereqError("Unknown output fields selected: %s"
414                                % ",".join(delta))
415
416
417 def _CheckBooleanOpField(op, name):
418   """Validates boolean opcode parameters.
419
420   This will ensure that an opcode parameter is either a boolean value,
421   or None (but that it always exists).
422
423   """
424   val = getattr(op, name, None)
425   if not (val is None or isinstance(val, bool)):
426     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
427                                (name, str(val)))
428   setattr(op, name, val)
429
430
431 def _CheckNodeOnline(lu, node):
432   """Ensure that a given node is online.
433
434   @param lu: the LU on behalf of which we make the check
435   @param node: the node to check
436   @raise errors.OpPrereqError: if the node is offline
437
438   """
439   if lu.cfg.GetNodeInfo(node).offline:
440     raise errors.OpPrereqError("Can't use offline node %s" % node)
441
442
443 def _CheckNodeNotDrained(lu, node):
444   """Ensure that a given node is not drained.
445
446   @param lu: the LU on behalf of which we make the check
447   @param node: the node to check
448   @raise errors.OpPrereqError: if the node is drained
449
450   """
451   if lu.cfg.GetNodeInfo(node).drained:
452     raise errors.OpPrereqError("Can't use drained node %s" % node)
453
454
455 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456                           memory, vcpus, nics, disk_template, disks,
457                           bep, hvp, hypervisor):
458   """Builds instance related env variables for hooks
459
460   This builds the hook environment from individual variables.
461
462   @type name: string
463   @param name: the name of the instance
464   @type primary_node: string
465   @param primary_node: the name of the instance's primary node
466   @type secondary_nodes: list
467   @param secondary_nodes: list of secondary nodes as strings
468   @type os_type: string
469   @param os_type: the name of the instance's OS
470   @type status: boolean
471   @param status: the should_run status of the instance
472   @type memory: string
473   @param memory: the memory size of the instance
474   @type vcpus: string
475   @param vcpus: the count of VCPUs the instance has
476   @type nics: list
477   @param nics: list of tuples (ip, bridge, mac) representing
478       the NICs the instance  has
479   @type disk_template: string
480   @param disk_template: the distk template of the instance
481   @type disks: list
482   @param disks: the list of (size, mode) pairs
483   @type bep: dict
484   @param bep: the backend parameters for the instance
485   @type hvp: dict
486   @param hvp: the hypervisor parameters for the instance
487   @type hypervisor: string
488   @param hypervisor: the hypervisor for the instance
489   @rtype: dict
490   @return: the hook environment for this instance
491
492   """
493   if status:
494     str_status = "up"
495   else:
496     str_status = "down"
497   env = {
498     "OP_TARGET": name,
499     "INSTANCE_NAME": name,
500     "INSTANCE_PRIMARY": primary_node,
501     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
502     "INSTANCE_OS_TYPE": os_type,
503     "INSTANCE_STATUS": str_status,
504     "INSTANCE_MEMORY": memory,
505     "INSTANCE_VCPUS": vcpus,
506     "INSTANCE_DISK_TEMPLATE": disk_template,
507     "INSTANCE_HYPERVISOR": hypervisor,
508   }
509
510   if nics:
511     nic_count = len(nics)
512     for idx, (ip, mac, mode, link) in enumerate(nics):
513       if ip is None:
514         ip = ""
515       env["INSTANCE_NIC%d_IP" % idx] = ip
516       env["INSTANCE_NIC%d_MAC" % idx] = mac
517       env["INSTANCE_NIC%d_MODE" % idx] = mode
518       env["INSTANCE_NIC%d_LINK" % idx] = link
519       if mode == constants.NIC_MODE_BRIDGED:
520         env["INSTANCE_NIC%d_BRIDGE" % idx] = link
521   else:
522     nic_count = 0
523
524   env["INSTANCE_NIC_COUNT"] = nic_count
525
526   if disks:
527     disk_count = len(disks)
528     for idx, (size, mode) in enumerate(disks):
529       env["INSTANCE_DISK%d_SIZE" % idx] = size
530       env["INSTANCE_DISK%d_MODE" % idx] = mode
531   else:
532     disk_count = 0
533
534   env["INSTANCE_DISK_COUNT"] = disk_count
535
536   for source, kind in [(bep, "BE"), (hvp, "HV")]:
537     for key, value in source.items():
538       env["INSTANCE_%s_%s" % (kind, key)] = value
539
540   return env
541
542 def _PreBuildNICHooksList(lu, nics):
543   """Build a list of nic information tuples.
544
545   This list is suitable to be passed to _BuildInstanceHookEnv.
546
547   @type lu:  L{LogicalUnit}
548   @param lu: the logical unit on whose behalf we execute
549   @type nics: list of L{objects.NIC}
550   @param nics: list of nics to convert to hooks tuples
551
552   """
553   hooks_nics = []
554   c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
555   for nic in nics:
556     ip = nic.ip
557     mac = nic.mac
558     filled_params = objects.FillDict(c_nicparams, nic.nicparams)
559     mode = filled_params[constants.NIC_MODE]
560     link = filled_params[constants.NIC_LINK]
561     hooks_nics.append((ip, mac, mode, link))
562   return hooks_nics
563
564 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
565   """Builds instance related env variables for hooks from an object.
566
567   @type lu: L{LogicalUnit}
568   @param lu: the logical unit on whose behalf we execute
569   @type instance: L{objects.Instance}
570   @param instance: the instance for which we should build the
571       environment
572   @type override: dict
573   @param override: dictionary with key/values that will override
574       our values
575   @rtype: dict
576   @return: the hook environment dictionary
577
578   """
579   cluster = lu.cfg.GetClusterInfo()
580   bep = cluster.FillBE(instance)
581   hvp = cluster.FillHV(instance)
582   args = {
583     'name': instance.name,
584     'primary_node': instance.primary_node,
585     'secondary_nodes': instance.secondary_nodes,
586     'os_type': instance.os,
587     'status': instance.admin_up,
588     'memory': bep[constants.BE_MEMORY],
589     'vcpus': bep[constants.BE_VCPUS],
590     'nics': _PreBuildNICHooksList(lu, instance.nics),
591     'disk_template': instance.disk_template,
592     'disks': [(disk.size, disk.mode) for disk in instance.disks],
593     'bep': bep,
594     'hvp': hvp,
595     'hypervisor': instance.hypervisor,
596   }
597   if override:
598     args.update(override)
599   return _BuildInstanceHookEnv(**args)
600
601
602 def _AdjustCandidatePool(lu):
603   """Adjust the candidate pool after node operations.
604
605   """
606   mod_list = lu.cfg.MaintainCandidatePool()
607   if mod_list:
608     lu.LogInfo("Promoted nodes to master candidate role: %s",
609                ", ".join(node.name for node in mod_list))
610     for name in mod_list:
611       lu.context.ReaddNode(name)
612   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
613   if mc_now > mc_max:
614     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
615                (mc_now, mc_max))
616
617
618 def _CheckNicsBridgesExist(lu, target_nics, target_node,
619                                profile=constants.PP_DEFAULT):
620   """Check that the brigdes needed by a list of nics exist.
621
622   """
623   c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
624   paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
625                 for nic in target_nics]
626   brlist = [params[constants.NIC_LINK] for params in paramslist
627             if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
628   if brlist:
629     result = lu.rpc.call_bridges_exist(target_node, brlist)
630     result.Raise("Error checking bridges on destination node '%s'" %
631                  target_node, prereq=True)
632
633
634 def _CheckInstanceBridgesExist(lu, instance, node=None):
635   """Check that the brigdes needed by an instance exist.
636
637   """
638   if node is None:
639     node=instance.primary_node
640   _CheckNicsBridgesExist(lu, instance.nics, node)
641
642
643 class LUDestroyCluster(NoHooksLU):
644   """Logical unit for destroying the cluster.
645
646   """
647   _OP_REQP = []
648
649   def CheckPrereq(self):
650     """Check prerequisites.
651
652     This checks whether the cluster is empty.
653
654     Any errors are signalled by raising errors.OpPrereqError.
655
656     """
657     master = self.cfg.GetMasterNode()
658
659     nodelist = self.cfg.GetNodeList()
660     if len(nodelist) != 1 or nodelist[0] != master:
661       raise errors.OpPrereqError("There are still %d node(s) in"
662                                  " this cluster." % (len(nodelist) - 1))
663     instancelist = self.cfg.GetInstanceList()
664     if instancelist:
665       raise errors.OpPrereqError("There are still %d instance(s) in"
666                                  " this cluster." % len(instancelist))
667
668   def Exec(self, feedback_fn):
669     """Destroys the cluster.
670
671     """
672     master = self.cfg.GetMasterNode()
673     result = self.rpc.call_node_stop_master(master, False)
674     result.Raise("Could not disable the master role")
675     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
676     utils.CreateBackup(priv_key)
677     utils.CreateBackup(pub_key)
678     return master
679
680
681 class LUVerifyCluster(LogicalUnit):
682   """Verifies the cluster status.
683
684   """
685   HPATH = "cluster-verify"
686   HTYPE = constants.HTYPE_CLUSTER
687   _OP_REQP = ["skip_checks"]
688   REQ_BGL = False
689
690   def ExpandNames(self):
691     self.needed_locks = {
692       locking.LEVEL_NODE: locking.ALL_SET,
693       locking.LEVEL_INSTANCE: locking.ALL_SET,
694     }
695     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
696
697   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
698                   node_result, feedback_fn, master_files,
699                   drbd_map, vg_name):
700     """Run multiple tests against a node.
701
702     Test list:
703
704       - compares ganeti version
705       - checks vg existance and size > 20G
706       - checks config file checksum
707       - checks ssh to other nodes
708
709     @type nodeinfo: L{objects.Node}
710     @param nodeinfo: the node to check
711     @param file_list: required list of files
712     @param local_cksum: dictionary of local files and their checksums
713     @param node_result: the results from the node
714     @param feedback_fn: function used to accumulate results
715     @param master_files: list of files that only masters should have
716     @param drbd_map: the useddrbd minors for this node, in
717         form of minor: (instance, must_exist) which correspond to instances
718         and their running status
719     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
720
721     """
722     node = nodeinfo.name
723
724     # main result, node_result should be a non-empty dict
725     if not node_result or not isinstance(node_result, dict):
726       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
727       return True
728
729     # compares ganeti version
730     local_version = constants.PROTOCOL_VERSION
731     remote_version = node_result.get('version', None)
732     if not (remote_version and isinstance(remote_version, (list, tuple)) and
733             len(remote_version) == 2):
734       feedback_fn("  - ERROR: connection to %s failed" % (node))
735       return True
736
737     if local_version != remote_version[0]:
738       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
739                   " node %s %s" % (local_version, node, remote_version[0]))
740       return True
741
742     # node seems compatible, we can actually try to look into its results
743
744     bad = False
745
746     # full package version
747     if constants.RELEASE_VERSION != remote_version[1]:
748       feedback_fn("  - WARNING: software version mismatch: master %s,"
749                   " node %s %s" %
750                   (constants.RELEASE_VERSION, node, remote_version[1]))
751
752     # checks vg existence and size > 20G
753     if vg_name is not None:
754       vglist = node_result.get(constants.NV_VGLIST, None)
755       if not vglist:
756         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
757                         (node,))
758         bad = True
759       else:
760         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
761                                               constants.MIN_VG_SIZE)
762         if vgstatus:
763           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
764           bad = True
765
766     # checks config file checksum
767
768     remote_cksum = node_result.get(constants.NV_FILELIST, None)
769     if not isinstance(remote_cksum, dict):
770       bad = True
771       feedback_fn("  - ERROR: node hasn't returned file checksum data")
772     else:
773       for file_name in file_list:
774         node_is_mc = nodeinfo.master_candidate
775         must_have_file = file_name not in master_files
776         if file_name not in remote_cksum:
777           if node_is_mc or must_have_file:
778             bad = True
779             feedback_fn("  - ERROR: file '%s' missing" % file_name)
780         elif remote_cksum[file_name] != local_cksum[file_name]:
781           if node_is_mc or must_have_file:
782             bad = True
783             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
784           else:
785             # not candidate and this is not a must-have file
786             bad = True
787             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
788                         " '%s'" % file_name)
789         else:
790           # all good, except non-master/non-must have combination
791           if not node_is_mc and not must_have_file:
792             feedback_fn("  - ERROR: file '%s' should not exist on non master"
793                         " candidates" % file_name)
794
795     # checks ssh to any
796
797     if constants.NV_NODELIST not in node_result:
798       bad = True
799       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
800     else:
801       if node_result[constants.NV_NODELIST]:
802         bad = True
803         for node in node_result[constants.NV_NODELIST]:
804           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
805                           (node, node_result[constants.NV_NODELIST][node]))
806
807     if constants.NV_NODENETTEST not in node_result:
808       bad = True
809       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
810     else:
811       if node_result[constants.NV_NODENETTEST]:
812         bad = True
813         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
814         for node in nlist:
815           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
816                           (node, node_result[constants.NV_NODENETTEST][node]))
817
818     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
819     if isinstance(hyp_result, dict):
820       for hv_name, hv_result in hyp_result.iteritems():
821         if hv_result is not None:
822           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
823                       (hv_name, hv_result))
824
825     # check used drbd list
826     if vg_name is not None:
827       used_minors = node_result.get(constants.NV_DRBDLIST, [])
828       if not isinstance(used_minors, (tuple, list)):
829         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
830                     str(used_minors))
831       else:
832         for minor, (iname, must_exist) in drbd_map.items():
833           if minor not in used_minors and must_exist:
834             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
835                         " not active" % (minor, iname))
836             bad = True
837         for minor in used_minors:
838           if minor not in drbd_map:
839             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
840                         minor)
841             bad = True
842
843     return bad
844
845   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
846                       node_instance, feedback_fn, n_offline):
847     """Verify an instance.
848
849     This function checks to see if the required block devices are
850     available on the instance's node.
851
852     """
853     bad = False
854
855     node_current = instanceconfig.primary_node
856
857     node_vol_should = {}
858     instanceconfig.MapLVsByNode(node_vol_should)
859
860     for node in node_vol_should:
861       if node in n_offline:
862         # ignore missing volumes on offline nodes
863         continue
864       for volume in node_vol_should[node]:
865         if node not in node_vol_is or volume not in node_vol_is[node]:
866           feedback_fn("  - ERROR: volume %s missing on node %s" %
867                           (volume, node))
868           bad = True
869
870     if instanceconfig.admin_up:
871       if ((node_current not in node_instance or
872           not instance in node_instance[node_current]) and
873           node_current not in n_offline):
874         feedback_fn("  - ERROR: instance %s not running on node %s" %
875                         (instance, node_current))
876         bad = True
877
878     for node in node_instance:
879       if (not node == node_current):
880         if instance in node_instance[node]:
881           feedback_fn("  - ERROR: instance %s should not run on node %s" %
882                           (instance, node))
883           bad = True
884
885     return bad
886
887   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
888     """Verify if there are any unknown volumes in the cluster.
889
890     The .os, .swap and backup volumes are ignored. All other volumes are
891     reported as unknown.
892
893     """
894     bad = False
895
896     for node in node_vol_is:
897       for volume in node_vol_is[node]:
898         if node not in node_vol_should or volume not in node_vol_should[node]:
899           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
900                       (volume, node))
901           bad = True
902     return bad
903
904   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
905     """Verify the list of running instances.
906
907     This checks what instances are running but unknown to the cluster.
908
909     """
910     bad = False
911     for node in node_instance:
912       for runninginstance in node_instance[node]:
913         if runninginstance not in instancelist:
914           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
915                           (runninginstance, node))
916           bad = True
917     return bad
918
919   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
920     """Verify N+1 Memory Resilience.
921
922     Check that if one single node dies we can still start all the instances it
923     was primary for.
924
925     """
926     bad = False
927
928     for node, nodeinfo in node_info.iteritems():
929       # This code checks that every node which is now listed as secondary has
930       # enough memory to host all instances it is supposed to should a single
931       # other node in the cluster fail.
932       # FIXME: not ready for failover to an arbitrary node
933       # FIXME: does not support file-backed instances
934       # WARNING: we currently take into account down instances as well as up
935       # ones, considering that even if they're down someone might want to start
936       # them even in the event of a node failure.
937       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
938         needed_mem = 0
939         for instance in instances:
940           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
941           if bep[constants.BE_AUTO_BALANCE]:
942             needed_mem += bep[constants.BE_MEMORY]
943         if nodeinfo['mfree'] < needed_mem:
944           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
945                       " failovers should node %s fail" % (node, prinode))
946           bad = True
947     return bad
948
949   def CheckPrereq(self):
950     """Check prerequisites.
951
952     Transform the list of checks we're going to skip into a set and check that
953     all its members are valid.
954
955     """
956     self.skip_set = frozenset(self.op.skip_checks)
957     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
958       raise errors.OpPrereqError("Invalid checks to be skipped specified")
959
960   def BuildHooksEnv(self):
961     """Build hooks env.
962
963     Cluster-Verify hooks just rone in the post phase and their failure makes
964     the output be logged in the verify output and the verification to fail.
965
966     """
967     all_nodes = self.cfg.GetNodeList()
968     env = {
969       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
970       }
971     for node in self.cfg.GetAllNodesInfo().values():
972       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
973
974     return env, [], all_nodes
975
976   def Exec(self, feedback_fn):
977     """Verify integrity of cluster, performing various test on nodes.
978
979     """
980     bad = False
981     feedback_fn("* Verifying global settings")
982     for msg in self.cfg.VerifyConfig():
983       feedback_fn("  - ERROR: %s" % msg)
984
985     vg_name = self.cfg.GetVGName()
986     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
987     nodelist = utils.NiceSort(self.cfg.GetNodeList())
988     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
989     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
990     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
991                         for iname in instancelist)
992     i_non_redundant = [] # Non redundant instances
993     i_non_a_balanced = [] # Non auto-balanced instances
994     n_offline = [] # List of offline nodes
995     n_drained = [] # List of nodes being drained
996     node_volume = {}
997     node_instance = {}
998     node_info = {}
999     instance_cfg = {}
1000
1001     # FIXME: verify OS list
1002     # do local checksums
1003     master_files = [constants.CLUSTER_CONF_FILE]
1004
1005     file_names = ssconf.SimpleStore().GetFileList()
1006     file_names.append(constants.SSL_CERT_FILE)
1007     file_names.append(constants.RAPI_CERT_FILE)
1008     file_names.extend(master_files)
1009
1010     local_checksums = utils.FingerprintFiles(file_names)
1011
1012     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1013     node_verify_param = {
1014       constants.NV_FILELIST: file_names,
1015       constants.NV_NODELIST: [node.name for node in nodeinfo
1016                               if not node.offline],
1017       constants.NV_HYPERVISOR: hypervisors,
1018       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1019                                   node.secondary_ip) for node in nodeinfo
1020                                  if not node.offline],
1021       constants.NV_INSTANCELIST: hypervisors,
1022       constants.NV_VERSION: None,
1023       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1024       }
1025     if vg_name is not None:
1026       node_verify_param[constants.NV_VGLIST] = None
1027       node_verify_param[constants.NV_LVLIST] = vg_name
1028       node_verify_param[constants.NV_DRBDLIST] = None
1029     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1030                                            self.cfg.GetClusterName())
1031
1032     cluster = self.cfg.GetClusterInfo()
1033     master_node = self.cfg.GetMasterNode()
1034     all_drbd_map = self.cfg.ComputeDRBDMap()
1035
1036     for node_i in nodeinfo:
1037       node = node_i.name
1038
1039       if node_i.offline:
1040         feedback_fn("* Skipping offline node %s" % (node,))
1041         n_offline.append(node)
1042         continue
1043
1044       if node == master_node:
1045         ntype = "master"
1046       elif node_i.master_candidate:
1047         ntype = "master candidate"
1048       elif node_i.drained:
1049         ntype = "drained"
1050         n_drained.append(node)
1051       else:
1052         ntype = "regular"
1053       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1054
1055       msg = all_nvinfo[node].fail_msg
1056       if msg:
1057         feedback_fn("  - ERROR: while contacting node %s: %s" % (node, msg))
1058         bad = True
1059         continue
1060
1061       nresult = all_nvinfo[node].payload
1062       node_drbd = {}
1063       for minor, instance in all_drbd_map[node].items():
1064         if instance not in instanceinfo:
1065           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1066                       instance)
1067           # ghost instance should not be running, but otherwise we
1068           # don't give double warnings (both ghost instance and
1069           # unallocated minor in use)
1070           node_drbd[minor] = (instance, False)
1071         else:
1072           instance = instanceinfo[instance]
1073           node_drbd[minor] = (instance.name, instance.admin_up)
1074       result = self._VerifyNode(node_i, file_names, local_checksums,
1075                                 nresult, feedback_fn, master_files,
1076                                 node_drbd, vg_name)
1077       bad = bad or result
1078
1079       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1080       if vg_name is None:
1081         node_volume[node] = {}
1082       elif isinstance(lvdata, basestring):
1083         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1084                     (node, utils.SafeEncode(lvdata)))
1085         bad = True
1086         node_volume[node] = {}
1087       elif not isinstance(lvdata, dict):
1088         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1089         bad = True
1090         continue
1091       else:
1092         node_volume[node] = lvdata
1093
1094       # node_instance
1095       idata = nresult.get(constants.NV_INSTANCELIST, None)
1096       if not isinstance(idata, list):
1097         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1098                     (node,))
1099         bad = True
1100         continue
1101
1102       node_instance[node] = idata
1103
1104       # node_info
1105       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1106       if not isinstance(nodeinfo, dict):
1107         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1108         bad = True
1109         continue
1110
1111       try:
1112         node_info[node] = {
1113           "mfree": int(nodeinfo['memory_free']),
1114           "pinst": [],
1115           "sinst": [],
1116           # dictionary holding all instances this node is secondary for,
1117           # grouped by their primary node. Each key is a cluster node, and each
1118           # value is a list of instances which have the key as primary and the
1119           # current node as secondary.  this is handy to calculate N+1 memory
1120           # availability if you can only failover from a primary to its
1121           # secondary.
1122           "sinst-by-pnode": {},
1123         }
1124         # FIXME: devise a free space model for file based instances as well
1125         if vg_name is not None:
1126           if (constants.NV_VGLIST not in nresult or
1127               vg_name not in nresult[constants.NV_VGLIST]):
1128             feedback_fn("  - ERROR: node %s didn't return data for the"
1129                         " volume group '%s' - it is either missing or broken" %
1130                         (node, vg_name))
1131             bad = True
1132             continue
1133           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1134       except (ValueError, KeyError):
1135         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1136                     " from node %s" % (node,))
1137         bad = True
1138         continue
1139
1140     node_vol_should = {}
1141
1142     for instance in instancelist:
1143       feedback_fn("* Verifying instance %s" % instance)
1144       inst_config = instanceinfo[instance]
1145       result =  self._VerifyInstance(instance, inst_config, node_volume,
1146                                      node_instance, feedback_fn, n_offline)
1147       bad = bad or result
1148       inst_nodes_offline = []
1149
1150       inst_config.MapLVsByNode(node_vol_should)
1151
1152       instance_cfg[instance] = inst_config
1153
1154       pnode = inst_config.primary_node
1155       if pnode in node_info:
1156         node_info[pnode]['pinst'].append(instance)
1157       elif pnode not in n_offline:
1158         feedback_fn("  - ERROR: instance %s, connection to primary node"
1159                     " %s failed" % (instance, pnode))
1160         bad = True
1161
1162       if pnode in n_offline:
1163         inst_nodes_offline.append(pnode)
1164
1165       # If the instance is non-redundant we cannot survive losing its primary
1166       # node, so we are not N+1 compliant. On the other hand we have no disk
1167       # templates with more than one secondary so that situation is not well
1168       # supported either.
1169       # FIXME: does not support file-backed instances
1170       if len(inst_config.secondary_nodes) == 0:
1171         i_non_redundant.append(instance)
1172       elif len(inst_config.secondary_nodes) > 1:
1173         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1174                     % instance)
1175
1176       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1177         i_non_a_balanced.append(instance)
1178
1179       for snode in inst_config.secondary_nodes:
1180         if snode in node_info:
1181           node_info[snode]['sinst'].append(instance)
1182           if pnode not in node_info[snode]['sinst-by-pnode']:
1183             node_info[snode]['sinst-by-pnode'][pnode] = []
1184           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1185         elif snode not in n_offline:
1186           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1187                       " %s failed" % (instance, snode))
1188           bad = True
1189         if snode in n_offline:
1190           inst_nodes_offline.append(snode)
1191
1192       if inst_nodes_offline:
1193         # warn that the instance lives on offline nodes, and set bad=True
1194         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1195                     ", ".join(inst_nodes_offline))
1196         bad = True
1197
1198     feedback_fn("* Verifying orphan volumes")
1199     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1200                                        feedback_fn)
1201     bad = bad or result
1202
1203     feedback_fn("* Verifying remaining instances")
1204     result = self._VerifyOrphanInstances(instancelist, node_instance,
1205                                          feedback_fn)
1206     bad = bad or result
1207
1208     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1209       feedback_fn("* Verifying N+1 Memory redundancy")
1210       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1211       bad = bad or result
1212
1213     feedback_fn("* Other Notes")
1214     if i_non_redundant:
1215       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1216                   % len(i_non_redundant))
1217
1218     if i_non_a_balanced:
1219       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1220                   % len(i_non_a_balanced))
1221
1222     if n_offline:
1223       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1224
1225     if n_drained:
1226       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1227
1228     return not bad
1229
1230   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1231     """Analize the post-hooks' result
1232
1233     This method analyses the hook result, handles it, and sends some
1234     nicely-formatted feedback back to the user.
1235
1236     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1237         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1238     @param hooks_results: the results of the multi-node hooks rpc call
1239     @param feedback_fn: function used send feedback back to the caller
1240     @param lu_result: previous Exec result
1241     @return: the new Exec result, based on the previous result
1242         and hook results
1243
1244     """
1245     # We only really run POST phase hooks, and are only interested in
1246     # their results
1247     if phase == constants.HOOKS_PHASE_POST:
1248       # Used to change hooks' output to proper indentation
1249       indent_re = re.compile('^', re.M)
1250       feedback_fn("* Hooks Results")
1251       if not hooks_results:
1252         feedback_fn("  - ERROR: general communication failure")
1253         lu_result = 1
1254       else:
1255         for node_name in hooks_results:
1256           show_node_header = True
1257           res = hooks_results[node_name]
1258           msg = res.fail_msg
1259           if msg:
1260             if res.offline:
1261               # no need to warn or set fail return value
1262               continue
1263             feedback_fn("    Communication failure in hooks execution: %s" %
1264                         msg)
1265             lu_result = 1
1266             continue
1267           for script, hkr, output in res.payload:
1268             if hkr == constants.HKR_FAIL:
1269               # The node header is only shown once, if there are
1270               # failing hooks on that node
1271               if show_node_header:
1272                 feedback_fn("  Node %s:" % node_name)
1273                 show_node_header = False
1274               feedback_fn("    ERROR: Script %s failed, output:" % script)
1275               output = indent_re.sub('      ', output)
1276               feedback_fn("%s" % output)
1277               lu_result = 1
1278
1279       return lu_result
1280
1281
1282 class LUVerifyDisks(NoHooksLU):
1283   """Verifies the cluster disks status.
1284
1285   """
1286   _OP_REQP = []
1287   REQ_BGL = False
1288
1289   def ExpandNames(self):
1290     self.needed_locks = {
1291       locking.LEVEL_NODE: locking.ALL_SET,
1292       locking.LEVEL_INSTANCE: locking.ALL_SET,
1293     }
1294     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1295
1296   def CheckPrereq(self):
1297     """Check prerequisites.
1298
1299     This has no prerequisites.
1300
1301     """
1302     pass
1303
1304   def Exec(self, feedback_fn):
1305     """Verify integrity of cluster disks.
1306
1307     @rtype: tuple of three items
1308     @return: a tuple of (dict of node-to-node_error, list of instances
1309         which need activate-disks, dict of instance: (node, volume) for
1310         missing volumes
1311
1312     """
1313     result = res_nodes, res_instances, res_missing = {}, [], {}
1314
1315     vg_name = self.cfg.GetVGName()
1316     nodes = utils.NiceSort(self.cfg.GetNodeList())
1317     instances = [self.cfg.GetInstanceInfo(name)
1318                  for name in self.cfg.GetInstanceList()]
1319
1320     nv_dict = {}
1321     for inst in instances:
1322       inst_lvs = {}
1323       if (not inst.admin_up or
1324           inst.disk_template not in constants.DTS_NET_MIRROR):
1325         continue
1326       inst.MapLVsByNode(inst_lvs)
1327       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1328       for node, vol_list in inst_lvs.iteritems():
1329         for vol in vol_list:
1330           nv_dict[(node, vol)] = inst
1331
1332     if not nv_dict:
1333       return result
1334
1335     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1336
1337     to_act = set()
1338     for node in nodes:
1339       # node_volume
1340       node_res = node_lvs[node]
1341       if node_res.offline:
1342         continue
1343       msg = node_res.fail_msg
1344       if msg:
1345         logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1346         res_nodes[node] = msg
1347         continue
1348
1349       lvs = node_res.payload
1350       for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1351         inst = nv_dict.pop((node, lv_name), None)
1352         if (not lv_online and inst is not None
1353             and inst.name not in res_instances):
1354           res_instances.append(inst.name)
1355
1356     # any leftover items in nv_dict are missing LVs, let's arrange the
1357     # data better
1358     for key, inst in nv_dict.iteritems():
1359       if inst.name not in res_missing:
1360         res_missing[inst.name] = []
1361       res_missing[inst.name].append(key)
1362
1363     return result
1364
1365
1366 class LURenameCluster(LogicalUnit):
1367   """Rename the cluster.
1368
1369   """
1370   HPATH = "cluster-rename"
1371   HTYPE = constants.HTYPE_CLUSTER
1372   _OP_REQP = ["name"]
1373
1374   def BuildHooksEnv(self):
1375     """Build hooks env.
1376
1377     """
1378     env = {
1379       "OP_TARGET": self.cfg.GetClusterName(),
1380       "NEW_NAME": self.op.name,
1381       }
1382     mn = self.cfg.GetMasterNode()
1383     return env, [mn], [mn]
1384
1385   def CheckPrereq(self):
1386     """Verify that the passed name is a valid one.
1387
1388     """
1389     hostname = utils.HostInfo(self.op.name)
1390
1391     new_name = hostname.name
1392     self.ip = new_ip = hostname.ip
1393     old_name = self.cfg.GetClusterName()
1394     old_ip = self.cfg.GetMasterIP()
1395     if new_name == old_name and new_ip == old_ip:
1396       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1397                                  " cluster has changed")
1398     if new_ip != old_ip:
1399       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1400         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1401                                    " reachable on the network. Aborting." %
1402                                    new_ip)
1403
1404     self.op.name = new_name
1405
1406   def Exec(self, feedback_fn):
1407     """Rename the cluster.
1408
1409     """
1410     clustername = self.op.name
1411     ip = self.ip
1412
1413     # shutdown the master IP
1414     master = self.cfg.GetMasterNode()
1415     result = self.rpc.call_node_stop_master(master, False)
1416     result.Raise("Could not disable the master role")
1417
1418     try:
1419       cluster = self.cfg.GetClusterInfo()
1420       cluster.cluster_name = clustername
1421       cluster.master_ip = ip
1422       self.cfg.Update(cluster)
1423
1424       # update the known hosts file
1425       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1426       node_list = self.cfg.GetNodeList()
1427       try:
1428         node_list.remove(master)
1429       except ValueError:
1430         pass
1431       result = self.rpc.call_upload_file(node_list,
1432                                          constants.SSH_KNOWN_HOSTS_FILE)
1433       for to_node, to_result in result.iteritems():
1434         msg = to_result.fail_msg
1435         if msg:
1436           msg = ("Copy of file %s to node %s failed: %s" %
1437                  (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1438           self.proc.LogWarning(msg)
1439
1440     finally:
1441       result = self.rpc.call_node_start_master(master, False)
1442       msg = result.fail_msg
1443       if msg:
1444         self.LogWarning("Could not re-enable the master role on"
1445                         " the master, please restart manually: %s", msg)
1446
1447
1448 def _RecursiveCheckIfLVMBased(disk):
1449   """Check if the given disk or its children are lvm-based.
1450
1451   @type disk: L{objects.Disk}
1452   @param disk: the disk to check
1453   @rtype: booleean
1454   @return: boolean indicating whether a LD_LV dev_type was found or not
1455
1456   """
1457   if disk.children:
1458     for chdisk in disk.children:
1459       if _RecursiveCheckIfLVMBased(chdisk):
1460         return True
1461   return disk.dev_type == constants.LD_LV
1462
1463
1464 class LUSetClusterParams(LogicalUnit):
1465   """Change the parameters of the cluster.
1466
1467   """
1468   HPATH = "cluster-modify"
1469   HTYPE = constants.HTYPE_CLUSTER
1470   _OP_REQP = []
1471   REQ_BGL = False
1472
1473   def CheckArguments(self):
1474     """Check parameters
1475
1476     """
1477     if not hasattr(self.op, "candidate_pool_size"):
1478       self.op.candidate_pool_size = None
1479     if self.op.candidate_pool_size is not None:
1480       try:
1481         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1482       except (ValueError, TypeError), err:
1483         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1484                                    str(err))
1485       if self.op.candidate_pool_size < 1:
1486         raise errors.OpPrereqError("At least one master candidate needed")
1487
1488   def ExpandNames(self):
1489     # FIXME: in the future maybe other cluster params won't require checking on
1490     # all nodes to be modified.
1491     self.needed_locks = {
1492       locking.LEVEL_NODE: locking.ALL_SET,
1493     }
1494     self.share_locks[locking.LEVEL_NODE] = 1
1495
1496   def BuildHooksEnv(self):
1497     """Build hooks env.
1498
1499     """
1500     env = {
1501       "OP_TARGET": self.cfg.GetClusterName(),
1502       "NEW_VG_NAME": self.op.vg_name,
1503       }
1504     mn = self.cfg.GetMasterNode()
1505     return env, [mn], [mn]
1506
1507   def CheckPrereq(self):
1508     """Check prerequisites.
1509
1510     This checks whether the given params don't conflict and
1511     if the given volume group is valid.
1512
1513     """
1514     if self.op.vg_name is not None and not self.op.vg_name:
1515       instances = self.cfg.GetAllInstancesInfo().values()
1516       for inst in instances:
1517         for disk in inst.disks:
1518           if _RecursiveCheckIfLVMBased(disk):
1519             raise errors.OpPrereqError("Cannot disable lvm storage while"
1520                                        " lvm-based instances exist")
1521
1522     node_list = self.acquired_locks[locking.LEVEL_NODE]
1523
1524     # if vg_name not None, checks given volume group on all nodes
1525     if self.op.vg_name:
1526       vglist = self.rpc.call_vg_list(node_list)
1527       for node in node_list:
1528         msg = vglist[node].fail_msg
1529         if msg:
1530           # ignoring down node
1531           self.LogWarning("Error while gathering data on node %s"
1532                           " (ignoring node): %s", node, msg)
1533           continue
1534         vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1535                                               self.op.vg_name,
1536                                               constants.MIN_VG_SIZE)
1537         if vgstatus:
1538           raise errors.OpPrereqError("Error on node '%s': %s" %
1539                                      (node, vgstatus))
1540
1541     self.cluster = cluster = self.cfg.GetClusterInfo()
1542     # validate params changes
1543     if self.op.beparams:
1544       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1545       self.new_beparams = objects.FillDict(
1546         cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1547
1548     if self.op.nicparams:
1549       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1550       self.new_nicparams = objects.FillDict(
1551         cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1552       objects.NIC.CheckParameterSyntax(self.new_nicparams)
1553
1554     # hypervisor list/parameters
1555     self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1556     if self.op.hvparams:
1557       if not isinstance(self.op.hvparams, dict):
1558         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1559       for hv_name, hv_dict in self.op.hvparams.items():
1560         if hv_name not in self.new_hvparams:
1561           self.new_hvparams[hv_name] = hv_dict
1562         else:
1563           self.new_hvparams[hv_name].update(hv_dict)
1564
1565     if self.op.enabled_hypervisors is not None:
1566       self.hv_list = self.op.enabled_hypervisors
1567     else:
1568       self.hv_list = cluster.enabled_hypervisors
1569
1570     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1571       # either the enabled list has changed, or the parameters have, validate
1572       for hv_name, hv_params in self.new_hvparams.items():
1573         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1574             (self.op.enabled_hypervisors and
1575              hv_name in self.op.enabled_hypervisors)):
1576           # either this is a new hypervisor, or its parameters have changed
1577           hv_class = hypervisor.GetHypervisor(hv_name)
1578           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1579           hv_class.CheckParameterSyntax(hv_params)
1580           _CheckHVParams(self, node_list, hv_name, hv_params)
1581
1582   def Exec(self, feedback_fn):
1583     """Change the parameters of the cluster.
1584
1585     """
1586     if self.op.vg_name is not None:
1587       new_volume = self.op.vg_name
1588       if not new_volume:
1589         new_volume = None
1590       if new_volume != self.cfg.GetVGName():
1591         self.cfg.SetVGName(new_volume)
1592       else:
1593         feedback_fn("Cluster LVM configuration already in desired"
1594                     " state, not changing")
1595     if self.op.hvparams:
1596       self.cluster.hvparams = self.new_hvparams
1597     if self.op.enabled_hypervisors is not None:
1598       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1599     if self.op.beparams:
1600       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1601     if self.op.nicparams:
1602       self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1603
1604     if self.op.candidate_pool_size is not None:
1605       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1606
1607     self.cfg.Update(self.cluster)
1608
1609     # we want to update nodes after the cluster so that if any errors
1610     # happen, we have recorded and saved the cluster info
1611     if self.op.candidate_pool_size is not None:
1612       _AdjustCandidatePool(self)
1613
1614
1615 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1616   """Distribute additional files which are part of the cluster configuration.
1617
1618   ConfigWriter takes care of distributing the config and ssconf files, but
1619   there are more files which should be distributed to all nodes. This function
1620   makes sure those are copied.
1621
1622   @param lu: calling logical unit
1623   @param additional_nodes: list of nodes not in the config to distribute to
1624
1625   """
1626   # 1. Gather target nodes
1627   myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1628   dist_nodes = lu.cfg.GetNodeList()
1629   if additional_nodes is not None:
1630     dist_nodes.extend(additional_nodes)
1631   if myself.name in dist_nodes:
1632     dist_nodes.remove(myself.name)
1633   # 2. Gather files to distribute
1634   dist_files = set([constants.ETC_HOSTS,
1635                     constants.SSH_KNOWN_HOSTS_FILE,
1636                     constants.RAPI_CERT_FILE,
1637                     constants.RAPI_USERS_FILE,
1638                    ])
1639
1640   enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1641   for hv_name in enabled_hypervisors:
1642     hv_class = hypervisor.GetHypervisor(hv_name)
1643     dist_files.update(hv_class.GetAncillaryFiles())
1644
1645   # 3. Perform the files upload
1646   for fname in dist_files:
1647     if os.path.exists(fname):
1648       result = lu.rpc.call_upload_file(dist_nodes, fname)
1649       for to_node, to_result in result.items():
1650         msg = to_result.fail_msg
1651         if msg:
1652           msg = ("Copy of file %s to node %s failed: %s" %
1653                  (fname, to_node, msg))
1654           lu.proc.LogWarning(msg)
1655
1656
1657 class LURedistributeConfig(NoHooksLU):
1658   """Force the redistribution of cluster configuration.
1659
1660   This is a very simple LU.
1661
1662   """
1663   _OP_REQP = []
1664   REQ_BGL = False
1665
1666   def ExpandNames(self):
1667     self.needed_locks = {
1668       locking.LEVEL_NODE: locking.ALL_SET,
1669     }
1670     self.share_locks[locking.LEVEL_NODE] = 1
1671
1672   def CheckPrereq(self):
1673     """Check prerequisites.
1674
1675     """
1676
1677   def Exec(self, feedback_fn):
1678     """Redistribute the configuration.
1679
1680     """
1681     self.cfg.Update(self.cfg.GetClusterInfo())
1682     _RedistributeAncillaryFiles(self)
1683
1684
1685 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1686   """Sleep and poll for an instance's disk to sync.
1687
1688   """
1689   if not instance.disks:
1690     return True
1691
1692   if not oneshot:
1693     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1694
1695   node = instance.primary_node
1696
1697   for dev in instance.disks:
1698     lu.cfg.SetDiskID(dev, node)
1699
1700   retries = 0
1701   degr_retries = 10 # in seconds, as we sleep 1 second each time
1702   while True:
1703     max_time = 0
1704     done = True
1705     cumul_degraded = False
1706     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1707     msg = rstats.fail_msg
1708     if msg:
1709       lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1710       retries += 1
1711       if retries >= 10:
1712         raise errors.RemoteError("Can't contact node %s for mirror data,"
1713                                  " aborting." % node)
1714       time.sleep(6)
1715       continue
1716     rstats = rstats.payload
1717     retries = 0
1718     for i, mstat in enumerate(rstats):
1719       if mstat is None:
1720         lu.LogWarning("Can't compute data for node %s/%s",
1721                            node, instance.disks[i].iv_name)
1722         continue
1723       # we ignore the ldisk parameter
1724       perc_done, est_time, is_degraded, _ = mstat
1725       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1726       if perc_done is not None:
1727         done = False
1728         if est_time is not None:
1729           rem_time = "%d estimated seconds remaining" % est_time
1730           max_time = est_time
1731         else:
1732           rem_time = "no time estimate"
1733         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1734                         (instance.disks[i].iv_name, perc_done, rem_time))
1735
1736     # if we're done but degraded, let's do a few small retries, to
1737     # make sure we see a stable and not transient situation; therefore
1738     # we force restart of the loop
1739     if (done or oneshot) and cumul_degraded and degr_retries > 0:
1740       logging.info("Degraded disks found, %d retries left", degr_retries)
1741       degr_retries -= 1
1742       time.sleep(1)
1743       continue
1744
1745     if done or oneshot:
1746       break
1747
1748     time.sleep(min(60, max_time))
1749
1750   if done:
1751     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1752   return not cumul_degraded
1753
1754
1755 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1756   """Check that mirrors are not degraded.
1757
1758   The ldisk parameter, if True, will change the test from the
1759   is_degraded attribute (which represents overall non-ok status for
1760   the device(s)) to the ldisk (representing the local storage status).
1761
1762   """
1763   lu.cfg.SetDiskID(dev, node)
1764   if ldisk:
1765     idx = 6
1766   else:
1767     idx = 5
1768
1769   result = True
1770   if on_primary or dev.AssembleOnSecondary():
1771     rstats = lu.rpc.call_blockdev_find(node, dev)
1772     msg = rstats.fail_msg
1773     if msg:
1774       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1775       result = False
1776     elif not rstats.payload:
1777       lu.LogWarning("Can't find disk on node %s", node)
1778       result = False
1779     else:
1780       result = result and (not rstats.payload[idx])
1781   if dev.children:
1782     for child in dev.children:
1783       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1784
1785   return result
1786
1787
1788 class LUDiagnoseOS(NoHooksLU):
1789   """Logical unit for OS diagnose/query.
1790
1791   """
1792   _OP_REQP = ["output_fields", "names"]
1793   REQ_BGL = False
1794   _FIELDS_STATIC = utils.FieldSet()
1795   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1796
1797   def ExpandNames(self):
1798     if self.op.names:
1799       raise errors.OpPrereqError("Selective OS query not supported")
1800
1801     _CheckOutputFields(static=self._FIELDS_STATIC,
1802                        dynamic=self._FIELDS_DYNAMIC,
1803                        selected=self.op.output_fields)
1804
1805     # Lock all nodes, in shared mode
1806     # Temporary removal of locks, should be reverted later
1807     # TODO: reintroduce locks when they are lighter-weight
1808     self.needed_locks = {}
1809     #self.share_locks[locking.LEVEL_NODE] = 1
1810     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1811
1812   def CheckPrereq(self):
1813     """Check prerequisites.
1814
1815     """
1816
1817   @staticmethod
1818   def _DiagnoseByOS(node_list, rlist):
1819     """Remaps a per-node return list into an a per-os per-node dictionary
1820
1821     @param node_list: a list with the names of all nodes
1822     @param rlist: a map with node names as keys and OS objects as values
1823
1824     @rtype: dict
1825     @return: a dictionary with osnames as keys and as value another map, with
1826         nodes as keys and tuples of (path, status, diagnose) as values, eg::
1827
1828           {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
1829                                      (/srv/..., False, "invalid api")],
1830                            "node2": [(/srv/..., True, "")]}
1831           }
1832
1833     """
1834     all_os = {}
1835     # we build here the list of nodes that didn't fail the RPC (at RPC
1836     # level), so that nodes with a non-responding node daemon don't
1837     # make all OSes invalid
1838     good_nodes = [node_name for node_name in rlist
1839                   if not rlist[node_name].fail_msg]
1840     for node_name, nr in rlist.items():
1841       if nr.fail_msg or not nr.payload:
1842         continue
1843       for name, path, status, diagnose in nr.payload:
1844         if name not in all_os:
1845           # build a list of nodes for this os containing empty lists
1846           # for each node in node_list
1847           all_os[name] = {}
1848           for nname in good_nodes:
1849             all_os[name][nname] = []
1850         all_os[name][node_name].append((path, status, diagnose))
1851     return all_os
1852
1853   def Exec(self, feedback_fn):
1854     """Compute the list of OSes.
1855
1856     """
1857     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1858     node_data = self.rpc.call_os_diagnose(valid_nodes)
1859     pol = self._DiagnoseByOS(valid_nodes, node_data)
1860     output = []
1861     for os_name, os_data in pol.items():
1862       row = []
1863       for field in self.op.output_fields:
1864         if field == "name":
1865           val = os_name
1866         elif field == "valid":
1867           val = utils.all([osl and osl[0][1] for osl in os_data.values()])
1868         elif field == "node_status":
1869           # this is just a copy of the dict
1870           val = {}
1871           for node_name, nos_list in os_data.items():
1872             val[node_name] = nos_list
1873         else:
1874           raise errors.ParameterError(field)
1875         row.append(val)
1876       output.append(row)
1877
1878     return output
1879
1880
1881 class LURemoveNode(LogicalUnit):
1882   """Logical unit for removing a node.
1883
1884   """
1885   HPATH = "node-remove"
1886   HTYPE = constants.HTYPE_NODE
1887   _OP_REQP = ["node_name"]
1888
1889   def BuildHooksEnv(self):
1890     """Build hooks env.
1891
1892     This doesn't run on the target node in the pre phase as a failed
1893     node would then be impossible to remove.
1894
1895     """
1896     env = {
1897       "OP_TARGET": self.op.node_name,
1898       "NODE_NAME": self.op.node_name,
1899       }
1900     all_nodes = self.cfg.GetNodeList()
1901     all_nodes.remove(self.op.node_name)
1902     return env, all_nodes, all_nodes
1903
1904   def CheckPrereq(self):
1905     """Check prerequisites.
1906
1907     This checks:
1908      - the node exists in the configuration
1909      - it does not have primary or secondary instances
1910      - it's not the master
1911
1912     Any errors are signalled by raising errors.OpPrereqError.
1913
1914     """
1915     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1916     if node is None:
1917       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1918
1919     instance_list = self.cfg.GetInstanceList()
1920
1921     masternode = self.cfg.GetMasterNode()
1922     if node.name == masternode:
1923       raise errors.OpPrereqError("Node is the master node,"
1924                                  " you need to failover first.")
1925
1926     for instance_name in instance_list:
1927       instance = self.cfg.GetInstanceInfo(instance_name)
1928       if node.name in instance.all_nodes:
1929         raise errors.OpPrereqError("Instance %s is still running on the node,"
1930                                    " please remove first." % instance_name)
1931     self.op.node_name = node.name
1932     self.node = node
1933
1934   def Exec(self, feedback_fn):
1935     """Removes the node from the cluster.
1936
1937     """
1938     node = self.node
1939     logging.info("Stopping the node daemon and removing configs from node %s",
1940                  node.name)
1941
1942     self.context.RemoveNode(node.name)
1943
1944     result = self.rpc.call_node_leave_cluster(node.name)
1945     msg = result.fail_msg
1946     if msg:
1947       self.LogWarning("Errors encountered on the remote node while leaving"
1948                       " the cluster: %s", msg)
1949
1950     # Promote nodes to master candidate as needed
1951     _AdjustCandidatePool(self)
1952
1953
1954 class LUQueryNodes(NoHooksLU):
1955   """Logical unit for querying nodes.
1956
1957   """
1958   _OP_REQP = ["output_fields", "names", "use_locking"]
1959   REQ_BGL = False
1960   _FIELDS_DYNAMIC = utils.FieldSet(
1961     "dtotal", "dfree",
1962     "mtotal", "mnode", "mfree",
1963     "bootid",
1964     "ctotal", "cnodes", "csockets",
1965     )
1966
1967   _FIELDS_STATIC = utils.FieldSet(
1968     "name", "pinst_cnt", "sinst_cnt",
1969     "pinst_list", "sinst_list",
1970     "pip", "sip", "tags",
1971     "serial_no",
1972     "master_candidate",
1973     "master",
1974     "offline",
1975     "drained",
1976     )
1977
1978   def ExpandNames(self):
1979     _CheckOutputFields(static=self._FIELDS_STATIC,
1980                        dynamic=self._FIELDS_DYNAMIC,
1981                        selected=self.op.output_fields)
1982
1983     self.needed_locks = {}
1984     self.share_locks[locking.LEVEL_NODE] = 1
1985
1986     if self.op.names:
1987       self.wanted = _GetWantedNodes(self, self.op.names)
1988     else:
1989       self.wanted = locking.ALL_SET
1990
1991     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1992     self.do_locking = self.do_node_query and self.op.use_locking
1993     if self.do_locking:
1994       # if we don't request only static fields, we need to lock the nodes
1995       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1996
1997
1998   def CheckPrereq(self):
1999     """Check prerequisites.
2000
2001     """
2002     # The validation of the node list is done in the _GetWantedNodes,
2003     # if non empty, and if empty, there's no validation to do
2004     pass
2005
2006   def Exec(self, feedback_fn):
2007     """Computes the list of nodes and their attributes.
2008
2009     """
2010     all_info = self.cfg.GetAllNodesInfo()
2011     if self.do_locking:
2012       nodenames = self.acquired_locks[locking.LEVEL_NODE]
2013     elif self.wanted != locking.ALL_SET:
2014       nodenames = self.wanted
2015       missing = set(nodenames).difference(all_info.keys())
2016       if missing:
2017         raise errors.OpExecError(
2018           "Some nodes were removed before retrieving their data: %s" % missing)
2019     else:
2020       nodenames = all_info.keys()
2021
2022     nodenames = utils.NiceSort(nodenames)
2023     nodelist = [all_info[name] for name in nodenames]
2024
2025     # begin data gathering
2026
2027     if self.do_node_query:
2028       live_data = {}
2029       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2030                                           self.cfg.GetHypervisorType())
2031       for name in nodenames:
2032         nodeinfo = node_data[name]
2033         if not nodeinfo.fail_msg and nodeinfo.payload:
2034           nodeinfo = nodeinfo.payload
2035           fn = utils.TryConvert
2036           live_data[name] = {
2037             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2038             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2039             "mfree": fn(int, nodeinfo.get('memory_free', None)),
2040             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2041             "dfree": fn(int, nodeinfo.get('vg_free', None)),
2042             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2043             "bootid": nodeinfo.get('bootid', None),
2044             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2045             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2046             }
2047         else:
2048           live_data[name] = {}
2049     else:
2050       live_data = dict.fromkeys(nodenames, {})
2051
2052     node_to_primary = dict([(name, set()) for name in nodenames])
2053     node_to_secondary = dict([(name, set()) for name in nodenames])
2054
2055     inst_fields = frozenset(("pinst_cnt", "pinst_list",
2056                              "sinst_cnt", "sinst_list"))
2057     if inst_fields & frozenset(self.op.output_fields):
2058       instancelist = self.cfg.GetInstanceList()
2059
2060       for instance_name in instancelist:
2061         inst = self.cfg.GetInstanceInfo(instance_name)
2062         if inst.primary_node in node_to_primary:
2063           node_to_primary[inst.primary_node].add(inst.name)
2064         for secnode in inst.secondary_nodes:
2065           if secnode in node_to_secondary:
2066             node_to_secondary[secnode].add(inst.name)
2067
2068     master_node = self.cfg.GetMasterNode()
2069
2070     # end data gathering
2071
2072     output = []
2073     for node in nodelist:
2074       node_output = []
2075       for field in self.op.output_fields:
2076         if field == "name":
2077           val = node.name
2078         elif field == "pinst_list":
2079           val = list(node_to_primary[node.name])
2080         elif field == "sinst_list":
2081           val = list(node_to_secondary[node.name])
2082         elif field == "pinst_cnt":
2083           val = len(node_to_primary[node.name])
2084         elif field == "sinst_cnt":
2085           val = len(node_to_secondary[node.name])
2086         elif field == "pip":
2087           val = node.primary_ip
2088         elif field == "sip":
2089           val = node.secondary_ip
2090         elif field == "tags":
2091           val = list(node.GetTags())
2092         elif field == "serial_no":
2093           val = node.serial_no
2094         elif field == "master_candidate":
2095           val = node.master_candidate
2096         elif field == "master":
2097           val = node.name == master_node
2098         elif field == "offline":
2099           val = node.offline
2100         elif field == "drained":
2101           val = node.drained
2102         elif self._FIELDS_DYNAMIC.Matches(field):
2103           val = live_data[node.name].get(field, None)
2104         else:
2105           raise errors.ParameterError(field)
2106         node_output.append(val)
2107       output.append(node_output)
2108
2109     return output
2110
2111
2112 class LUQueryNodeVolumes(NoHooksLU):
2113   """Logical unit for getting volumes on node(s).
2114
2115   """
2116   _OP_REQP = ["nodes", "output_fields"]
2117   REQ_BGL = False
2118   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2119   _FIELDS_STATIC = utils.FieldSet("node")
2120
2121   def ExpandNames(self):
2122     _CheckOutputFields(static=self._FIELDS_STATIC,
2123                        dynamic=self._FIELDS_DYNAMIC,
2124                        selected=self.op.output_fields)
2125
2126     self.needed_locks = {}
2127     self.share_locks[locking.LEVEL_NODE] = 1
2128     if not self.op.nodes:
2129       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2130     else:
2131       self.needed_locks[locking.LEVEL_NODE] = \
2132         _GetWantedNodes(self, self.op.nodes)
2133
2134   def CheckPrereq(self):
2135     """Check prerequisites.
2136
2137     This checks that the fields required are valid output fields.
2138
2139     """
2140     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2141
2142   def Exec(self, feedback_fn):
2143     """Computes the list of nodes and their attributes.
2144
2145     """
2146     nodenames = self.nodes
2147     volumes = self.rpc.call_node_volumes(nodenames)
2148
2149     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2150              in self.cfg.GetInstanceList()]
2151
2152     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2153
2154     output = []
2155     for node in nodenames:
2156       nresult = volumes[node]
2157       if nresult.offline:
2158         continue
2159       msg = nresult.fail_msg
2160       if msg:
2161         self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2162         continue
2163
2164       node_vols = nresult.payload[:]
2165       node_vols.sort(key=lambda vol: vol['dev'])
2166
2167       for vol in node_vols:
2168         node_output = []
2169         for field in self.op.output_fields:
2170           if field == "node":
2171             val = node
2172           elif field == "phys":
2173             val = vol['dev']
2174           elif field == "vg":
2175             val = vol['vg']
2176           elif field == "name":
2177             val = vol['name']
2178           elif field == "size":
2179             val = int(float(vol['size']))
2180           elif field == "instance":
2181             for inst in ilist:
2182               if node not in lv_by_node[inst]:
2183                 continue
2184               if vol['name'] in lv_by_node[inst][node]:
2185                 val = inst.name
2186                 break
2187             else:
2188               val = '-'
2189           else:
2190             raise errors.ParameterError(field)
2191           node_output.append(str(val))
2192
2193         output.append(node_output)
2194
2195     return output
2196
2197
2198 class LUAddNode(LogicalUnit):
2199   """Logical unit for adding node to the cluster.
2200
2201   """
2202   HPATH = "node-add"
2203   HTYPE = constants.HTYPE_NODE
2204   _OP_REQP = ["node_name"]
2205
2206   def BuildHooksEnv(self):
2207     """Build hooks env.
2208
2209     This will run on all nodes before, and on all nodes + the new node after.
2210
2211     """
2212     env = {
2213       "OP_TARGET": self.op.node_name,
2214       "NODE_NAME": self.op.node_name,
2215       "NODE_PIP": self.op.primary_ip,
2216       "NODE_SIP": self.op.secondary_ip,
2217       }
2218     nodes_0 = self.cfg.GetNodeList()
2219     nodes_1 = nodes_0 + [self.op.node_name, ]
2220     return env, nodes_0, nodes_1
2221
2222   def CheckPrereq(self):
2223     """Check prerequisites.
2224
2225     This checks:
2226      - the new node is not already in the config
2227      - it is resolvable
2228      - its parameters (single/dual homed) matches the cluster
2229
2230     Any errors are signalled by raising errors.OpPrereqError.
2231
2232     """
2233     node_name = self.op.node_name
2234     cfg = self.cfg
2235
2236     dns_data = utils.HostInfo(node_name)
2237
2238     node = dns_data.name
2239     primary_ip = self.op.primary_ip = dns_data.ip
2240     secondary_ip = getattr(self.op, "secondary_ip", None)
2241     if secondary_ip is None:
2242       secondary_ip = primary_ip
2243     if not utils.IsValidIP(secondary_ip):
2244       raise errors.OpPrereqError("Invalid secondary IP given")
2245     self.op.secondary_ip = secondary_ip
2246
2247     node_list = cfg.GetNodeList()
2248     if not self.op.readd and node in node_list:
2249       raise errors.OpPrereqError("Node %s is already in the configuration" %
2250                                  node)
2251     elif self.op.readd and node not in node_list:
2252       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2253
2254     for existing_node_name in node_list:
2255       existing_node = cfg.GetNodeInfo(existing_node_name)
2256
2257       if self.op.readd and node == existing_node_name:
2258         if (existing_node.primary_ip != primary_ip or
2259             existing_node.secondary_ip != secondary_ip):
2260           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2261                                      " address configuration as before")
2262         continue
2263
2264       if (existing_node.primary_ip == primary_ip or
2265           existing_node.secondary_ip == primary_ip or
2266           existing_node.primary_ip == secondary_ip or
2267           existing_node.secondary_ip == secondary_ip):
2268         raise errors.OpPrereqError("New node ip address(es) conflict with"
2269                                    " existing node %s" % existing_node.name)
2270
2271     # check that the type of the node (single versus dual homed) is the
2272     # same as for the master
2273     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2274     master_singlehomed = myself.secondary_ip == myself.primary_ip
2275     newbie_singlehomed = secondary_ip == primary_ip
2276     if master_singlehomed != newbie_singlehomed:
2277       if master_singlehomed:
2278         raise errors.OpPrereqError("The master has no private ip but the"
2279                                    " new node has one")
2280       else:
2281         raise errors.OpPrereqError("The master has a private ip but the"
2282                                    " new node doesn't have one")
2283
2284     # checks reachablity
2285     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2286       raise errors.OpPrereqError("Node not reachable by ping")
2287
2288     if not newbie_singlehomed:
2289       # check reachability from my secondary ip to newbie's secondary ip
2290       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2291                            source=myself.secondary_ip):
2292         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2293                                    " based ping to noded port")
2294
2295     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2296     mc_now, _ = self.cfg.GetMasterCandidateStats()
2297     master_candidate = mc_now < cp_size
2298
2299     self.new_node = objects.Node(name=node,
2300                                  primary_ip=primary_ip,
2301                                  secondary_ip=secondary_ip,
2302                                  master_candidate=master_candidate,
2303                                  offline=False, drained=False)
2304
2305   def Exec(self, feedback_fn):
2306     """Adds the new node to the cluster.
2307
2308     """
2309     new_node = self.new_node
2310     node = new_node.name
2311
2312     # check connectivity
2313     result = self.rpc.call_version([node])[node]
2314     result.Raise("Can't get version information from node %s" % node)
2315     if constants.PROTOCOL_VERSION == result.payload:
2316       logging.info("Communication to node %s fine, sw version %s match",
2317                    node, result.payload)
2318     else:
2319       raise errors.OpExecError("Version mismatch master version %s,"
2320                                " node version %s" %
2321                                (constants.PROTOCOL_VERSION, result.payload))
2322
2323     # setup ssh on node
2324     logging.info("Copy ssh key to node %s", node)
2325     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2326     keyarray = []
2327     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2328                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2329                 priv_key, pub_key]
2330
2331     for i in keyfiles:
2332       f = open(i, 'r')
2333       try:
2334         keyarray.append(f.read())
2335       finally:
2336         f.close()
2337
2338     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2339                                     keyarray[2],
2340                                     keyarray[3], keyarray[4], keyarray[5])
2341     result.Raise("Cannot transfer ssh keys to the new node")
2342
2343     # Add node to our /etc/hosts, and add key to known_hosts
2344     if self.cfg.GetClusterInfo().modify_etc_hosts:
2345       utils.AddHostToEtcHosts(new_node.name)
2346
2347     if new_node.secondary_ip != new_node.primary_ip:
2348       result = self.rpc.call_node_has_ip_address(new_node.name,
2349                                                  new_node.secondary_ip)
2350       result.Raise("Failure checking secondary ip on node %s" % new_node.name,
2351                    prereq=True)
2352       if not result.payload:
2353         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2354                                  " you gave (%s). Please fix and re-run this"
2355                                  " command." % new_node.secondary_ip)
2356
2357     node_verify_list = [self.cfg.GetMasterNode()]
2358     node_verify_param = {
2359       'nodelist': [node],
2360       # TODO: do a node-net-test as well?
2361     }
2362
2363     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2364                                        self.cfg.GetClusterName())
2365     for verifier in node_verify_list:
2366       result[verifier].Raise("Cannot communicate with node %s" % verifier)
2367       nl_payload = result[verifier].payload['nodelist']
2368       if nl_payload:
2369         for failed in nl_payload:
2370           feedback_fn("ssh/hostname verification failed %s -> %s" %
2371                       (verifier, nl_payload[failed]))
2372         raise errors.OpExecError("ssh/hostname verification failed.")
2373
2374     if self.op.readd:
2375       _RedistributeAncillaryFiles(self)
2376       self.context.ReaddNode(new_node)
2377     else:
2378       _RedistributeAncillaryFiles(self, additional_nodes=[node])
2379       self.context.AddNode(new_node)
2380
2381
2382 class LUSetNodeParams(LogicalUnit):
2383   """Modifies the parameters of a node.
2384
2385   """
2386   HPATH = "node-modify"
2387   HTYPE = constants.HTYPE_NODE
2388   _OP_REQP = ["node_name"]
2389   REQ_BGL = False
2390
2391   def CheckArguments(self):
2392     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2393     if node_name is None:
2394       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2395     self.op.node_name = node_name
2396     _CheckBooleanOpField(self.op, 'master_candidate')
2397     _CheckBooleanOpField(self.op, 'offline')
2398     _CheckBooleanOpField(self.op, 'drained')
2399     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2400     if all_mods.count(None) == 3:
2401       raise errors.OpPrereqError("Please pass at least one modification")
2402     if all_mods.count(True) > 1:
2403       raise errors.OpPrereqError("Can't set the node into more than one"
2404                                  " state at the same time")
2405
2406   def ExpandNames(self):
2407     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2408
2409   def BuildHooksEnv(self):
2410     """Build hooks env.
2411
2412     This runs on the master node.
2413
2414     """
2415     env = {
2416       "OP_TARGET": self.op.node_name,
2417       "MASTER_CANDIDATE": str(self.op.master_candidate),
2418       "OFFLINE": str(self.op.offline),
2419       "DRAINED": str(self.op.drained),
2420       }
2421     nl = [self.cfg.GetMasterNode(),
2422           self.op.node_name]
2423     return env, nl, nl
2424
2425   def CheckPrereq(self):
2426     """Check prerequisites.
2427
2428     This only checks the instance list against the existing names.
2429
2430     """
2431     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2432
2433     if ((self.op.master_candidate == False or self.op.offline == True or
2434          self.op.drained == True) and node.master_candidate):
2435       # we will demote the node from master_candidate
2436       if self.op.node_name == self.cfg.GetMasterNode():
2437         raise errors.OpPrereqError("The master node has to be a"
2438                                    " master candidate, online and not drained")
2439       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2440       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2441       if num_candidates <= cp_size:
2442         msg = ("Not enough master candidates (desired"
2443                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2444         if self.op.force:
2445           self.LogWarning(msg)
2446         else:
2447           raise errors.OpPrereqError(msg)
2448
2449     if (self.op.master_candidate == True and
2450         ((node.offline and not self.op.offline == False) or
2451          (node.drained and not self.op.drained == False))):
2452       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2453                                  " to master_candidate" % node.name)
2454
2455     return
2456
2457   def Exec(self, feedback_fn):
2458     """Modifies a node.
2459
2460     """
2461     node = self.node
2462
2463     result = []
2464     changed_mc = False
2465
2466     if self.op.offline is not None:
2467       node.offline = self.op.offline
2468       result.append(("offline", str(self.op.offline)))
2469       if self.op.offline == True:
2470         if node.master_candidate:
2471           node.master_candidate = False
2472           changed_mc = True
2473           result.append(("master_candidate", "auto-demotion due to offline"))
2474         if node.drained:
2475           node.drained = False
2476           result.append(("drained", "clear drained status due to offline"))
2477
2478     if self.op.master_candidate is not None:
2479       node.master_candidate = self.op.master_candidate
2480       changed_mc = True
2481       result.append(("master_candidate", str(self.op.master_candidate)))
2482       if self.op.master_candidate == False:
2483         rrc = self.rpc.call_node_demote_from_mc(node.name)
2484         msg = rrc.fail_msg
2485         if msg:
2486           self.LogWarning("Node failed to demote itself: %s" % msg)
2487
2488     if self.op.drained is not None:
2489       node.drained = self.op.drained
2490       result.append(("drained", str(self.op.drained)))
2491       if self.op.drained == True:
2492         if node.master_candidate:
2493           node.master_candidate = False
2494           changed_mc = True
2495           result.append(("master_candidate", "auto-demotion due to drain"))
2496         if node.offline:
2497           node.offline = False
2498           result.append(("offline", "clear offline status due to drain"))
2499
2500     # this will trigger configuration file update, if needed
2501     self.cfg.Update(node)
2502     # this will trigger job queue propagation or cleanup
2503     if changed_mc:
2504       self.context.ReaddNode(node)
2505
2506     return result
2507
2508
2509 class LUPowercycleNode(NoHooksLU):
2510   """Powercycles a node.
2511
2512   """
2513   _OP_REQP = ["node_name", "force"]
2514   REQ_BGL = False
2515
2516   def CheckArguments(self):
2517     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2518     if node_name is None:
2519       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2520     self.op.node_name = node_name
2521     if node_name == self.cfg.GetMasterNode() and not self.op.force:
2522       raise errors.OpPrereqError("The node is the master and the force"
2523                                  " parameter was not set")
2524
2525   def ExpandNames(self):
2526     """Locking for PowercycleNode.
2527
2528     This is a last-resource option and shouldn't block on other
2529     jobs. Therefore, we grab no locks.
2530
2531     """
2532     self.needed_locks = {}
2533
2534   def CheckPrereq(self):
2535     """Check prerequisites.
2536
2537     This LU has no prereqs.
2538
2539     """
2540     pass
2541
2542   def Exec(self, feedback_fn):
2543     """Reboots a node.
2544
2545     """
2546     result = self.rpc.call_node_powercycle(self.op.node_name,
2547                                            self.cfg.GetHypervisorType())
2548     result.Raise("Failed to schedule the reboot")
2549     return result.payload
2550
2551
2552 class LUQueryClusterInfo(NoHooksLU):
2553   """Query cluster configuration.
2554
2555   """
2556   _OP_REQP = []
2557   REQ_BGL = False
2558
2559   def ExpandNames(self):
2560     self.needed_locks = {}
2561
2562   def CheckPrereq(self):
2563     """No prerequsites needed for this LU.
2564
2565     """
2566     pass
2567
2568   def Exec(self, feedback_fn):
2569     """Return cluster config.
2570
2571     """
2572     cluster = self.cfg.GetClusterInfo()
2573     result = {
2574       "software_version": constants.RELEASE_VERSION,
2575       "protocol_version": constants.PROTOCOL_VERSION,
2576       "config_version": constants.CONFIG_VERSION,
2577       "os_api_version": constants.OS_API_VERSION,
2578       "export_version": constants.EXPORT_VERSION,
2579       "architecture": (platform.architecture()[0], platform.machine()),
2580       "name": cluster.cluster_name,
2581       "master": cluster.master_node,
2582       "default_hypervisor": cluster.default_hypervisor,
2583       "enabled_hypervisors": cluster.enabled_hypervisors,
2584       "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2585                         for hypervisor in cluster.enabled_hypervisors]),
2586       "beparams": cluster.beparams,
2587       "nicparams": cluster.nicparams,
2588       "candidate_pool_size": cluster.candidate_pool_size,
2589       "master_netdev": cluster.master_netdev,
2590       "volume_group_name": cluster.volume_group_name,
2591       "file_storage_dir": cluster.file_storage_dir,
2592       }
2593
2594     return result
2595
2596
2597 class LUQueryConfigValues(NoHooksLU):
2598   """Return configuration values.
2599
2600   """
2601   _OP_REQP = []
2602   REQ_BGL = False
2603   _FIELDS_DYNAMIC = utils.FieldSet()
2604   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2605
2606   def ExpandNames(self):
2607     self.needed_locks = {}
2608
2609     _CheckOutputFields(static=self._FIELDS_STATIC,
2610                        dynamic=self._FIELDS_DYNAMIC,
2611                        selected=self.op.output_fields)
2612
2613   def CheckPrereq(self):
2614     """No prerequisites.
2615
2616     """
2617     pass
2618
2619   def Exec(self, feedback_fn):
2620     """Dump a representation of the cluster config to the standard output.
2621
2622     """
2623     values = []
2624     for field in self.op.output_fields:
2625       if field == "cluster_name":
2626         entry = self.cfg.GetClusterName()
2627       elif field == "master_node":
2628         entry = self.cfg.GetMasterNode()
2629       elif field == "drain_flag":
2630         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2631       else:
2632         raise errors.ParameterError(field)
2633       values.append(entry)
2634     return values
2635
2636
2637 class LUActivateInstanceDisks(NoHooksLU):
2638   """Bring up an instance's disks.
2639
2640   """
2641   _OP_REQP = ["instance_name"]
2642   REQ_BGL = False
2643
2644   def ExpandNames(self):
2645     self._ExpandAndLockInstance()
2646     self.needed_locks[locking.LEVEL_NODE] = []
2647     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2648
2649   def DeclareLocks(self, level):
2650     if level == locking.LEVEL_NODE:
2651       self._LockInstancesNodes()
2652
2653   def CheckPrereq(self):
2654     """Check prerequisites.
2655
2656     This checks that the instance is in the cluster.
2657
2658     """
2659     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2660     assert self.instance is not None, \
2661       "Cannot retrieve locked instance %s" % self.op.instance_name
2662     _CheckNodeOnline(self, self.instance.primary_node)
2663
2664   def Exec(self, feedback_fn):
2665     """Activate the disks.
2666
2667     """
2668     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2669     if not disks_ok:
2670       raise errors.OpExecError("Cannot activate block devices")
2671
2672     return disks_info
2673
2674
2675 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2676   """Prepare the block devices for an instance.
2677
2678   This sets up the block devices on all nodes.
2679
2680   @type lu: L{LogicalUnit}
2681   @param lu: the logical unit on whose behalf we execute
2682   @type instance: L{objects.Instance}
2683   @param instance: the instance for whose disks we assemble
2684   @type ignore_secondaries: boolean
2685   @param ignore_secondaries: if true, errors on secondary nodes
2686       won't result in an error return from the function
2687   @return: False if the operation failed, otherwise a list of
2688       (host, instance_visible_name, node_visible_name)
2689       with the mapping from node devices to instance devices
2690
2691   """
2692   device_info = []
2693   disks_ok = True
2694   iname = instance.name
2695   # With the two passes mechanism we try to reduce the window of
2696   # opportunity for the race condition of switching DRBD to primary
2697   # before handshaking occured, but we do not eliminate it
2698
2699   # The proper fix would be to wait (with some limits) until the
2700   # connection has been made and drbd transitions from WFConnection
2701   # into any other network-connected state (Connected, SyncTarget,
2702   # SyncSource, etc.)
2703
2704   # 1st pass, assemble on all nodes in secondary mode
2705   for inst_disk in instance.disks:
2706     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2707       lu.cfg.SetDiskID(node_disk, node)
2708       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2709       msg = result.fail_msg
2710       if msg:
2711         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2712                            " (is_primary=False, pass=1): %s",
2713                            inst_disk.iv_name, node, msg)
2714         if not ignore_secondaries:
2715           disks_ok = False
2716
2717   # FIXME: race condition on drbd migration to primary
2718
2719   # 2nd pass, do only the primary node
2720   for inst_disk in instance.disks:
2721     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2722       if node != instance.primary_node:
2723         continue
2724       lu.cfg.SetDiskID(node_disk, node)
2725       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2726       msg = result.fail_msg
2727       if msg:
2728         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2729                            " (is_primary=True, pass=2): %s",
2730                            inst_disk.iv_name, node, msg)
2731         disks_ok = False
2732     device_info.append((instance.primary_node, inst_disk.iv_name,
2733                         result.payload))
2734
2735   # leave the disks configured for the primary node
2736   # this is a workaround that would be fixed better by
2737   # improving the logical/physical id handling
2738   for disk in instance.disks:
2739     lu.cfg.SetDiskID(disk, instance.primary_node)
2740
2741   return disks_ok, device_info
2742
2743
2744 def _StartInstanceDisks(lu, instance, force):
2745   """Start the disks of an instance.
2746
2747   """
2748   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2749                                            ignore_secondaries=force)
2750   if not disks_ok:
2751     _ShutdownInstanceDisks(lu, instance)
2752     if force is not None and not force:
2753       lu.proc.LogWarning("", hint="If the message above refers to a"
2754                          " secondary node,"
2755                          " you can retry the operation using '--force'.")
2756     raise errors.OpExecError("Disk consistency error")
2757
2758
2759 class LUDeactivateInstanceDisks(NoHooksLU):
2760   """Shutdown an instance's disks.
2761
2762   """
2763   _OP_REQP = ["instance_name"]
2764   REQ_BGL = False
2765
2766   def ExpandNames(self):
2767     self._ExpandAndLockInstance()
2768     self.needed_locks[locking.LEVEL_NODE] = []
2769     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2770
2771   def DeclareLocks(self, level):
2772     if level == locking.LEVEL_NODE:
2773       self._LockInstancesNodes()
2774
2775   def CheckPrereq(self):
2776     """Check prerequisites.
2777
2778     This checks that the instance is in the cluster.
2779
2780     """
2781     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2782     assert self.instance is not None, \
2783       "Cannot retrieve locked instance %s" % self.op.instance_name
2784
2785   def Exec(self, feedback_fn):
2786     """Deactivate the disks
2787
2788     """
2789     instance = self.instance
2790     _SafeShutdownInstanceDisks(self, instance)
2791
2792
2793 def _SafeShutdownInstanceDisks(lu, instance):
2794   """Shutdown block devices of an instance.
2795
2796   This function checks if an instance is running, before calling
2797   _ShutdownInstanceDisks.
2798
2799   """
2800   pnode = instance.primary_node
2801   ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
2802   ins_l.Raise("Can't contact node %s" % pnode)
2803
2804   if instance.name in ins_l.payload:
2805     raise errors.OpExecError("Instance is running, can't shutdown"
2806                              " block devices.")
2807
2808   _ShutdownInstanceDisks(lu, instance)
2809
2810
2811 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2812   """Shutdown block devices of an instance.
2813
2814   This does the shutdown on all nodes of the instance.
2815
2816   If the ignore_primary is false, errors on the primary node are
2817   ignored.
2818
2819   """
2820   all_result = True
2821   for disk in instance.disks:
2822     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2823       lu.cfg.SetDiskID(top_disk, node)
2824       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2825       msg = result.fail_msg
2826       if msg:
2827         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2828                       disk.iv_name, node, msg)
2829         if not ignore_primary or node != instance.primary_node:
2830           all_result = False
2831   return all_result
2832
2833
2834 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2835   """Checks if a node has enough free memory.
2836
2837   This function check if a given node has the needed amount of free
2838   memory. In case the node has less memory or we cannot get the
2839   information from the node, this function raise an OpPrereqError
2840   exception.
2841
2842   @type lu: C{LogicalUnit}
2843   @param lu: a logical unit from which we get configuration data
2844   @type node: C{str}
2845   @param node: the node to check
2846   @type reason: C{str}
2847   @param reason: string to use in the error message
2848   @type requested: C{int}
2849   @param requested: the amount of memory in MiB to check for
2850   @type hypervisor_name: C{str}
2851   @param hypervisor_name: the hypervisor to ask for memory stats
2852   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2853       we cannot check the node
2854
2855   """
2856   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2857   nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
2858   free_mem = nodeinfo[node].payload.get('memory_free', None)
2859   if not isinstance(free_mem, int):
2860     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2861                                " was '%s'" % (node, free_mem))
2862   if requested > free_mem:
2863     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2864                                " needed %s MiB, available %s MiB" %
2865                                (node, reason, requested, free_mem))
2866
2867
2868 class LUStartupInstance(LogicalUnit):
2869   """Starts an instance.
2870
2871   """
2872   HPATH = "instance-start"
2873   HTYPE = constants.HTYPE_INSTANCE
2874   _OP_REQP = ["instance_name", "force"]
2875   REQ_BGL = False
2876
2877   def ExpandNames(self):
2878     self._ExpandAndLockInstance()
2879
2880   def BuildHooksEnv(self):
2881     """Build hooks env.
2882
2883     This runs on master, primary and secondary nodes of the instance.
2884
2885     """
2886     env = {
2887       "FORCE": self.op.force,
2888       }
2889     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2890     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2891     return env, nl, nl
2892
2893   def CheckPrereq(self):
2894     """Check prerequisites.
2895
2896     This checks that the instance is in the cluster.
2897
2898     """
2899     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2900     assert self.instance is not None, \
2901       "Cannot retrieve locked instance %s" % self.op.instance_name
2902
2903     # extra beparams
2904     self.beparams = getattr(self.op, "beparams", {})
2905     if self.beparams:
2906       if not isinstance(self.beparams, dict):
2907         raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2908                                    " dict" % (type(self.beparams), ))
2909       # fill the beparams dict
2910       utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2911       self.op.beparams = self.beparams
2912
2913     # extra hvparams
2914     self.hvparams = getattr(self.op, "hvparams", {})
2915     if self.hvparams:
2916       if not isinstance(self.hvparams, dict):
2917         raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2918                                    " dict" % (type(self.hvparams), ))
2919
2920       # check hypervisor parameter syntax (locally)
2921       cluster = self.cfg.GetClusterInfo()
2922       utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2923       filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
2924                                     instance.hvparams)
2925       filled_hvp.update(self.hvparams)
2926       hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2927       hv_type.CheckParameterSyntax(filled_hvp)
2928       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2929       self.op.hvparams = self.hvparams
2930
2931     _CheckNodeOnline(self, instance.primary_node)
2932
2933     bep = self.cfg.GetClusterInfo().FillBE(instance)
2934     # check bridges existance
2935     _CheckInstanceBridgesExist(self, instance)
2936
2937     remote_info = self.rpc.call_instance_info(instance.primary_node,
2938                                               instance.name,
2939                                               instance.hypervisor)
2940     remote_info.Raise("Error checking node %s" % instance.primary_node,
2941                       prereq=True)
2942     if not remote_info.payload: # not running already
2943       _CheckNodeFreeMemory(self, instance.primary_node,
2944                            "starting instance %s" % instance.name,
2945                            bep[constants.BE_MEMORY], instance.hypervisor)
2946
2947   def Exec(self, feedback_fn):
2948     """Start the instance.
2949
2950     """
2951     instance = self.instance
2952     force = self.op.force
2953
2954     self.cfg.MarkInstanceUp(instance.name)
2955
2956     node_current = instance.primary_node
2957
2958     _StartInstanceDisks(self, instance, force)
2959
2960     result = self.rpc.call_instance_start(node_current, instance,
2961                                           self.hvparams, self.beparams)
2962     msg = result.fail_msg
2963     if msg:
2964       _ShutdownInstanceDisks(self, instance)
2965       raise errors.OpExecError("Could not start instance: %s" % msg)
2966
2967
2968 class LURebootInstance(LogicalUnit):
2969   """Reboot an instance.
2970
2971   """
2972   HPATH = "instance-reboot"
2973   HTYPE = constants.HTYPE_INSTANCE
2974   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2975   REQ_BGL = False
2976
2977   def ExpandNames(self):
2978     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2979                                    constants.INSTANCE_REBOOT_HARD,
2980                                    constants.INSTANCE_REBOOT_FULL]:
2981       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2982                                   (constants.INSTANCE_REBOOT_SOFT,
2983                                    constants.INSTANCE_REBOOT_HARD,
2984                                    constants.INSTANCE_REBOOT_FULL))
2985     self._ExpandAndLockInstance()
2986
2987   def BuildHooksEnv(self):
2988     """Build hooks env.
2989
2990     This runs on master, primary and secondary nodes of the instance.
2991
2992     """
2993     env = {
2994       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2995       "REBOOT_TYPE": self.op.reboot_type,
2996       }
2997     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2998     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2999     return env, nl, nl
3000
3001   def CheckPrereq(self):
3002     """Check prerequisites.
3003
3004     This checks that the instance is in the cluster.
3005
3006     """
3007     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3008     assert self.instance is not None, \
3009       "Cannot retrieve locked instance %s" % self.op.instance_name
3010
3011     _CheckNodeOnline(self, instance.primary_node)
3012
3013     # check bridges existance
3014     _CheckInstanceBridgesExist(self, instance)
3015
3016   def Exec(self, feedback_fn):
3017     """Reboot the instance.
3018
3019     """
3020     instance = self.instance
3021     ignore_secondaries = self.op.ignore_secondaries
3022     reboot_type = self.op.reboot_type
3023
3024     node_current = instance.primary_node
3025
3026     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3027                        constants.INSTANCE_REBOOT_HARD]:
3028       for disk in instance.disks:
3029         self.cfg.SetDiskID(disk, node_current)
3030       result = self.rpc.call_instance_reboot(node_current, instance,
3031                                              reboot_type)
3032       result.Raise("Could not reboot instance")
3033     else:
3034       result = self.rpc.call_instance_shutdown(node_current, instance)
3035       result.Raise("Could not shutdown instance for full reboot")
3036       _ShutdownInstanceDisks(self, instance)
3037       _StartInstanceDisks(self, instance, ignore_secondaries)
3038       result = self.rpc.call_instance_start(node_current, instance, None, None)
3039       msg = result.fail_msg
3040       if msg:
3041         _ShutdownInstanceDisks(self, instance)
3042         raise errors.OpExecError("Could not start instance for"
3043                                  " full reboot: %s" % msg)
3044
3045     self.cfg.MarkInstanceUp(instance.name)
3046
3047
3048 class LUShutdownInstance(LogicalUnit):
3049   """Shutdown an instance.
3050
3051   """
3052   HPATH = "instance-stop"
3053   HTYPE = constants.HTYPE_INSTANCE
3054   _OP_REQP = ["instance_name"]
3055   REQ_BGL = False
3056
3057   def ExpandNames(self):
3058     self._ExpandAndLockInstance()
3059
3060   def BuildHooksEnv(self):
3061     """Build hooks env.
3062
3063     This runs on master, primary and secondary nodes of the instance.
3064
3065     """
3066     env = _BuildInstanceHookEnvByObject(self, self.instance)
3067     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3068     return env, nl, nl
3069
3070   def CheckPrereq(self):
3071     """Check prerequisites.
3072
3073     This checks that the instance is in the cluster.
3074
3075     """
3076     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3077     assert self.instance is not None, \
3078       "Cannot retrieve locked instance %s" % self.op.instance_name
3079     _CheckNodeOnline(self, self.instance.primary_node)
3080
3081   def Exec(self, feedback_fn):
3082     """Shutdown the instance.
3083
3084     """
3085     instance = self.instance
3086     node_current = instance.primary_node
3087     self.cfg.MarkInstanceDown(instance.name)
3088     result = self.rpc.call_instance_shutdown(node_current, instance)
3089     msg = result.fail_msg
3090     if msg:
3091       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3092
3093     _ShutdownInstanceDisks(self, instance)
3094
3095
3096 class LUReinstallInstance(LogicalUnit):
3097   """Reinstall an instance.
3098
3099   """
3100   HPATH = "instance-reinstall"
3101   HTYPE = constants.HTYPE_INSTANCE
3102   _OP_REQP = ["instance_name"]
3103   REQ_BGL = False
3104
3105   def ExpandNames(self):
3106     self._ExpandAndLockInstance()
3107
3108   def BuildHooksEnv(self):
3109     """Build hooks env.
3110
3111     This runs on master, primary and secondary nodes of the instance.
3112
3113     """
3114     env = _BuildInstanceHookEnvByObject(self, self.instance)
3115     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3116     return env, nl, nl
3117
3118   def CheckPrereq(self):
3119     """Check prerequisites.
3120
3121     This checks that the instance is in the cluster and is not running.
3122
3123     """
3124     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3125     assert instance is not None, \
3126       "Cannot retrieve locked instance %s" % self.op.instance_name
3127     _CheckNodeOnline(self, instance.primary_node)
3128
3129     if instance.disk_template == constants.DT_DISKLESS:
3130       raise errors.OpPrereqError("Instance '%s' has no disks" %
3131                                  self.op.instance_name)
3132     if instance.admin_up:
3133       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3134                                  self.op.instance_name)
3135     remote_info = self.rpc.call_instance_info(instance.primary_node,
3136                                               instance.name,
3137                                               instance.hypervisor)
3138     remote_info.Raise("Error checking node %s" % instance.primary_node,
3139                       prereq=True)
3140     if remote_info.payload:
3141       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3142                                  (self.op.instance_name,
3143                                   instance.primary_node))
3144
3145     self.op.os_type = getattr(self.op, "os_type", None)
3146     if self.op.os_type is not None:
3147       # OS verification
3148       pnode = self.cfg.GetNodeInfo(
3149         self.cfg.ExpandNodeName(instance.primary_node))
3150       if pnode is None:
3151         raise errors.OpPrereqError("Primary node '%s' is unknown" %
3152                                    self.op.pnode)
3153       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3154       result.Raise("OS '%s' not in supported OS list for primary node %s" %
3155                    (self.op.os_type, pnode.name), prereq=True)
3156
3157     self.instance = instance
3158
3159   def Exec(self, feedback_fn):
3160     """Reinstall the instance.
3161
3162     """
3163     inst = self.instance
3164
3165     if self.op.os_type is not None:
3166       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3167       inst.os = self.op.os_type
3168       self.cfg.Update(inst)
3169
3170     _StartInstanceDisks(self, inst, None)
3171     try:
3172       feedback_fn("Running the instance OS create scripts...")
3173       result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3174       result.Raise("Could not install OS for instance %s on node %s" %
3175                    (inst.name, inst.primary_node))
3176     finally:
3177       _ShutdownInstanceDisks(self, inst)
3178
3179
3180 class LURenameInstance(LogicalUnit):
3181   """Rename an instance.
3182
3183   """
3184   HPATH = "instance-rename"
3185   HTYPE = constants.HTYPE_INSTANCE
3186   _OP_REQP = ["instance_name", "new_name"]
3187
3188   def BuildHooksEnv(self):
3189     """Build hooks env.
3190
3191     This runs on master, primary and secondary nodes of the instance.
3192
3193     """
3194     env = _BuildInstanceHookEnvByObject(self, self.instance)
3195     env["INSTANCE_NEW_NAME"] = self.op.new_name
3196     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3197     return env, nl, nl
3198
3199   def CheckPrereq(self):
3200     """Check prerequisites.
3201
3202     This checks that the instance is in the cluster and is not running.
3203
3204     """
3205     instance = self.cfg.GetInstanceInfo(
3206       self.cfg.ExpandInstanceName(self.op.instance_name))
3207     if instance is None:
3208       raise errors.OpPrereqError("Instance '%s' not known" %
3209                                  self.op.instance_name)
3210     _CheckNodeOnline(self, instance.primary_node)
3211
3212     if instance.admin_up:
3213       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3214                                  self.op.instance_name)
3215     remote_info = self.rpc.call_instance_info(instance.primary_node,
3216                                               instance.name,
3217                                               instance.hypervisor)
3218     remote_info.Raise("Error checking node %s" % instance.primary_node,
3219                       prereq=True)
3220     if remote_info.payload:
3221       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3222                                  (self.op.instance_name,
3223                                   instance.primary_node))
3224     self.instance = instance
3225
3226     # new name verification
3227     name_info = utils.HostInfo(self.op.new_name)
3228
3229     self.op.new_name = new_name = name_info.name
3230     instance_list = self.cfg.GetInstanceList()
3231     if new_name in instance_list:
3232       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3233                                  new_name)
3234
3235     if not getattr(self.op, "ignore_ip", False):
3236       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3237         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3238                                    (name_info.ip, new_name))
3239
3240
3241   def Exec(self, feedback_fn):
3242     """Reinstall the instance.
3243
3244     """
3245     inst = self.instance
3246     old_name = inst.name
3247
3248     if inst.disk_template == constants.DT_FILE:
3249       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3250
3251     self.cfg.RenameInstance(inst.name, self.op.new_name)
3252     # Change the instance lock. This is definitely safe while we hold the BGL
3253     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3254     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3255
3256     # re-read the instance from the configuration after rename
3257     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3258
3259     if inst.disk_template == constants.DT_FILE:
3260       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3261       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3262                                                      old_file_storage_dir,
3263                                                      new_file_storage_dir)
3264       result.Raise("Could not rename on node %s directory '%s' to '%s'"
3265                    " (but the instance has been renamed in Ganeti)" %
3266                    (inst.primary_node, old_file_storage_dir,
3267                     new_file_storage_dir))
3268
3269     _StartInstanceDisks(self, inst, None)
3270     try:
3271       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3272                                                  old_name)
3273       msg = result.fail_msg
3274       if msg:
3275         msg = ("Could not run OS rename script for instance %s on node %s"
3276                " (but the instance has been renamed in Ganeti): %s" %
3277                (inst.name, inst.primary_node, msg))
3278         self.proc.LogWarning(msg)
3279     finally:
3280       _ShutdownInstanceDisks(self, inst)
3281
3282
3283 class LURemoveInstance(LogicalUnit):
3284   """Remove an instance.
3285
3286   """
3287   HPATH = "instance-remove"
3288   HTYPE = constants.HTYPE_INSTANCE
3289   _OP_REQP = ["instance_name", "ignore_failures"]
3290   REQ_BGL = False
3291
3292   def ExpandNames(self):
3293     self._ExpandAndLockInstance()
3294     self.needed_locks[locking.LEVEL_NODE] = []
3295     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3296
3297   def DeclareLocks(self, level):
3298     if level == locking.LEVEL_NODE:
3299       self._LockInstancesNodes()
3300
3301   def BuildHooksEnv(self):
3302     """Build hooks env.
3303
3304     This runs on master, primary and secondary nodes of the instance.
3305
3306     """
3307     env = _BuildInstanceHookEnvByObject(self, self.instance)
3308     nl = [self.cfg.GetMasterNode()]
3309     return env, nl, nl
3310
3311   def CheckPrereq(self):
3312     """Check prerequisites.
3313
3314     This checks that the instance is in the cluster.
3315
3316     """
3317     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3318     assert self.instance is not None, \
3319       "Cannot retrieve locked instance %s" % self.op.instance_name
3320
3321   def Exec(self, feedback_fn):
3322     """Remove the instance.
3323
3324     """
3325     instance = self.instance
3326     logging.info("Shutting down instance %s on node %s",
3327                  instance.name, instance.primary_node)
3328
3329     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3330     msg = result.fail_msg
3331     if msg:
3332       if self.op.ignore_failures:
3333         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3334       else:
3335         raise errors.OpExecError("Could not shutdown instance %s on"
3336                                  " node %s: %s" %
3337                                  (instance.name, instance.primary_node, msg))
3338
3339     logging.info("Removing block devices for instance %s", instance.name)
3340
3341     if not _RemoveDisks(self, instance):
3342       if self.op.ignore_failures:
3343         feedback_fn("Warning: can't remove instance's disks")
3344       else:
3345         raise errors.OpExecError("Can't remove instance's disks")
3346
3347     logging.info("Removing instance %s out of cluster config", instance.name)
3348
3349     self.cfg.RemoveInstance(instance.name)
3350     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3351
3352
3353 class LUQueryInstances(NoHooksLU):
3354   """Logical unit for querying instances.
3355
3356   """
3357   _OP_REQP = ["output_fields", "names", "use_locking"]
3358   REQ_BGL = False
3359   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3360                                     "admin_state",
3361                                     "disk_template", "ip", "mac", "bridge",
3362                                     "sda_size", "sdb_size", "vcpus", "tags",
3363                                     "network_port", "beparams",
3364                                     r"(disk)\.(size)/([0-9]+)",
3365                                     r"(disk)\.(sizes)", "disk_usage",
3366                                     r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3367                                     r"(nic)\.(macs|ips|bridges)",
3368                                     r"(disk|nic)\.(count)",
3369                                     "serial_no", "hypervisor", "hvparams",] +
3370                                   ["hv/%s" % name
3371                                    for name in constants.HVS_PARAMETERS] +
3372                                   ["be/%s" % name
3373                                    for name in constants.BES_PARAMETERS])
3374   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3375
3376
3377   def ExpandNames(self):
3378     _CheckOutputFields(static=self._FIELDS_STATIC,
3379                        dynamic=self._FIELDS_DYNAMIC,
3380                        selected=self.op.output_fields)
3381
3382     self.needed_locks = {}
3383     self.share_locks[locking.LEVEL_INSTANCE] = 1
3384     self.share_locks[locking.LEVEL_NODE] = 1
3385
3386     if self.op.names:
3387       self.wanted = _GetWantedInstances(self, self.op.names)
3388     else:
3389       self.wanted = locking.ALL_SET
3390
3391     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3392     self.do_locking = self.do_node_query and self.op.use_locking
3393     if self.do_locking:
3394       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3395       self.needed_locks[locking.LEVEL_NODE] = []
3396       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3397
3398   def DeclareLocks(self, level):
3399     if level == locking.LEVEL_NODE and self.do_locking:
3400       self._LockInstancesNodes()
3401
3402   def CheckPrereq(self):
3403     """Check prerequisites.
3404
3405     """
3406     pass
3407
3408   def Exec(self, feedback_fn):
3409     """Computes the list of nodes and their attributes.
3410
3411     """
3412     all_info = self.cfg.GetAllInstancesInfo()
3413     if self.wanted == locking.ALL_SET:
3414       # caller didn't specify instance names, so ordering is not important
3415       if self.do_locking:
3416         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3417       else:
3418         instance_names = all_info.keys()
3419       instance_names = utils.NiceSort(instance_names)
3420     else:
3421       # caller did specify names, so we must keep the ordering
3422       if self.do_locking:
3423         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3424       else:
3425         tgt_set = all_info.keys()
3426       missing = set(self.wanted).difference(tgt_set)
3427       if missing:
3428         raise errors.OpExecError("Some instances were removed before"
3429                                  " retrieving their data: %s" % missing)
3430       instance_names = self.wanted
3431
3432     instance_list = [all_info[iname] for iname in instance_names]
3433
3434     # begin data gathering
3435
3436     nodes = frozenset([inst.primary_node for inst in instance_list])
3437     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3438
3439     bad_nodes = []
3440     off_nodes = []
3441     if self.do_node_query:
3442       live_data = {}
3443       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3444       for name in nodes:
3445         result = node_data[name]
3446         if result.offline:
3447           # offline nodes will be in both lists
3448           off_nodes.append(name)
3449         if result.failed or result.fail_msg:
3450           bad_nodes.append(name)
3451         else:
3452           if result.payload:
3453             live_data.update(result.payload)
3454           # else no instance is alive
3455     else:
3456       live_data = dict([(name, {}) for name in instance_names])
3457
3458     # end data gathering
3459
3460     HVPREFIX = "hv/"
3461     BEPREFIX = "be/"
3462     output = []
3463     for instance in instance_list:
3464       iout = []
3465       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3466       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3467       for field in self.op.output_fields:
3468         st_match = self._FIELDS_STATIC.Matches(field)
3469         if field == "name":
3470           val = instance.name
3471         elif field == "os":
3472           val = instance.os
3473         elif field == "pnode":
3474           val = instance.primary_node
3475         elif field == "snodes":
3476           val = list(instance.secondary_nodes)
3477         elif field == "admin_state":
3478           val = instance.admin_up
3479         elif field == "oper_state":
3480           if instance.primary_node in bad_nodes:
3481             val = None
3482           else:
3483             val = bool(live_data.get(instance.name))
3484         elif field == "status":
3485           if instance.primary_node in off_nodes:
3486             val = "ERROR_nodeoffline"
3487           elif instance.primary_node in bad_nodes:
3488             val = "ERROR_nodedown"
3489           else:
3490             running = bool(live_data.get(instance.name))
3491             if running:
3492               if instance.admin_up:
3493                 val = "running"
3494               else:
3495                 val = "ERROR_up"
3496             else:
3497               if instance.admin_up:
3498                 val = "ERROR_down"
3499               else:
3500                 val = "ADMIN_down"
3501         elif field == "oper_ram":
3502           if instance.primary_node in bad_nodes:
3503             val = None
3504           elif instance.name in live_data:
3505             val = live_data[instance.name].get("memory", "?")
3506           else:
3507             val = "-"
3508         elif field == "disk_template":
3509           val = instance.disk_template
3510         elif field == "ip":
3511           val = instance.nics[0].ip
3512         elif field == "bridge":
3513           val = instance.nics[0].bridge
3514         elif field == "mac":
3515           val = instance.nics[0].mac
3516         elif field == "sda_size" or field == "sdb_size":
3517           idx = ord(field[2]) - ord('a')
3518           try:
3519             val = instance.FindDisk(idx).size
3520           except errors.OpPrereqError:
3521             val = None
3522         elif field == "disk_usage": # total disk usage per node
3523           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3524           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3525         elif field == "tags":
3526           val = list(instance.GetTags())
3527         elif field == "serial_no":
3528           val = instance.serial_no
3529         elif field == "network_port":
3530           val = instance.network_port
3531         elif field == "hypervisor":
3532           val = instance.hypervisor
3533         elif field == "hvparams":
3534           val = i_hv
3535         elif (field.startswith(HVPREFIX) and
3536               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3537           val = i_hv.get(field[len(HVPREFIX):], None)
3538         elif field == "beparams":
3539           val = i_be
3540         elif (field.startswith(BEPREFIX) and
3541               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3542           val = i_be.get(field[len(BEPREFIX):], None)
3543         elif st_match and st_match.groups():
3544           # matches a variable list
3545           st_groups = st_match.groups()
3546           if st_groups and st_groups[0] == "disk":
3547             if st_groups[1] == "count":
3548               val = len(instance.disks)
3549             elif st_groups[1] == "sizes":
3550               val = [disk.size for disk in instance.disks]
3551             elif st_groups[1] == "size":
3552               try:
3553                 val = instance.FindDisk(st_groups[2]).size
3554               except errors.OpPrereqError:
3555                 val = None
3556             else:
3557               assert False, "Unhandled disk parameter"
3558           elif st_groups[0] == "nic":
3559             if st_groups[1] == "count":
3560               val = len(instance.nics)
3561             elif st_groups[1] == "macs":
3562               val = [nic.mac for nic in instance.nics]
3563             elif st_groups[1] == "ips":
3564               val = [nic.ip for nic in instance.nics]
3565             elif st_groups[1] == "bridges":
3566               val = [nic.bridge for nic in instance.nics]
3567             else:
3568               # index-based item
3569               nic_idx = int(st_groups[2])
3570               if nic_idx >= len(instance.nics):
3571                 val = None
3572               else:
3573                 if st_groups[1] == "mac":
3574                   val = instance.nics[nic_idx].mac
3575                 elif st_groups[1] == "ip":
3576                   val = instance.nics[nic_idx].ip
3577                 elif st_groups[1] == "bridge":
3578                   val = instance.nics[nic_idx].bridge
3579                 else:
3580                   assert False, "Unhandled NIC parameter"
3581           else:
3582             assert False, "Unhandled variable parameter"
3583         else:
3584           raise errors.ParameterError(field)
3585         iout.append(val)
3586       output.append(iout)
3587
3588     return output
3589
3590
3591 class LUFailoverInstance(LogicalUnit):
3592   """Failover an instance.
3593
3594   """
3595   HPATH = "instance-failover"
3596   HTYPE = constants.HTYPE_INSTANCE
3597   _OP_REQP = ["instance_name", "ignore_consistency"]
3598   REQ_BGL = False
3599
3600   def ExpandNames(self):
3601     self._ExpandAndLockInstance()
3602     self.needed_locks[locking.LEVEL_NODE] = []
3603     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3604
3605   def DeclareLocks(self, level):
3606     if level == locking.LEVEL_NODE:
3607       self._LockInstancesNodes()
3608
3609   def BuildHooksEnv(self):
3610     """Build hooks env.
3611
3612     This runs on master, primary and secondary nodes of the instance.
3613
3614     """
3615     env = {
3616       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3617       }
3618     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3619     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3620     return env, nl, nl
3621
3622   def CheckPrereq(self):
3623     """Check prerequisites.
3624
3625     This checks that the instance is in the cluster.
3626
3627     """
3628     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3629     assert self.instance is not None, \
3630       "Cannot retrieve locked instance %s" % self.op.instance_name
3631
3632     bep = self.cfg.GetClusterInfo().FillBE(instance)
3633     if instance.disk_template not in constants.DTS_NET_MIRROR:
3634       raise errors.OpPrereqError("Instance's disk layout is not"
3635                                  " network mirrored, cannot failover.")
3636
3637     secondary_nodes = instance.secondary_nodes
3638     if not secondary_nodes:
3639       raise errors.ProgrammerError("no secondary node but using "
3640                                    "a mirrored disk template")
3641
3642     target_node = secondary_nodes[0]
3643     _CheckNodeOnline(self, target_node)
3644     _CheckNodeNotDrained(self, target_node)
3645     if instance.admin_up:
3646       # check memory requirements on the secondary node
3647       _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3648                            instance.name, bep[constants.BE_MEMORY],
3649                            instance.hypervisor)
3650     else:
3651       self.LogInfo("Not checking memory on the secondary node as"
3652                    " instance will not be started")
3653
3654     # check bridge existance
3655     _CheckInstanceBridgesExist(self, instance, node=target_node)
3656
3657   def Exec(self, feedback_fn):
3658     """Failover an instance.
3659
3660     The failover is done by shutting it down on its present node and
3661     starting it on the secondary.
3662
3663     """
3664     instance = self.instance
3665
3666     source_node = instance.primary_node
3667     target_node = instance.secondary_nodes[0]
3668
3669     feedback_fn("* checking disk consistency between source and target")
3670     for dev in instance.disks:
3671       # for drbd, these are drbd over lvm
3672       if not _CheckDiskConsistency(self, dev, target_node, False):
3673         if instance.admin_up and not self.op.ignore_consistency:
3674           raise errors.OpExecError("Disk %s is degraded on target node,"
3675                                    " aborting failover." % dev.iv_name)
3676
3677     feedback_fn("* shutting down instance on source node")
3678     logging.info("Shutting down instance %s on node %s",
3679                  instance.name, source_node)
3680
3681     result = self.rpc.call_instance_shutdown(source_node, instance)
3682     msg = result.fail_msg
3683     if msg:
3684       if self.op.ignore_consistency:
3685         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3686                              " Proceeding anyway. Please make sure node"
3687                              " %s is down. Error details: %s",
3688                              instance.name, source_node, source_node, msg)
3689       else:
3690         raise errors.OpExecError("Could not shutdown instance %s on"
3691                                  " node %s: %s" %
3692                                  (instance.name, source_node, msg))
3693
3694     feedback_fn("* deactivating the instance's disks on source node")
3695     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3696       raise errors.OpExecError("Can't shut down the instance's disks.")
3697
3698     instance.primary_node = target_node
3699     # distribute new instance config to the other nodes
3700     self.cfg.Update(instance)
3701
3702     # Only start the instance if it's marked as up
3703     if instance.admin_up:
3704       feedback_fn("* activating the instance's disks on target node")
3705       logging.info("Starting instance %s on node %s",
3706                    instance.name, target_node)
3707
3708       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3709                                                ignore_secondaries=True)
3710       if not disks_ok:
3711         _ShutdownInstanceDisks(self, instance)
3712         raise errors.OpExecError("Can't activate the instance's disks")
3713
3714       feedback_fn("* starting the instance on the target node")
3715       result = self.rpc.call_instance_start(target_node, instance, None, None)
3716       msg = result.fail_msg
3717       if msg:
3718         _ShutdownInstanceDisks(self, instance)
3719         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3720                                  (instance.name, target_node, msg))
3721
3722
3723 class LUMigrateInstance(LogicalUnit):
3724   """Migrate an instance.
3725
3726   This is migration without shutting down, compared to the failover,
3727   which is done with shutdown.
3728
3729   """
3730   HPATH = "instance-migrate"
3731   HTYPE = constants.HTYPE_INSTANCE
3732   _OP_REQP = ["instance_name", "live", "cleanup"]
3733
3734   REQ_BGL = False
3735
3736   def ExpandNames(self):
3737     self._ExpandAndLockInstance()
3738     self.needed_locks[locking.LEVEL_NODE] = []
3739     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3740
3741   def DeclareLocks(self, level):
3742     if level == locking.LEVEL_NODE:
3743       self._LockInstancesNodes()
3744
3745   def BuildHooksEnv(self):
3746     """Build hooks env.
3747
3748     This runs on master, primary and secondary nodes of the instance.
3749
3750     """
3751     env = _BuildInstanceHookEnvByObject(self, self.instance)
3752     env["MIGRATE_LIVE"] = self.op.live
3753     env["MIGRATE_CLEANUP"] = self.op.cleanup
3754     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3755     return env, nl, nl
3756
3757   def CheckPrereq(self):
3758     """Check prerequisites.
3759
3760     This checks that the instance is in the cluster.
3761
3762     """
3763     instance = self.cfg.GetInstanceInfo(
3764       self.cfg.ExpandInstanceName(self.op.instance_name))
3765     if instance is None:
3766       raise errors.OpPrereqError("Instance '%s' not known" %
3767                                  self.op.instance_name)
3768
3769     if instance.disk_template != constants.DT_DRBD8:
3770       raise errors.OpPrereqError("Instance's disk layout is not"
3771                                  " drbd8, cannot migrate.")
3772
3773     secondary_nodes = instance.secondary_nodes
3774     if not secondary_nodes:
3775       raise errors.ConfigurationError("No secondary node but using"
3776                                       " drbd8 disk template")
3777
3778     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3779
3780     target_node = secondary_nodes[0]
3781     # check memory requirements on the secondary node
3782     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3783                          instance.name, i_be[constants.BE_MEMORY],
3784                          instance.hypervisor)
3785
3786     # check bridge existance
3787     _CheckInstanceBridgesExist(self, instance, node=target_node)
3788
3789     if not self.op.cleanup:
3790       _CheckNodeNotDrained(self, target_node)
3791       result = self.rpc.call_instance_migratable(instance.primary_node,
3792                                                  instance)
3793       result.Raise("Can't migrate, please use failover", prereq=True)
3794
3795     self.instance = instance
3796
3797   def _WaitUntilSync(self):
3798     """Poll with custom rpc for disk sync.
3799
3800     This uses our own step-based rpc call.
3801
3802     """
3803     self.feedback_fn("* wait until resync is done")
3804     all_done = False
3805     while not all_done:
3806       all_done = True
3807       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3808                                             self.nodes_ip,
3809                                             self.instance.disks)
3810       min_percent = 100
3811       for node, nres in result.items():
3812         nres.Raise("Cannot resync disks on node %s" % node)
3813         node_done, node_percent = nres.payload
3814         all_done = all_done and node_done
3815         if node_percent is not None:
3816           min_percent = min(min_percent, node_percent)
3817       if not all_done:
3818         if min_percent < 100:
3819           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3820         time.sleep(2)
3821
3822   def _EnsureSecondary(self, node):
3823     """Demote a node to secondary.
3824
3825     """
3826     self.feedback_fn("* switching node %s to secondary mode" % node)
3827
3828     for dev in self.instance.disks:
3829       self.cfg.SetDiskID(dev, node)
3830
3831     result = self.rpc.call_blockdev_close(node, self.instance.name,
3832                                           self.instance.disks)
3833     result.Raise("Cannot change disk to secondary on node %s" % node)
3834
3835   def _GoStandalone(self):
3836     """Disconnect from the network.
3837
3838     """
3839     self.feedback_fn("* changing into standalone mode")
3840     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3841                                                self.instance.disks)
3842     for node, nres in result.items():
3843       nres.Raise("Cannot disconnect disks node %s" % node)
3844
3845   def _GoReconnect(self, multimaster):
3846     """Reconnect to the network.
3847
3848     """
3849     if multimaster:
3850       msg = "dual-master"
3851     else:
3852       msg = "single-master"
3853     self.feedback_fn("* changing disks into %s mode" % msg)
3854     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3855                                            self.instance.disks,
3856                                            self.instance.name, multimaster)
3857     for node, nres in result.items():
3858       nres.Raise("Cannot change disks config on node %s" % node)
3859
3860   def _ExecCleanup(self):
3861     """Try to cleanup after a failed migration.
3862
3863     The cleanup is done by:
3864       - check that the instance is running only on one node
3865         (and update the config if needed)
3866       - change disks on its secondary node to secondary
3867       - wait until disks are fully synchronized
3868       - disconnect from the network
3869       - change disks into single-master mode
3870       - wait again until disks are fully synchronized
3871
3872     """
3873     instance = self.instance
3874     target_node = self.target_node
3875     source_node = self.source_node
3876
3877     # check running on only one node
3878     self.feedback_fn("* checking where the instance actually runs"
3879                      " (if this hangs, the hypervisor might be in"
3880                      " a bad state)")
3881     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3882     for node, result in ins_l.items():
3883       result.Raise("Can't contact node %s" % node)
3884
3885     runningon_source = instance.name in ins_l[source_node].payload
3886     runningon_target = instance.name in ins_l[target_node].payload
3887
3888     if runningon_source and runningon_target:
3889       raise errors.OpExecError("Instance seems to be running on two nodes,"
3890                                " or the hypervisor is confused. You will have"
3891                                " to ensure manually that it runs only on one"
3892                                " and restart this operation.")
3893
3894     if not (runningon_source or runningon_target):
3895       raise errors.OpExecError("Instance does not seem to be running at all."
3896                                " In this case, it's safer to repair by"
3897                                " running 'gnt-instance stop' to ensure disk"
3898                                " shutdown, and then restarting it.")
3899
3900     if runningon_target:
3901       # the migration has actually succeeded, we need to update the config
3902       self.feedback_fn("* instance running on secondary node (%s),"
3903                        " updating config" % target_node)
3904       instance.primary_node = target_node
3905       self.cfg.Update(instance)
3906       demoted_node = source_node
3907     else:
3908       self.feedback_fn("* instance confirmed to be running on its"
3909                        " primary node (%s)" % source_node)
3910       demoted_node = target_node
3911
3912     self._EnsureSecondary(demoted_node)
3913     try:
3914       self._WaitUntilSync()
3915     except errors.OpExecError:
3916       # we ignore here errors, since if the device is standalone, it
3917       # won't be able to sync
3918       pass
3919     self._GoStandalone()
3920     self._GoReconnect(False)
3921     self._WaitUntilSync()
3922
3923     self.feedback_fn("* done")
3924
3925   def _RevertDiskStatus(self):
3926     """Try to revert the disk status after a failed migration.
3927
3928     """
3929     target_node = self.target_node
3930     try:
3931       self._EnsureSecondary(target_node)
3932       self._GoStandalone()
3933       self._GoReconnect(False)
3934       self._WaitUntilSync()
3935     except errors.OpExecError, err:
3936       self.LogWarning("Migration failed and I can't reconnect the"
3937                       " drives: error '%s'\n"
3938                       "Please look and recover the instance status" %
3939                       str(err))
3940
3941   def _AbortMigration(self):
3942     """Call the hypervisor code to abort a started migration.
3943
3944     """
3945     instance = self.instance
3946     target_node = self.target_node
3947     migration_info = self.migration_info
3948
3949     abort_result = self.rpc.call_finalize_migration(target_node,
3950                                                     instance,
3951                                                     migration_info,
3952                                                     False)
3953     abort_msg = abort_result.fail_msg
3954     if abort_msg:
3955       logging.error("Aborting migration failed on target node %s: %s" %
3956                     (target_node, abort_msg))
3957       # Don't raise an exception here, as we stil have to try to revert the
3958       # disk status, even if this step failed.
3959
3960   def _ExecMigration(self):
3961     """Migrate an instance.
3962
3963     The migrate is done by:
3964       - change the disks into dual-master mode
3965       - wait until disks are fully synchronized again
3966       - migrate the instance
3967       - change disks on the new secondary node (the old primary) to secondary
3968       - wait until disks are fully synchronized
3969       - change disks into single-master mode
3970
3971     """
3972     instance = self.instance
3973     target_node = self.target_node
3974     source_node = self.source_node
3975
3976     self.feedback_fn("* checking disk consistency between source and target")
3977     for dev in instance.disks:
3978       if not _CheckDiskConsistency(self, dev, target_node, False):
3979         raise errors.OpExecError("Disk %s is degraded or not fully"
3980                                  " synchronized on target node,"
3981                                  " aborting migrate." % dev.iv_name)
3982
3983     # First get the migration information from the remote node
3984     result = self.rpc.call_migration_info(source_node, instance)
3985     msg = result.fail_msg
3986     if msg:
3987       log_err = ("Failed fetching source migration information from %s: %s" %
3988                  (source_node, msg))
3989       logging.error(log_err)
3990       raise errors.OpExecError(log_err)
3991
3992     self.migration_info = migration_info = result.payload
3993
3994     # Then switch the disks to master/master mode
3995     self._EnsureSecondary(target_node)
3996     self._GoStandalone()
3997     self._GoReconnect(True)
3998     self._WaitUntilSync()
3999
4000     self.feedback_fn("* preparing %s to accept the instance" % target_node)
4001     result = self.rpc.call_accept_instance(target_node,
4002                                            instance,
4003                                            migration_info,
4004                                            self.nodes_ip[target_node])
4005
4006     msg = result.fail_msg
4007     if msg:
4008       logging.error("Instance pre-migration failed, trying to revert"
4009                     " disk status: %s", msg)
4010       self._AbortMigration()
4011       self._RevertDiskStatus()
4012       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4013                                (instance.name, msg))
4014
4015     self.feedback_fn("* migrating instance to %s" % target_node)
4016     time.sleep(10)
4017     result = self.rpc.call_instance_migrate(source_node, instance,
4018                                             self.nodes_ip[target_node],
4019                                             self.op.live)
4020     msg = result.fail_msg
4021     if msg:
4022       logging.error("Instance migration failed, trying to revert"
4023                     " disk status: %s", msg)
4024       self._AbortMigration()
4025       self._RevertDiskStatus()
4026       raise errors.OpExecError("Could not migrate instance %s: %s" %
4027                                (instance.name, msg))
4028     time.sleep(10)
4029
4030     instance.primary_node = target_node
4031     # distribute new instance config to the other nodes
4032     self.cfg.Update(instance)
4033
4034     result = self.rpc.call_finalize_migration(target_node,
4035                                               instance,
4036                                               migration_info,
4037                                               True)
4038     msg = result.fail_msg
4039     if msg:
4040       logging.error("Instance migration succeeded, but finalization failed:"
4041                     " %s" % msg)
4042       raise errors.OpExecError("Could not finalize instance migration: %s" %
4043                                msg)
4044
4045     self._EnsureSecondary(source_node)
4046     self._WaitUntilSync()
4047     self._GoStandalone()
4048     self._GoReconnect(False)
4049     self._WaitUntilSync()
4050
4051     self.feedback_fn("* done")
4052
4053   def Exec(self, feedback_fn):
4054     """Perform the migration.
4055
4056     """
4057     self.feedback_fn = feedback_fn
4058
4059     self.source_node = self.instance.primary_node
4060     self.target_node = self.instance.secondary_nodes[0]
4061     self.all_nodes = [self.source_node, self.target_node]
4062     self.nodes_ip = {
4063       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4064       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4065       }
4066     if self.op.cleanup:
4067       return self._ExecCleanup()
4068     else:
4069       return self._ExecMigration()
4070
4071
4072 def _CreateBlockDev(lu, node, instance, device, force_create,
4073                     info, force_open):
4074   """Create a tree of block devices on a given node.
4075
4076   If this device type has to be created on secondaries, create it and
4077   all its children.
4078
4079   If not, just recurse to children keeping the same 'force' value.
4080
4081   @param lu: the lu on whose behalf we execute
4082   @param node: the node on which to create the device
4083   @type instance: L{objects.Instance}
4084   @param instance: the instance which owns the device
4085   @type device: L{objects.Disk}
4086   @param device: the device to create
4087   @type force_create: boolean
4088   @param force_create: whether to force creation of this device; this
4089       will be change to True whenever we find a device which has
4090       CreateOnSecondary() attribute
4091   @param info: the extra 'metadata' we should attach to the device
4092       (this will be represented as a LVM tag)
4093   @type force_open: boolean
4094   @param force_open: this parameter will be passes to the
4095       L{backend.BlockdevCreate} function where it specifies
4096       whether we run on primary or not, and it affects both
4097       the child assembly and the device own Open() execution
4098
4099   """
4100   if device.CreateOnSecondary():
4101     force_create = True
4102
4103   if device.children:
4104     for child in device.children:
4105       _CreateBlockDev(lu, node, instance, child, force_create,
4106                       info, force_open)
4107
4108   if not force_create:
4109     return
4110
4111   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4112
4113
4114 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4115   """Create a single block device on a given node.
4116
4117   This will not recurse over children of the device, so they must be
4118   created in advance.
4119
4120   @param lu: the lu on whose behalf we execute
4121   @param node: the node on which to create the device
4122   @type instance: L{objects.Instance}
4123   @param instance: the instance which owns the device
4124   @type device: L{objects.Disk}
4125   @param device: the device to create
4126   @param info: the extra 'metadata' we should attach to the device
4127       (this will be represented as a LVM tag)
4128   @type force_open: boolean
4129   @param force_open: this parameter will be passes to the
4130       L{backend.BlockdevCreate} function where it specifies
4131       whether we run on primary or not, and it affects both
4132       the child assembly and the device own Open() execution
4133
4134   """
4135   lu.cfg.SetDiskID(device, node)
4136   result = lu.rpc.call_blockdev_create(node, device, device.size,
4137                                        instance.name, force_open, info)
4138   result.Raise("Can't create block device %s on"
4139                " node %s for instance %s" % (device, node, instance.name))
4140   if device.physical_id is None:
4141     device.physical_id = result.payload
4142
4143
4144 def _GenerateUniqueNames(lu, exts):
4145   """Generate a suitable LV name.
4146
4147   This will generate a logical volume name for the given instance.
4148
4149   """
4150   results = []
4151   for val in exts:
4152     new_id = lu.cfg.GenerateUniqueID()
4153     results.append("%s%s" % (new_id, val))
4154   return results
4155
4156
4157 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4158                          p_minor, s_minor):
4159   """Generate a drbd8 device complete with its children.
4160
4161   """
4162   port = lu.cfg.AllocatePort()
4163   vgname = lu.cfg.GetVGName()
4164   shared_secret = lu.cfg.GenerateDRBDSecret()
4165   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4166                           logical_id=(vgname, names[0]))
4167   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4168                           logical_id=(vgname, names[1]))
4169   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4170                           logical_id=(primary, secondary, port,
4171                                       p_minor, s_minor,
4172                                       shared_secret),
4173                           children=[dev_data, dev_meta],
4174                           iv_name=iv_name)
4175   return drbd_dev
4176
4177
4178 def _GenerateDiskTemplate(lu, template_name,
4179                           instance_name, primary_node,
4180                           secondary_nodes, disk_info,
4181                           file_storage_dir, file_driver,
4182                           base_index):
4183   """Generate the entire disk layout for a given template type.
4184
4185   """
4186   #TODO: compute space requirements
4187
4188   vgname = lu.cfg.GetVGName()
4189   disk_count = len(disk_info)
4190   disks = []
4191   if template_name == constants.DT_DISKLESS:
4192     pass
4193   elif template_name == constants.DT_PLAIN:
4194     if len(secondary_nodes) != 0:
4195       raise errors.ProgrammerError("Wrong template configuration")
4196
4197     names = _GenerateUniqueNames(lu, [".disk%d" % i
4198                                       for i in range(disk_count)])
4199     for idx, disk in enumerate(disk_info):
4200       disk_index = idx + base_index
4201       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4202                               logical_id=(vgname, names[idx]),
4203                               iv_name="disk/%d" % disk_index,
4204                               mode=disk["mode"])
4205       disks.append(disk_dev)
4206   elif template_name == constants.DT_DRBD8:
4207     if len(secondary_nodes) != 1:
4208       raise errors.ProgrammerError("Wrong template configuration")
4209     remote_node = secondary_nodes[0]
4210     minors = lu.cfg.AllocateDRBDMinor(
4211       [primary_node, remote_node] * len(disk_info), instance_name)
4212
4213     names = []
4214     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4215                                                for i in range(disk_count)]):
4216       names.append(lv_prefix + "_data")
4217       names.append(lv_prefix + "_meta")
4218     for idx, disk in enumerate(disk_info):
4219       disk_index = idx + base_index
4220       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4221                                       disk["size"], names[idx*2:idx*2+2],
4222                                       "disk/%d" % disk_index,
4223                                       minors[idx*2], minors[idx*2+1])
4224       disk_dev.mode = disk["mode"]
4225       disks.append(disk_dev)
4226   elif template_name == constants.DT_FILE:
4227     if len(secondary_nodes) != 0:
4228       raise errors.ProgrammerError("Wrong template configuration")
4229
4230     for idx, disk in enumerate(disk_info):
4231       disk_index = idx + base_index
4232       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4233                               iv_name="disk/%d" % disk_index,
4234                               logical_id=(file_driver,
4235                                           "%s/disk%d" % (file_storage_dir,
4236                                                          disk_index)),
4237                               mode=disk["mode"])
4238       disks.append(disk_dev)
4239   else:
4240     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4241   return disks
4242
4243
4244 def _GetInstanceInfoText(instance):
4245   """Compute that text that should be added to the disk's metadata.
4246
4247   """
4248   return "originstname+%s" % instance.name
4249
4250
4251 def _CreateDisks(lu, instance):
4252   """Create all disks for an instance.
4253
4254   This abstracts away some work from AddInstance.
4255
4256   @type lu: L{LogicalUnit}
4257   @param lu: the logical unit on whose behalf we execute
4258   @type instance: L{objects.Instance}
4259   @param instance: the instance whose disks we should create
4260   @rtype: boolean
4261   @return: the success of the creation
4262
4263   """
4264   info = _GetInstanceInfoText(instance)
4265   pnode = instance.primary_node
4266
4267   if instance.disk_template == constants.DT_FILE:
4268     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4269     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4270
4271     result.Raise("Failed to create directory '%s' on"
4272                  " node %s: %s" % (file_storage_dir, pnode))
4273
4274   # Note: this needs to be kept in sync with adding of disks in
4275   # LUSetInstanceParams
4276   for device in instance.disks:
4277     logging.info("Creating volume %s for instance %s",
4278                  device.iv_name, instance.name)
4279     #HARDCODE
4280     for node in instance.all_nodes:
4281       f_create = node == pnode
4282       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4283
4284
4285 def _RemoveDisks(lu, instance):
4286   """Remove all disks for an instance.
4287
4288   This abstracts away some work from `AddInstance()` and
4289   `RemoveInstance()`. Note that in case some of the devices couldn't
4290   be removed, the removal will continue with the other ones (compare
4291   with `_CreateDisks()`).
4292
4293   @type lu: L{LogicalUnit}
4294   @param lu: the logical unit on whose behalf we execute
4295   @type instance: L{objects.Instance}
4296   @param instance: the instance whose disks we should remove
4297   @rtype: boolean
4298   @return: the success of the removal
4299
4300   """
4301   logging.info("Removing block devices for instance %s", instance.name)
4302
4303   all_result = True
4304   for device in instance.disks:
4305     for node, disk in device.ComputeNodeTree(instance.primary_node):
4306       lu.cfg.SetDiskID(disk, node)
4307       msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
4308       if msg:
4309         lu.LogWarning("Could not remove block device %s on node %s,"
4310                       " continuing anyway: %s", device.iv_name, node, msg)
4311         all_result = False
4312
4313   if instance.disk_template == constants.DT_FILE:
4314     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4315     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4316                                                  file_storage_dir)
4317     msg = result.fail_msg
4318     if msg:
4319       lu.LogWarning("Could not remove directory '%s' on node %s: %s",
4320                     file_storage_dir, instance.primary_node, msg)
4321       all_result = False
4322
4323   return all_result
4324
4325
4326 def _ComputeDiskSize(disk_template, disks):
4327   """Compute disk size requirements in the volume group
4328
4329   """
4330   # Required free disk space as a function of disk and swap space
4331   req_size_dict = {
4332     constants.DT_DISKLESS: None,
4333     constants.DT_PLAIN: sum(d["size"] for d in disks),
4334     # 128 MB are added for drbd metadata for each disk
4335     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4336     constants.DT_FILE: None,
4337   }
4338
4339   if disk_template not in req_size_dict:
4340     raise errors.ProgrammerError("Disk template '%s' size requirement"
4341                                  " is unknown" %  disk_template)
4342
4343   return req_size_dict[disk_template]
4344
4345
4346 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4347   """Hypervisor parameter validation.
4348
4349   This function abstract the hypervisor parameter validation to be
4350   used in both instance create and instance modify.
4351
4352   @type lu: L{LogicalUnit}
4353   @param lu: the logical unit for which we check
4354   @type nodenames: list
4355   @param nodenames: the list of nodes on which we should check
4356   @type hvname: string
4357   @param hvname: the name of the hypervisor we should use
4358   @type hvparams: dict
4359   @param hvparams: the parameters which we need to check
4360   @raise errors.OpPrereqError: if the parameters are not valid
4361
4362   """
4363   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4364                                                   hvname,
4365                                                   hvparams)
4366   for node in nodenames:
4367     info = hvinfo[node]
4368     if info.offline:
4369       continue
4370     info.Raise("Hypervisor parameter validation failed on node %s" % node)
4371
4372
4373 class LUCreateInstance(LogicalUnit):
4374   """Create an instance.
4375
4376   """
4377   HPATH = "instance-add"
4378   HTYPE = constants.HTYPE_INSTANCE
4379   _OP_REQP = ["instance_name", "disks", "disk_template",
4380               "mode", "start",
4381               "wait_for_sync", "ip_check", "nics",
4382               "hvparams", "beparams"]
4383   REQ_BGL = False
4384
4385   def _ExpandNode(self, node):
4386     """Expands and checks one node name.
4387
4388     """
4389     node_full = self.cfg.ExpandNodeName(node)
4390     if node_full is None:
4391       raise errors.OpPrereqError("Unknown node %s" % node)
4392     return node_full
4393
4394   def ExpandNames(self):
4395     """ExpandNames for CreateInstance.
4396
4397     Figure out the right locks for instance creation.
4398
4399     """
4400     self.needed_locks = {}
4401
4402     # set optional parameters to none if they don't exist
4403     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4404       if not hasattr(self.op, attr):
4405         setattr(self.op, attr, None)
4406
4407     # cheap checks, mostly valid constants given
4408
4409     # verify creation mode
4410     if self.op.mode not in (constants.INSTANCE_CREATE,
4411                             constants.INSTANCE_IMPORT):
4412       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4413                                  self.op.mode)
4414
4415     # disk template and mirror node verification
4416     if self.op.disk_template not in constants.DISK_TEMPLATES:
4417       raise errors.OpPrereqError("Invalid disk template name")
4418
4419     if self.op.hypervisor is None:
4420       self.op.hypervisor = self.cfg.GetHypervisorType()
4421
4422     cluster = self.cfg.GetClusterInfo()
4423     enabled_hvs = cluster.enabled_hypervisors
4424     if self.op.hypervisor not in enabled_hvs:
4425       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4426                                  " cluster (%s)" % (self.op.hypervisor,
4427                                   ",".join(enabled_hvs)))
4428
4429     # check hypervisor parameter syntax (locally)
4430     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4431     filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
4432                                   self.op.hvparams)
4433     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4434     hv_type.CheckParameterSyntax(filled_hvp)
4435     self.hv_full = filled_hvp
4436
4437     # fill and remember the beparams dict
4438     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4439     self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
4440                                     self.op.beparams)
4441
4442     #### instance parameters check
4443
4444     # instance name verification
4445     hostname1 = utils.HostInfo(self.op.instance_name)
4446     self.op.instance_name = instance_name = hostname1.name
4447
4448     # this is just a preventive check, but someone might still add this
4449     # instance in the meantime, and creation will fail at lock-add time
4450     if instance_name in self.cfg.GetInstanceList():
4451       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4452                                  instance_name)
4453
4454     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4455
4456     # NIC buildup
4457     self.nics = []
4458     for idx, nic in enumerate(self.op.nics):
4459       nic_mode_req = nic.get("mode", None)
4460       nic_mode = nic_mode_req
4461       if nic_mode is None:
4462         nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
4463
4464       # in routed mode, for the first nic, the default ip is 'auto'
4465       if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
4466         default_ip_mode = constants.VALUE_AUTO
4467       else:
4468         default_ip_mode = constants.VALUE_NONE
4469
4470       # ip validity checks
4471       ip = nic.get("ip", default_ip_mode)
4472       if ip is None or ip.lower() == constants.VALUE_NONE:
4473         nic_ip = None
4474       elif ip.lower() == constants.VALUE_AUTO:
4475         nic_ip = hostname1.ip
4476       else:
4477         if not utils.IsValidIP(ip):
4478           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4479                                      " like a valid IP" % ip)
4480         nic_ip = ip
4481
4482       # TODO: check the ip for uniqueness !!
4483       if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
4484         raise errors.OpPrereqError("Routed nic mode requires an ip address")
4485
4486       # MAC address verification
4487       mac = nic.get("mac", constants.VALUE_AUTO)
4488       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4489         if not utils.IsValidMac(mac.lower()):
4490           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4491                                      mac)
4492       # bridge verification
4493       bridge = nic.get("bridge", None)
4494       link = nic.get("link", None)
4495       if bridge and link:
4496         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
4497       elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
4498         raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
4499       elif bridge:
4500         link = bridge
4501
4502       nicparams = {}
4503       if nic_mode_req:
4504         nicparams[constants.NIC_MODE] = nic_mode_req
4505       if link:
4506         nicparams[constants.NIC_LINK] = link
4507
4508       check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4509                                       nicparams)
4510       objects.NIC.CheckParameterSyntax(check_params)
4511       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
4512
4513     # disk checks/pre-build
4514     self.disks = []
4515     for disk in self.op.disks:
4516       mode = disk.get("mode", constants.DISK_RDWR)
4517       if mode not in constants.DISK_ACCESS_SET:
4518         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4519                                    mode)
4520       size = disk.get("size", None)
4521       if size is None:
4522         raise errors.OpPrereqError("Missing disk size")
4523       try:
4524         size = int(size)
4525       except ValueError:
4526         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4527       self.disks.append({"size": size, "mode": mode})
4528
4529     # used in CheckPrereq for ip ping check
4530     self.check_ip = hostname1.ip
4531
4532     # file storage checks
4533     if (self.op.file_driver and
4534         not self.op.file_driver in constants.FILE_DRIVER):
4535       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4536                                  self.op.file_driver)
4537
4538     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4539       raise errors.OpPrereqError("File storage directory path not absolute")
4540
4541     ### Node/iallocator related checks
4542     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4543       raise errors.OpPrereqError("One and only one of iallocator and primary"
4544                                  " node must be given")
4545
4546     if self.op.iallocator:
4547       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4548     else:
4549       self.op.pnode = self._ExpandNode(self.op.pnode)
4550       nodelist = [self.op.pnode]
4551       if self.op.snode is not None:
4552         self.op.snode = self._ExpandNode(self.op.snode)
4553         nodelist.append(self.op.snode)
4554       self.needed_locks[locking.LEVEL_NODE] = nodelist
4555
4556     # in case of import lock the source node too
4557     if self.op.mode == constants.INSTANCE_IMPORT:
4558       src_node = getattr(self.op, "src_node", None)
4559       src_path = getattr(self.op, "src_path", None)
4560
4561       if src_path is None:
4562         self.op.src_path = src_path = self.op.instance_name
4563
4564       if src_node is None:
4565         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4566         self.op.src_node = None
4567         if os.path.isabs(src_path):
4568           raise errors.OpPrereqError("Importing an instance from an absolute"
4569                                      " path requires a source node option.")
4570       else:
4571         self.op.src_node = src_node = self._ExpandNode(src_node)
4572         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4573           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4574         if not os.path.isabs(src_path):
4575           self.op.src_path = src_path = \
4576             os.path.join(constants.EXPORT_DIR, src_path)
4577
4578     else: # INSTANCE_CREATE
4579       if getattr(self.op, "os_type", None) is None:
4580         raise errors.OpPrereqError("No guest OS specified")
4581
4582   def _RunAllocator(self):
4583     """Run the allocator based on input opcode.
4584
4585     """
4586     nics = [n.ToDict() for n in self.nics]
4587     ial = IAllocator(self,
4588                      mode=constants.IALLOCATOR_MODE_ALLOC,
4589                      name=self.op.instance_name,
4590                      disk_template=self.op.disk_template,
4591                      tags=[],
4592                      os=self.op.os_type,
4593                      vcpus=self.be_full[constants.BE_VCPUS],
4594                      mem_size=self.be_full[constants.BE_MEMORY],
4595                      disks=self.disks,
4596                      nics=nics,
4597                      hypervisor=self.op.hypervisor,
4598                      )
4599
4600     ial.Run(self.op.iallocator)
4601
4602     if not ial.success:
4603       raise errors.OpPrereqError("Can't compute nodes using"
4604                                  " iallocator '%s': %s" % (self.op.iallocator,
4605                                                            ial.info))
4606     if len(ial.nodes) != ial.required_nodes:
4607       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4608                                  " of nodes (%s), required %s" %
4609                                  (self.op.iallocator, len(ial.nodes),
4610                                   ial.required_nodes))
4611     self.op.pnode = ial.nodes[0]
4612     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4613                  self.op.instance_name, self.op.iallocator,
4614                  ", ".join(ial.nodes))
4615     if ial.required_nodes == 2:
4616       self.op.snode = ial.nodes[1]
4617
4618   def BuildHooksEnv(self):
4619     """Build hooks env.
4620
4621     This runs on master, primary and secondary nodes of the instance.
4622
4623     """
4624     env = {
4625       "ADD_MODE": self.op.mode,
4626       }
4627     if self.op.mode == constants.INSTANCE_IMPORT:
4628       env["SRC_NODE"] = self.op.src_node
4629       env["SRC_PATH"] = self.op.src_path
4630       env["SRC_IMAGES"] = self.src_images
4631
4632     env.update(_BuildInstanceHookEnv(
4633       name=self.op.instance_name,
4634       primary_node=self.op.pnode,
4635       secondary_nodes=self.secondaries,
4636       status=self.op.start,
4637       os_type=self.op.os_type,
4638       memory=self.be_full[constants.BE_MEMORY],
4639       vcpus=self.be_full[constants.BE_VCPUS],
4640       nics=_PreBuildNICHooksList(self, self.nics),
4641       disk_template=self.op.disk_template,
4642       disks=[(d["size"], d["mode"]) for d in self.disks],
4643       bep=self.be_full,
4644       hvp=self.hv_full,
4645       hypervisor=self.op.hypervisor,
4646     ))
4647
4648     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4649           self.secondaries)
4650     return env, nl, nl
4651
4652
4653   def CheckPrereq(self):
4654     """Check prerequisites.
4655
4656     """
4657     if (not self.cfg.GetVGName() and
4658         self.op.disk_template not in constants.DTS_NOT_LVM):
4659       raise errors.OpPrereqError("Cluster does not support lvm-based"
4660                                  " instances")
4661
4662     if self.op.mode == constants.INSTANCE_IMPORT:
4663       src_node = self.op.src_node
4664       src_path = self.op.src_path
4665
4666       if src_node is None:
4667         locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
4668         exp_list = self.rpc.call_export_list(locked_nodes)
4669         found = False
4670         for node in exp_list:
4671           if exp_list[node].fail_msg:
4672             continue
4673           if src_path in exp_list[node].payload:
4674             found = True
4675             self.op.src_node = src_node = node
4676             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4677                                                        src_path)
4678             break
4679         if not found:
4680           raise errors.OpPrereqError("No export found for relative path %s" %
4681                                       src_path)
4682
4683       _CheckNodeOnline(self, src_node)
4684       result = self.rpc.call_export_info(src_node, src_path)
4685       result.Raise("No export or invalid export found in dir %s" % src_path)
4686
4687       export_info = objects.SerializableConfigParser.Loads(str(result.payload))
4688       if not export_info.has_section(constants.INISECT_EXP):
4689         raise errors.ProgrammerError("Corrupted export config")
4690
4691       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4692       if (int(ei_version) != constants.EXPORT_VERSION):
4693         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4694                                    (ei_version, constants.EXPORT_VERSION))
4695
4696       # Check that the new instance doesn't have less disks than the export
4697       instance_disks = len(self.disks)
4698       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4699       if instance_disks < export_disks:
4700         raise errors.OpPrereqError("Not enough disks to import."
4701                                    " (instance: %d, export: %d)" %
4702                                    (instance_disks, export_disks))
4703
4704       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4705       disk_images = []
4706       for idx in range(export_disks):
4707         option = 'disk%d_dump' % idx
4708         if export_info.has_option(constants.INISECT_INS, option):
4709           # FIXME: are the old os-es, disk sizes, etc. useful?
4710           export_name = export_info.get(constants.INISECT_INS, option)
4711           image = os.path.join(src_path, export_name)
4712           disk_images.append(image)
4713         else:
4714           disk_images.append(False)
4715
4716       self.src_images = disk_images
4717
4718       old_name = export_info.get(constants.INISECT_INS, 'name')
4719       # FIXME: int() here could throw a ValueError on broken exports
4720       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4721       if self.op.instance_name == old_name:
4722         for idx, nic in enumerate(self.nics):
4723           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4724             nic_mac_ini = 'nic%d_mac' % idx
4725             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4726
4727     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4728     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4729     if self.op.start and not self.op.ip_check:
4730       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4731                                  " adding an instance in start mode")
4732
4733     if self.op.ip_check:
4734       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4735         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4736                                    (self.check_ip, self.op.instance_name))
4737
4738     #### mac address generation
4739     # By generating here the mac address both the allocator and the hooks get
4740     # the real final mac address rather than the 'auto' or 'generate' value.
4741     # There is a race condition between the generation and the instance object
4742     # creation, which means that we know the mac is valid now, but we're not
4743     # sure it will be when we actually add the instance. If things go bad
4744     # adding the instance will abort because of a duplicate mac, and the
4745     # creation job will fail.
4746     for nic in self.nics:
4747       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4748         nic.mac = self.cfg.GenerateMAC()
4749
4750     #### allocator run
4751
4752     if self.op.iallocator is not None:
4753       self._RunAllocator()
4754
4755     #### node related checks
4756
4757     # check primary node
4758     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4759     assert self.pnode is not None, \
4760       "Cannot retrieve locked node %s" % self.op.pnode
4761     if pnode.offline:
4762       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4763                                  pnode.name)
4764     if pnode.drained:
4765       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4766                                  pnode.name)
4767
4768     self.secondaries = []
4769
4770     # mirror node verification
4771     if self.op.disk_template in constants.DTS_NET_MIRROR:
4772       if self.op.snode is None:
4773         raise errors.OpPrereqError("The networked disk templates need"
4774                                    " a mirror node")
4775       if self.op.snode == pnode.name:
4776         raise errors.OpPrereqError("The secondary node cannot be"
4777                                    " the primary node.")
4778       _CheckNodeOnline(self, self.op.snode)
4779       _CheckNodeNotDrained(self, self.op.snode)
4780       self.secondaries.append(self.op.snode)
4781
4782     nodenames = [pnode.name] + self.secondaries
4783
4784     req_size = _ComputeDiskSize(self.op.disk_template,
4785                                 self.disks)
4786
4787     # Check lv size requirements
4788     if req_size is not None:
4789       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4790                                          self.op.hypervisor)
4791       for node in nodenames:
4792         info = nodeinfo[node]
4793         info.Raise("Cannot get current information from node %s" % node)
4794         info = info.payload
4795         vg_free = info.get('vg_free', None)
4796         if not isinstance(vg_free, int):
4797           raise errors.OpPrereqError("Can't compute free disk space on"
4798                                      " node %s" % node)
4799         if req_size > vg_free:
4800           raise errors.OpPrereqError("Not enough disk space on target node %s."
4801                                      " %d MB available, %d MB required" %
4802                                      (node, vg_free, req_size))
4803
4804     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4805
4806     # os verification
4807     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4808     result.Raise("OS '%s' not in supported os list for primary node %s" %
4809                  (self.op.os_type, pnode.name), prereq=True)
4810
4811     _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
4812
4813     # memory check on primary node
4814     if self.op.start:
4815       _CheckNodeFreeMemory(self, self.pnode.name,
4816                            "creating instance %s" % self.op.instance_name,
4817                            self.be_full[constants.BE_MEMORY],
4818                            self.op.hypervisor)
4819
4820   def Exec(self, feedback_fn):
4821     """Create and add the instance to the cluster.
4822
4823     """
4824     instance = self.op.instance_name
4825     pnode_name = self.pnode.name
4826
4827     ht_kind = self.op.hypervisor
4828     if ht_kind in constants.HTS_REQ_PORT:
4829       network_port = self.cfg.AllocatePort()
4830     else:
4831       network_port = None
4832
4833     ##if self.op.vnc_bind_address is None:
4834     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4835
4836     # this is needed because os.path.join does not accept None arguments
4837     if self.op.file_storage_dir is None:
4838       string_file_storage_dir = ""
4839     else:
4840       string_file_storage_dir = self.op.file_storage_dir
4841
4842     # build the full file storage dir path
4843     file_storage_dir = os.path.normpath(os.path.join(
4844                                         self.cfg.GetFileStorageDir(),
4845                                         string_file_storage_dir, instance))
4846
4847
4848     disks = _GenerateDiskTemplate(self,
4849                                   self.op.disk_template,
4850                                   instance, pnode_name,
4851                                   self.secondaries,
4852                                   self.disks,
4853                                   file_storage_dir,
4854                                   self.op.file_driver,
4855                                   0)
4856
4857     iobj = objects.Instance(name=instance, os=self.op.os_type,
4858                             primary_node=pnode_name,
4859                             nics=self.nics, disks=disks,
4860                             disk_template=self.op.disk_template,
4861                             admin_up=False,
4862                             network_port=network_port,
4863                             beparams=self.op.beparams,
4864                             hvparams=self.op.hvparams,
4865                             hypervisor=self.op.hypervisor,
4866                             )
4867
4868     feedback_fn("* creating instance disks...")
4869     try:
4870       _CreateDisks(self, iobj)
4871     except errors.OpExecError:
4872       self.LogWarning("Device creation failed, reverting...")
4873       try:
4874         _RemoveDisks(self, iobj)
4875       finally:
4876         self.cfg.ReleaseDRBDMinors(instance)
4877         raise
4878
4879     feedback_fn("adding instance %s to cluster config" % instance)
4880
4881     self.cfg.AddInstance(iobj)
4882     # Declare that we don't want to remove the instance lock anymore, as we've
4883     # added the instance to the config
4884     del self.remove_locks[locking.LEVEL_INSTANCE]
4885     # Unlock all the nodes
4886     if self.op.mode == constants.INSTANCE_IMPORT:
4887       nodes_keep = [self.op.src_node]
4888       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4889                        if node != self.op.src_node]
4890       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4891       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4892     else:
4893       self.context.glm.release(locking.LEVEL_NODE)
4894       del self.acquired_locks[locking.LEVEL_NODE]
4895
4896     if self.op.wait_for_sync:
4897       disk_abort = not _WaitForSync(self, iobj)
4898     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4899       # make sure the disks are not degraded (still sync-ing is ok)
4900       time.sleep(15)
4901       feedback_fn("* checking mirrors status")
4902       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4903     else:
4904       disk_abort = False
4905
4906     if disk_abort:
4907       _RemoveDisks(self, iobj)
4908       self.cfg.RemoveInstance(iobj.name)
4909       # Make sure the instance lock gets removed
4910       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4911       raise errors.OpExecError("There are some degraded disks for"
4912                                " this instance")
4913
4914     feedback_fn("creating os for instance %s on node %s" %
4915                 (instance, pnode_name))
4916
4917     if iobj.disk_template != constants.DT_DISKLESS:
4918       if self.op.mode == constants.INSTANCE_CREATE:
4919         feedback_fn("* running the instance OS create scripts...")
4920         result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
4921         result.Raise("Could not add os for instance %s"
4922                      " on node %s" % (instance, pnode_name))
4923
4924       elif self.op.mode == constants.INSTANCE_IMPORT:
4925         feedback_fn("* running the instance OS import scripts...")
4926         src_node = self.op.src_node
4927         src_images = self.src_images
4928         cluster_name = self.cfg.GetClusterName()
4929         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4930                                                          src_node, src_images,
4931                                                          cluster_name)
4932         msg = import_result.fail_msg
4933         if msg:
4934           self.LogWarning("Error while importing the disk images for instance"
4935                           " %s on node %s: %s" % (instance, pnode_name, msg))
4936       else:
4937         # also checked in the prereq part
4938         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4939                                      % self.op.mode)
4940
4941     if self.op.start:
4942       iobj.admin_up = True
4943       self.cfg.Update(iobj)
4944       logging.info("Starting instance %s on node %s", instance, pnode_name)
4945       feedback_fn("* starting instance...")
4946       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4947       result.Raise("Could not start instance")
4948
4949
4950 class LUConnectConsole(NoHooksLU):
4951   """Connect to an instance's console.
4952
4953   This is somewhat special in that it returns the command line that
4954   you need to run on the master node in order to connect to the
4955   console.
4956
4957   """
4958   _OP_REQP = ["instance_name"]
4959   REQ_BGL = False
4960
4961   def ExpandNames(self):
4962     self._ExpandAndLockInstance()
4963
4964   def CheckPrereq(self):
4965     """Check prerequisites.
4966
4967     This checks that the instance is in the cluster.
4968
4969     """
4970     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4971     assert self.instance is not None, \
4972       "Cannot retrieve locked instance %s" % self.op.instance_name
4973     _CheckNodeOnline(self, self.instance.primary_node)
4974
4975   def Exec(self, feedback_fn):
4976     """Connect to the console of an instance
4977
4978     """
4979     instance = self.instance
4980     node = instance.primary_node
4981
4982     node_insts = self.rpc.call_instance_list([node],
4983                                              [instance.hypervisor])[node]
4984     node_insts.Raise("Can't get node information from %s" % node)
4985
4986     if instance.name not in node_insts.payload:
4987       raise errors.OpExecError("Instance %s is not running." % instance.name)
4988
4989     logging.debug("Connecting to console of %s on %s", instance.name, node)
4990
4991     hyper = hypervisor.GetHypervisor(instance.hypervisor)
4992     cluster = self.cfg.GetClusterInfo()
4993     # beparams and hvparams are passed separately, to avoid editing the
4994     # instance and then saving the defaults in the instance itself.
4995     hvparams = cluster.FillHV(instance)
4996     beparams = cluster.FillBE(instance)
4997     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
4998
4999     # build ssh cmdline
5000     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
5001
5002
5003 class LUReplaceDisks(LogicalUnit):
5004   """Replace the disks of an instance.
5005
5006   """
5007   HPATH = "mirrors-replace"
5008   HTYPE = constants.HTYPE_INSTANCE
5009   _OP_REQP = ["instance_name", "mode", "disks"]
5010   REQ_BGL = False
5011
5012   def CheckArguments(self):
5013     if not hasattr(self.op, "remote_node"):
5014       self.op.remote_node = None
5015     if not hasattr(self.op, "iallocator"):
5016       self.op.iallocator = None
5017
5018     # check for valid parameter combination
5019     cnt = [self.op.remote_node, self.op.iallocator].count(None)
5020     if self.op.mode == constants.REPLACE_DISK_CHG:
5021       if cnt == 2:
5022         raise errors.OpPrereqError("When changing the secondary either an"
5023                                    " iallocator script must be used or the"
5024                                    " new node given")
5025       elif cnt == 0:
5026         raise errors.OpPrereqError("Give either the iallocator or the new"
5027                                    " secondary, not both")
5028     else: # not replacing the secondary
5029       if cnt != 2:
5030         raise errors.OpPrereqError("The iallocator and new node options can"
5031                                    " be used only when changing the"
5032                                    " secondary node")
5033
5034   def ExpandNames(self):
5035     self._ExpandAndLockInstance()
5036
5037     if self.op.iallocator is not None:
5038       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5039     elif self.op.remote_node is not None:
5040       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5041       if remote_node is None:
5042         raise errors.OpPrereqError("Node '%s' not known" %
5043                                    self.op.remote_node)
5044       self.op.remote_node = remote_node
5045       # Warning: do not remove the locking of the new secondary here
5046       # unless DRBD8.AddChildren is changed to work in parallel;
5047       # currently it doesn't since parallel invocations of
5048       # FindUnusedMinor will conflict
5049       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5050       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5051     else:
5052       self.needed_locks[locking.LEVEL_NODE] = []
5053       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5054
5055   def DeclareLocks(self, level):
5056     # If we're not already locking all nodes in the set we have to declare the
5057     # instance's primary/secondary nodes.
5058     if (level == locking.LEVEL_NODE and
5059         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5060       self._LockInstancesNodes()
5061
5062   def _RunAllocator(self):
5063     """Compute a new secondary node using an IAllocator.
5064
5065     """
5066     ial = IAllocator(self,
5067                      mode=constants.IALLOCATOR_MODE_RELOC,
5068                      name=self.op.instance_name,
5069                      relocate_from=[self.sec_node])
5070
5071     ial.Run(self.op.iallocator)
5072
5073     if not ial.success:
5074       raise errors.OpPrereqError("Can't compute nodes using"
5075                                  " iallocator '%s': %s" % (self.op.iallocator,
5076                                                            ial.info))
5077     if len(ial.nodes) != ial.required_nodes:
5078       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5079                                  " of nodes (%s), required %s" %
5080                                  (len(ial.nodes), ial.required_nodes))
5081     self.op.remote_node = ial.nodes[0]
5082     self.LogInfo("Selected new secondary for the instance: %s",
5083                  self.op.remote_node)
5084
5085   def BuildHooksEnv(self):
5086     """Build hooks env.
5087
5088     This runs on the master, the primary and all the secondaries.
5089
5090     """
5091     env = {
5092       "MODE": self.op.mode,
5093       "NEW_SECONDARY": self.op.remote_node,
5094       "OLD_SECONDARY": self.instance.secondary_nodes[0],
5095       }
5096     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5097     nl = [
5098       self.cfg.GetMasterNode(),
5099       self.instance.primary_node,
5100       ]
5101     if self.op.remote_node is not None:
5102       nl.append(self.op.remote_node)
5103     return env, nl, nl
5104
5105   def CheckPrereq(self):
5106     """Check prerequisites.
5107
5108     This checks that the instance is in the cluster.
5109
5110     """
5111     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5112     assert instance is not None, \
5113       "Cannot retrieve locked instance %s" % self.op.instance_name
5114     self.instance = instance
5115
5116     if instance.disk_template != constants.DT_DRBD8:
5117       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5118                                  " instances")
5119
5120     if len(instance.secondary_nodes) != 1:
5121       raise errors.OpPrereqError("The instance has a strange layout,"
5122                                  " expected one secondary but found %d" %
5123                                  len(instance.secondary_nodes))
5124
5125     self.sec_node = instance.secondary_nodes[0]
5126
5127     if self.op.iallocator is not None:
5128       self._RunAllocator()
5129
5130     remote_node = self.op.remote_node
5131     if remote_node is not None:
5132       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5133       assert self.remote_node_info is not None, \
5134         "Cannot retrieve locked node %s" % remote_node
5135     else:
5136       self.remote_node_info = None
5137     if remote_node == instance.primary_node:
5138       raise errors.OpPrereqError("The specified node is the primary node of"
5139                                  " the instance.")
5140     elif remote_node == self.sec_node:
5141       raise errors.OpPrereqError("The specified node is already the"
5142                                  " secondary node of the instance.")
5143
5144     if self.op.mode == constants.REPLACE_DISK_PRI:
5145       n1 = self.tgt_node = instance.primary_node
5146       n2 = self.oth_node = self.sec_node
5147     elif self.op.mode == constants.REPLACE_DISK_SEC:
5148       n1 = self.tgt_node = self.sec_node
5149       n2 = self.oth_node = instance.primary_node
5150     elif self.op.mode == constants.REPLACE_DISK_CHG:
5151       n1 = self.new_node = remote_node
5152       n2 = self.oth_node = instance.primary_node
5153       self.tgt_node = self.sec_node
5154       _CheckNodeNotDrained(self, remote_node)
5155     else:
5156       raise errors.ProgrammerError("Unhandled disk replace mode")
5157
5158     _CheckNodeOnline(self, n1)
5159     _CheckNodeOnline(self, n2)
5160
5161     if not self.op.disks:
5162       self.op.disks = range(len(instance.disks))
5163
5164     for disk_idx in self.op.disks:
5165       instance.FindDisk(disk_idx)
5166
5167   def _ExecD8DiskOnly(self, feedback_fn):
5168     """Replace a disk on the primary or secondary for dbrd8.
5169
5170     The algorithm for replace is quite complicated:
5171
5172       1. for each disk to be replaced:
5173
5174         1. create new LVs on the target node with unique names
5175         1. detach old LVs from the drbd device
5176         1. rename old LVs to name_replaced.<time_t>
5177         1. rename new LVs to old LVs
5178         1. attach the new LVs (with the old names now) to the drbd device
5179
5180       1. wait for sync across all devices
5181
5182       1. for each modified disk:
5183
5184         1. remove old LVs (which have the name name_replaces.<time_t>)
5185
5186     Failures are not very well handled.
5187
5188     """
5189     steps_total = 6
5190     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5191     instance = self.instance
5192     iv_names = {}
5193     vgname = self.cfg.GetVGName()
5194     # start of work
5195     cfg = self.cfg
5196     tgt_node = self.tgt_node
5197     oth_node = self.oth_node
5198
5199     # Step: check device activation
5200     self.proc.LogStep(1, steps_total, "check device existence")
5201     info("checking volume groups")
5202     my_vg = cfg.GetVGName()
5203     results = self.rpc.call_vg_list([oth_node, tgt_node])
5204     if not results:
5205       raise errors.OpExecError("Can't list volume groups on the nodes")
5206     for node in oth_node, tgt_node:
5207       res = results[node]
5208       res.Raise("Error checking node %s" % node)
5209       if my_vg not in res.payload:
5210         raise errors.OpExecError("Volume group '%s' not found on %s" %
5211                                  (my_vg, node))
5212     for idx, dev in enumerate(instance.disks):
5213       if idx not in self.op.disks:
5214         continue
5215       for node in tgt_node, oth_node:
5216         info("checking disk/%d on %s" % (idx, node))
5217         cfg.SetDiskID(dev, node)
5218         result = self.rpc.call_blockdev_find(node, dev)
5219         msg = result.fail_msg
5220         if not msg and not result.payload:
5221           msg = "disk not found"
5222         if msg:
5223           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5224                                    (idx, node, msg))
5225
5226     # Step: check other node consistency
5227     self.proc.LogStep(2, steps_total, "check peer consistency")
5228     for idx, dev in enumerate(instance.disks):
5229       if idx not in self.op.disks:
5230         continue
5231       info("checking disk/%d consistency on %s" % (idx, oth_node))
5232       if not _CheckDiskConsistency(self, dev, oth_node,
5233                                    oth_node==instance.primary_node):
5234         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5235                                  " to replace disks on this node (%s)" %
5236                                  (oth_node, tgt_node))
5237
5238     # Step: create new storage
5239     self.proc.LogStep(3, steps_total, "allocate new storage")
5240     for idx, dev in enumerate(instance.disks):
5241       if idx not in self.op.disks:
5242         continue
5243       size = dev.size
5244       cfg.SetDiskID(dev, tgt_node)
5245       lv_names = [".disk%d_%s" % (idx, suf)
5246                   for suf in ["data", "meta"]]
5247       names = _GenerateUniqueNames(self, lv_names)
5248       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5249                              logical_id=(vgname, names[0]))
5250       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5251                              logical_id=(vgname, names[1]))
5252       new_lvs = [lv_data, lv_meta]
5253       old_lvs = dev.children
5254       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5255       info("creating new local storage on %s for %s" %
5256            (tgt_node, dev.iv_name))
5257       # we pass force_create=True to force the LVM creation
5258       for new_lv in new_lvs:
5259         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5260                         _GetInstanceInfoText(instance), False)
5261
5262     # Step: for each lv, detach+rename*2+attach
5263     self.proc.LogStep(4, steps_total, "change drbd configuration")
5264     for dev, old_lvs, new_lvs in iv_names.itervalues():
5265       info("detaching %s drbd from local storage" % dev.iv_name)
5266       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5267       result.Raise("Can't detach drbd from local storage on node"
5268                    " %s for device %s" % (tgt_node, dev.iv_name))
5269       #dev.children = []
5270       #cfg.Update(instance)
5271
5272       # ok, we created the new LVs, so now we know we have the needed
5273       # storage; as such, we proceed on the target node to rename
5274       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5275       # using the assumption that logical_id == physical_id (which in
5276       # turn is the unique_id on that node)
5277
5278       # FIXME(iustin): use a better name for the replaced LVs
5279       temp_suffix = int(time.time())
5280       ren_fn = lambda d, suff: (d.physical_id[0],
5281                                 d.physical_id[1] + "_replaced-%s" % suff)
5282       # build the rename list based on what LVs exist on the node
5283       rlist = []
5284       for to_ren in old_lvs:
5285         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5286         if not result.fail_msg and result.payload:
5287           # device exists
5288           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5289
5290       info("renaming the old LVs on the target node")
5291       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5292       result.Raise("Can't rename old LVs on node %s" % tgt_node)
5293       # now we rename the new LVs to the old LVs
5294       info("renaming the new LVs on the target node")
5295       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5296       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5297       result.Raise("Can't rename new LVs on node %s" % tgt_node)
5298
5299       for old, new in zip(old_lvs, new_lvs):
5300         new.logical_id = old.logical_id
5301         cfg.SetDiskID(new, tgt_node)
5302
5303       for disk in old_lvs:
5304         disk.logical_id = ren_fn(disk, temp_suffix)
5305         cfg.SetDiskID(disk, tgt_node)
5306
5307       # now that the new lvs have the old name, we can add them to the device
5308       info("adding new mirror component on %s" % tgt_node)
5309       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5310       msg = result.fail_msg
5311       if msg:
5312         for new_lv in new_lvs:
5313           msg2 = self.rpc.call_blockdev_remove(tgt_node, new_lv).fail_msg
5314           if msg2:
5315             warning("Can't rollback device %s: %s", dev, msg2,
5316                     hint="cleanup manually the unused logical volumes")
5317         raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
5318
5319       dev.children = new_lvs
5320       cfg.Update(instance)
5321
5322     # Step: wait for sync
5323
5324     # this can fail as the old devices are degraded and _WaitForSync
5325     # does a combined result over all disks, so we don't check its
5326     # return value
5327     self.proc.LogStep(5, steps_total, "sync devices")
5328     _WaitForSync(self, instance, unlock=True)
5329
5330     # so check manually all the devices
5331     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5332       cfg.SetDiskID(dev, instance.primary_node)
5333       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5334       msg = result.fail_msg
5335       if not msg and not result.payload:
5336         msg = "disk not found"
5337       if msg:
5338         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5339                                  (name, msg))
5340       if result.payload[5]:
5341         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5342
5343     # Step: remove old storage
5344     self.proc.LogStep(6, steps_total, "removing old storage")
5345     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5346       info("remove logical volumes for %s" % name)
5347       for lv in old_lvs:
5348         cfg.SetDiskID(lv, tgt_node)
5349         msg = self.rpc.call_blockdev_remove(tgt_node, lv).fail_msg
5350         if msg:
5351           warning("Can't remove old LV: %s" % msg,
5352                   hint="manually remove unused LVs")
5353           continue
5354
5355   def _ExecD8Secondary(self, feedback_fn):
5356     """Replace the secondary node for drbd8.
5357
5358     The algorithm for replace is quite complicated:
5359       - for all disks of the instance:
5360         - create new LVs on the new node with same names
5361         - shutdown the drbd device on the old secondary
5362         - disconnect the drbd network on the primary
5363         - create the drbd device on the new secondary
5364         - network attach the drbd on the primary, using an artifice:
5365           the drbd code for Attach() will connect to the network if it
5366           finds a device which is connected to the good local disks but
5367           not network enabled
5368       - wait for sync across all devices
5369       - remove all disks from the old secondary
5370
5371     Failures are not very well handled.
5372
5373     """
5374     steps_total = 6
5375     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5376     instance = self.instance
5377     iv_names = {}
5378     # start of work
5379     cfg = self.cfg
5380     old_node = self.tgt_node
5381     new_node = self.new_node
5382     pri_node = instance.primary_node
5383     nodes_ip = {
5384       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5385       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5386       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5387       }
5388
5389     # Step: check device activation
5390     self.proc.LogStep(1, steps_total, "check device existence")
5391     info("checking volume groups")
5392     my_vg = cfg.GetVGName()
5393     results = self.rpc.call_vg_list([pri_node, new_node])
5394     for node in pri_node, new_node:
5395       res = results[node]
5396       res.Raise("Error checking node %s" % node)
5397       if my_vg not in res.payload:
5398         raise errors.OpExecError("Volume group '%s' not found on %s" %
5399                                  (my_vg, node))
5400     for idx, dev in enumerate(instance.disks):
5401       if idx not in self.op.disks:
5402         continue
5403       info("checking disk/%d on %s" % (idx, pri_node))
5404       cfg.SetDiskID(dev, pri_node)
5405       result = self.rpc.call_blockdev_find(pri_node, dev)
5406       msg = result.fail_msg
5407       if not msg and not result.payload:
5408         msg = "disk not found"
5409       if msg:
5410         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5411                                  (idx, pri_node, msg))
5412
5413     # Step: check other node consistency
5414     self.proc.LogStep(2, steps_total, "check peer consistency")
5415     for idx, dev in enumerate(instance.disks):
5416       if idx not in self.op.disks:
5417         continue
5418       info("checking disk/%d consistency on %s" % (idx, pri_node))
5419       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5420         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5421                                  " unsafe to replace the secondary" %
5422                                  pri_node)
5423
5424     # Step: create new storage
5425     self.proc.LogStep(3, steps_total, "allocate new storage")
5426     for idx, dev in enumerate(instance.disks):
5427       info("adding new local storage on %s for disk/%d" %
5428            (new_node, idx))
5429       # we pass force_create=True to force LVM creation
5430       for new_lv in dev.children:
5431         _CreateBlockDev(self, new_node, instance, new_lv, True,
5432                         _GetInstanceInfoText(instance), False)
5433
5434     # Step 4: dbrd minors and drbd setups changes
5435     # after this, we must manually remove the drbd minors on both the
5436     # error and the success paths
5437     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5438                                    instance.name)
5439     logging.debug("Allocated minors %s" % (minors,))
5440     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5441     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5442       size = dev.size
5443       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5444       # create new devices on new_node; note that we create two IDs:
5445       # one without port, so the drbd will be activated without
5446       # networking information on the new node at this stage, and one
5447       # with network, for the latter activation in step 4
5448       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5449       if pri_node == o_node1:
5450         p_minor = o_minor1
5451       else:
5452         p_minor = o_minor2
5453
5454       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5455       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5456
5457       iv_names[idx] = (dev, dev.children, new_net_id)
5458       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5459                     new_net_id)
5460       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5461                               logical_id=new_alone_id,
5462                               children=dev.children,
5463                               size=dev.size)
5464       try:
5465         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5466                               _GetInstanceInfoText(instance), False)
5467       except errors.GenericError:
5468         self.cfg.ReleaseDRBDMinors(instance.name)
5469         raise
5470
5471     for idx, dev in enumerate(instance.disks):
5472       # we have new devices, shutdown the drbd on the old secondary
5473       info("shutting down drbd for disk/%d on old node" % idx)
5474       cfg.SetDiskID(dev, old_node)
5475       msg = self.rpc.call_blockdev_shutdown(old_node, dev).fail_msg
5476       if msg:
5477         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5478                 (idx, msg),
5479                 hint="Please cleanup this device manually as soon as possible")
5480
5481     info("detaching primary drbds from the network (=> standalone)")
5482     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5483                                                instance.disks)[pri_node]
5484
5485     msg = result.fail_msg
5486     if msg:
5487       # detaches didn't succeed (unlikely)
5488       self.cfg.ReleaseDRBDMinors(instance.name)
5489       raise errors.OpExecError("Can't detach the disks from the network on"
5490                                " old node: %s" % (msg,))
5491
5492     # if we managed to detach at least one, we update all the disks of
5493     # the instance to point to the new secondary
5494     info("updating instance configuration")
5495     for dev, _, new_logical_id in iv_names.itervalues():
5496       dev.logical_id = new_logical_id
5497       cfg.SetDiskID(dev, pri_node)
5498     cfg.Update(instance)
5499
5500     # and now perform the drbd attach
5501     info("attaching primary drbds to new secondary (standalone => connected)")
5502     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5503                                            instance.disks, instance.name,
5504                                            False)
5505     for to_node, to_result in result.items():
5506       msg = to_result.fail_msg
5507       if msg:
5508         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5509                 hint="please do a gnt-instance info to see the"
5510                 " status of disks")
5511
5512     # this can fail as the old devices are degraded and _WaitForSync
5513     # does a combined result over all disks, so we don't check its
5514     # return value
5515     self.proc.LogStep(5, steps_total, "sync devices")
5516     _WaitForSync(self, instance, unlock=True)
5517
5518     # so check manually all the devices
5519     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5520       cfg.SetDiskID(dev, pri_node)
5521       result = self.rpc.call_blockdev_find(pri_node, dev)
5522       msg = result.fail_msg
5523       if not msg and not result.payload:
5524         msg = "disk not found"
5525       if msg:
5526         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5527                                  (idx, msg))
5528       if result.payload[5]:
5529         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5530
5531     self.proc.LogStep(6, steps_total, "removing old storage")
5532     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5533       info("remove logical volumes for disk/%d" % idx)
5534       for lv in old_lvs:
5535         cfg.SetDiskID(lv, old_node)
5536         msg = self.rpc.call_blockdev_remove(old_node, lv).fail_msg
5537         if msg:
5538           warning("Can't remove LV on old secondary: %s", msg,
5539                   hint="Cleanup stale volumes by hand")
5540
5541   def Exec(self, feedback_fn):
5542     """Execute disk replacement.
5543
5544     This dispatches the disk replacement to the appropriate handler.
5545
5546     """
5547     instance = self.instance
5548
5549     # Activate the instance disks if we're replacing them on a down instance
5550     if not instance.admin_up:
5551       _StartInstanceDisks(self, instance, True)
5552
5553     if self.op.mode == constants.REPLACE_DISK_CHG:
5554       fn = self._ExecD8Secondary
5555     else:
5556       fn = self._ExecD8DiskOnly
5557
5558     ret = fn(feedback_fn)
5559
5560     # Deactivate the instance disks if we're replacing them on a down instance
5561     if not instance.admin_up:
5562       _SafeShutdownInstanceDisks(self, instance)
5563
5564     return ret
5565
5566
5567 class LUGrowDisk(LogicalUnit):
5568   """Grow a disk of an instance.
5569
5570   """
5571   HPATH = "disk-grow"
5572   HTYPE = constants.HTYPE_INSTANCE
5573   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5574   REQ_BGL = False
5575
5576   def ExpandNames(self):
5577     self._ExpandAndLockInstance()
5578     self.needed_locks[locking.LEVEL_NODE] = []
5579     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5580
5581   def DeclareLocks(self, level):
5582     if level == locking.LEVEL_NODE:
5583       self._LockInstancesNodes()
5584
5585   def BuildHooksEnv(self):
5586     """Build hooks env.
5587
5588     This runs on the master, the primary and all the secondaries.
5589
5590     """
5591     env = {
5592       "DISK": self.op.disk,
5593       "AMOUNT": self.op.amount,
5594       }
5595     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5596     nl = [
5597       self.cfg.GetMasterNode(),
5598       self.instance.primary_node,
5599       ]
5600     return env, nl, nl
5601
5602   def CheckPrereq(self):
5603     """Check prerequisites.
5604
5605     This checks that the instance is in the cluster.
5606
5607     """
5608     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5609     assert instance is not None, \
5610       "Cannot retrieve locked instance %s" % self.op.instance_name
5611     nodenames = list(instance.all_nodes)
5612     for node in nodenames:
5613       _CheckNodeOnline(self, node)
5614
5615
5616     self.instance = instance
5617
5618     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5619       raise errors.OpPrereqError("Instance's disk layout does not support"
5620                                  " growing.")
5621
5622     self.disk = instance.FindDisk(self.op.disk)
5623
5624     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5625                                        instance.hypervisor)
5626     for node in nodenames:
5627       info = nodeinfo[node]
5628       info.Raise("Cannot get current information from node %s" % node)
5629       vg_free = info.payload.get('vg_free', None)
5630       if not isinstance(vg_free, int):
5631         raise errors.OpPrereqError("Can't compute free disk space on"
5632                                    " node %s" % node)
5633       if self.op.amount > vg_free:
5634         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5635                                    " %d MiB available, %d MiB required" %
5636                                    (node, vg_free, self.op.amount))
5637
5638   def Exec(self, feedback_fn):
5639     """Execute disk grow.
5640
5641     """
5642     instance = self.instance
5643     disk = self.disk
5644     for node in instance.all_nodes:
5645       self.cfg.SetDiskID(disk, node)
5646       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5647       result.Raise("Grow request failed to node %s" % node)
5648     disk.RecordGrow(self.op.amount)
5649     self.cfg.Update(instance)
5650     if self.op.wait_for_sync:
5651       disk_abort = not _WaitForSync(self, instance)
5652       if disk_abort:
5653         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5654                              " status.\nPlease check the instance.")
5655
5656
5657 class LUQueryInstanceData(NoHooksLU):
5658   """Query runtime instance data.
5659
5660   """
5661   _OP_REQP = ["instances", "static"]
5662   REQ_BGL = False
5663
5664   def ExpandNames(self):
5665     self.needed_locks = {}
5666     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5667
5668     if not isinstance(self.op.instances, list):
5669       raise errors.OpPrereqError("Invalid argument type 'instances'")
5670
5671     if self.op.instances:
5672       self.wanted_names = []
5673       for name in self.op.instances:
5674         full_name = self.cfg.ExpandInstanceName(name)
5675         if full_name is None:
5676           raise errors.OpPrereqError("Instance '%s' not known" % name)
5677         self.wanted_names.append(full_name)
5678       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5679     else:
5680       self.wanted_names = None
5681       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5682
5683     self.needed_locks[locking.LEVEL_NODE] = []
5684     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5685
5686   def DeclareLocks(self, level):
5687     if level == locking.LEVEL_NODE:
5688       self._LockInstancesNodes()
5689
5690   def CheckPrereq(self):
5691     """Check prerequisites.
5692
5693     This only checks the optional instance list against the existing names.
5694
5695     """
5696     if self.wanted_names is None:
5697       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5698
5699     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5700                              in self.wanted_names]
5701     return
5702
5703   def _ComputeDiskStatus(self, instance, snode, dev):
5704     """Compute block device status.
5705
5706     """
5707     static = self.op.static
5708     if not static:
5709       self.cfg.SetDiskID(dev, instance.primary_node)
5710       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5711       if dev_pstatus.offline:
5712         dev_pstatus = None
5713       else:
5714         dev_pstatus.Raise("Can't compute disk status for %s" % instance.name)
5715         dev_pstatus = dev_pstatus.payload
5716     else:
5717       dev_pstatus = None
5718
5719     if dev.dev_type in constants.LDS_DRBD:
5720       # we change the snode then (otherwise we use the one passed in)
5721       if dev.logical_id[0] == instance.primary_node:
5722         snode = dev.logical_id[1]
5723       else:
5724         snode = dev.logical_id[0]
5725
5726     if snode and not static:
5727       self.cfg.SetDiskID(dev, snode)
5728       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5729       if dev_sstatus.offline:
5730         dev_sstatus = None
5731       else:
5732         dev_sstatus.Raise("Can't compute disk status for %s" % instance.name)
5733         dev_sstatus = dev_sstatus.payload
5734     else:
5735       dev_sstatus = None
5736
5737     if dev.children:
5738       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5739                       for child in dev.children]
5740     else:
5741       dev_children = []
5742
5743     data = {
5744       "iv_name": dev.iv_name,
5745       "dev_type": dev.dev_type,
5746       "logical_id": dev.logical_id,
5747       "physical_id": dev.physical_id,
5748       "pstatus": dev_pstatus,
5749       "sstatus": dev_sstatus,
5750       "children": dev_children,
5751       "mode": dev.mode,
5752       }
5753
5754     return data
5755
5756   def Exec(self, feedback_fn):
5757     """Gather and return data"""
5758     result = {}
5759
5760     cluster = self.cfg.GetClusterInfo()
5761
5762     for instance in self.wanted_instances:
5763       if not self.op.static:
5764         remote_info = self.rpc.call_instance_info(instance.primary_node,
5765                                                   instance.name,
5766                                                   instance.hypervisor)
5767         remote_info.Raise("Error checking node %s" % instance.primary_node)
5768         remote_info = remote_info.payload
5769         if remote_info and "state" in remote_info:
5770           remote_state = "up"
5771         else:
5772           remote_state = "down"
5773       else:
5774         remote_state = None
5775       if instance.admin_up:
5776         config_state = "up"
5777       else:
5778         config_state = "down"
5779
5780       disks = [self._ComputeDiskStatus(instance, None, device)
5781                for device in instance.disks]
5782
5783       idict = {
5784         "name": instance.name,
5785         "config_state": config_state,
5786         "run_state": remote_state,
5787         "pnode": instance.primary_node,
5788         "snodes": instance.secondary_nodes,
5789         "os": instance.os,
5790         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5791         "disks": disks,
5792         "hypervisor": instance.hypervisor,
5793         "network_port": instance.network_port,
5794         "hv_instance": instance.hvparams,
5795         "hv_actual": cluster.FillHV(instance),
5796         "be_instance": instance.beparams,
5797         "be_actual": cluster.FillBE(instance),
5798         }
5799
5800       result[instance.name] = idict
5801
5802     return result
5803
5804
5805 class LUSetInstanceParams(LogicalUnit):
5806   """Modifies an instances's parameters.
5807
5808   """
5809   HPATH = "instance-modify"
5810   HTYPE = constants.HTYPE_INSTANCE
5811   _OP_REQP = ["instance_name"]
5812   REQ_BGL = False
5813
5814   def CheckArguments(self):
5815     if not hasattr(self.op, 'nics'):
5816       self.op.nics = []
5817     if not hasattr(self.op, 'disks'):
5818       self.op.disks = []
5819     if not hasattr(self.op, 'beparams'):
5820       self.op.beparams = {}
5821     if not hasattr(self.op, 'hvparams'):
5822       self.op.hvparams = {}
5823     self.op.force = getattr(self.op, "force", False)
5824     if not (self.op.nics or self.op.disks or
5825             self.op.hvparams or self.op.beparams):
5826       raise errors.OpPrereqError("No changes submitted")
5827
5828     # Disk validation
5829     disk_addremove = 0
5830     for disk_op, disk_dict in self.op.disks:
5831       if disk_op == constants.DDM_REMOVE:
5832         disk_addremove += 1
5833         continue
5834       elif disk_op == constants.DDM_ADD:
5835         disk_addremove += 1
5836       else:
5837         if not isinstance(disk_op, int):
5838           raise errors.OpPrereqError("Invalid disk index")
5839       if disk_op == constants.DDM_ADD:
5840         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5841         if mode not in constants.DISK_ACCESS_SET:
5842           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5843         size = disk_dict.get('size', None)
5844         if size is None:
5845           raise errors.OpPrereqError("Required disk parameter size missing")
5846         try:
5847           size = int(size)
5848         except ValueError, err:
5849           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5850                                      str(err))
5851         disk_dict['size'] = size
5852       else:
5853         # modification of disk
5854         if 'size' in disk_dict:
5855           raise errors.OpPrereqError("Disk size change not possible, use"
5856                                      " grow-disk")
5857
5858     if disk_addremove > 1:
5859       raise errors.OpPrereqError("Only one disk add or remove operation"
5860                                  " supported at a time")
5861
5862     # NIC validation
5863     nic_addremove = 0
5864     for nic_op, nic_dict in self.op.nics:
5865       if nic_op == constants.DDM_REMOVE:
5866         nic_addremove += 1
5867         continue
5868       elif nic_op == constants.DDM_ADD:
5869         nic_addremove += 1
5870       else:
5871         if not isinstance(nic_op, int):
5872           raise errors.OpPrereqError("Invalid nic index")
5873
5874       # nic_dict should be a dict
5875       nic_ip = nic_dict.get('ip', None)
5876       if nic_ip is not None:
5877         if nic_ip.lower() == constants.VALUE_NONE:
5878           nic_dict['ip'] = None
5879         else:
5880           if not utils.IsValidIP(nic_ip):
5881             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5882
5883       nic_bridge = nic_dict.get('bridge', None)
5884       nic_link = nic_dict.get('link', None)
5885       if nic_bridge and nic_link:
5886         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
5887       elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
5888         nic_dict['bridge'] = None
5889       elif nic_link and nic_link.lower() == constants.VALUE_NONE:
5890         nic_dict['link'] = None
5891
5892       if nic_op == constants.DDM_ADD:
5893         nic_mac = nic_dict.get('mac', None)
5894         if nic_mac is None:
5895           nic_dict['mac'] = constants.VALUE_AUTO
5896
5897       if 'mac' in nic_dict:
5898         nic_mac = nic_dict['mac']
5899         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5900           if not utils.IsValidMac(nic_mac):
5901             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5902         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5903           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5904                                      " modifying an existing nic")
5905
5906     if nic_addremove > 1:
5907       raise errors.OpPrereqError("Only one NIC add or remove operation"
5908                                  " supported at a time")
5909
5910   def ExpandNames(self):
5911     self._ExpandAndLockInstance()
5912     self.needed_locks[locking.LEVEL_NODE] = []
5913     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5914
5915   def DeclareLocks(self, level):
5916     if level == locking.LEVEL_NODE:
5917       self._LockInstancesNodes()
5918
5919   def BuildHooksEnv(self):
5920     """Build hooks env.
5921
5922     This runs on the master, primary and secondaries.
5923
5924     """
5925     args = dict()
5926     if constants.BE_MEMORY in self.be_new:
5927       args['memory'] = self.be_new[constants.BE_MEMORY]
5928     if constants.BE_VCPUS in self.be_new:
5929       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5930     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5931     # information at all.
5932     if self.op.nics:
5933       args['nics'] = []
5934       nic_override = dict(self.op.nics)
5935       c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
5936       for idx, nic in enumerate(self.instance.nics):
5937         if idx in nic_override:
5938           this_nic_override = nic_override[idx]
5939         else:
5940           this_nic_override = {}
5941         if 'ip' in this_nic_override:
5942           ip = this_nic_override['ip']
5943         else:
5944           ip = nic.ip
5945         if 'mac' in this_nic_override:
5946           mac = this_nic_override['mac']
5947         else:
5948           mac = nic.mac
5949         if idx in self.nic_pnew:
5950           nicparams = self.nic_pnew[idx]
5951         else:
5952           nicparams = objects.FillDict(c_nicparams, nic.nicparams)
5953         mode = nicparams[constants.NIC_MODE]
5954         link = nicparams[constants.NIC_LINK]
5955         args['nics'].append((ip, mac, mode, link))
5956       if constants.DDM_ADD in nic_override:
5957         ip = nic_override[constants.DDM_ADD].get('ip', None)
5958         mac = nic_override[constants.DDM_ADD]['mac']
5959         nicparams = self.nic_pnew[constants.DDM_ADD]
5960         mode = nicparams[constants.NIC_MODE]
5961         link = nicparams[constants.NIC_LINK]
5962         args['nics'].append((ip, mac, mode, link))
5963       elif constants.DDM_REMOVE in nic_override:
5964         del args['nics'][-1]
5965
5966     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5967     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5968     return env, nl, nl
5969
5970   def _GetUpdatedParams(self, old_params, update_dict,
5971                         default_values, parameter_types):
5972     """Return the new params dict for the given params.
5973
5974     @type old_params: dict
5975     @type old_params: old parameters
5976     @type update_dict: dict
5977     @type update_dict: dict containing new parameter values,
5978                        or constants.VALUE_DEFAULT to reset the
5979                        parameter to its default value
5980     @type default_values: dict
5981     @param default_values: default values for the filled parameters
5982     @type parameter_types: dict
5983     @param parameter_types: dict mapping target dict keys to types
5984                             in constants.ENFORCEABLE_TYPES
5985     @rtype: (dict, dict)
5986     @return: (new_parameters, filled_parameters)
5987
5988     """
5989     params_copy = copy.deepcopy(old_params)
5990     for key, val in update_dict.iteritems():
5991       if val == constants.VALUE_DEFAULT:
5992         try:
5993           del params_copy[key]
5994         except KeyError:
5995           pass
5996       else:
5997         params_copy[key] = val
5998     utils.ForceDictType(params_copy, parameter_types)
5999     params_filled = objects.FillDict(default_values, params_copy)
6000     return (params_copy, params_filled)
6001
6002   def CheckPrereq(self):
6003     """Check prerequisites.
6004
6005     This only checks the instance list against the existing names.
6006
6007     """
6008     force = self.force = self.op.force
6009
6010     # checking the new params on the primary/secondary nodes
6011
6012     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6013     cluster = self.cluster = self.cfg.GetClusterInfo()
6014     assert self.instance is not None, \
6015       "Cannot retrieve locked instance %s" % self.op.instance_name
6016     pnode = instance.primary_node
6017     nodelist = list(instance.all_nodes)
6018
6019     # hvparams processing
6020     if self.op.hvparams:
6021       i_hvdict, hv_new = self._GetUpdatedParams(
6022                              instance.hvparams, self.op.hvparams,
6023                              cluster.hvparams[instance.hypervisor],
6024                              constants.HVS_PARAMETER_TYPES)
6025       # local check
6026       hypervisor.GetHypervisor(
6027         instance.hypervisor).CheckParameterSyntax(hv_new)
6028       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6029       self.hv_new = hv_new # the new actual values
6030       self.hv_inst = i_hvdict # the new dict (without defaults)
6031     else:
6032       self.hv_new = self.hv_inst = {}
6033
6034     # beparams processing
6035     if self.op.beparams:
6036       i_bedict, be_new = self._GetUpdatedParams(
6037                              instance.beparams, self.op.beparams,
6038                              cluster.beparams[constants.PP_DEFAULT],
6039                              constants.BES_PARAMETER_TYPES)
6040       self.be_new = be_new # the new actual values
6041       self.be_inst = i_bedict # the new dict (without defaults)
6042     else:
6043       self.be_new = self.be_inst = {}
6044
6045     self.warn = []
6046
6047     if constants.BE_MEMORY in self.op.beparams and not self.force:
6048       mem_check_list = [pnode]
6049       if be_new[constants.BE_AUTO_BALANCE]:
6050         # either we changed auto_balance to yes or it was from before
6051         mem_check_list.extend(instance.secondary_nodes)
6052       instance_info = self.rpc.call_instance_info(pnode, instance.name,
6053                                                   instance.hypervisor)
6054       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6055                                          instance.hypervisor)
6056       pninfo = nodeinfo[pnode]
6057       msg = pninfo.fail_msg
6058       if msg:
6059         # Assume the primary node is unreachable and go ahead
6060         self.warn.append("Can't get info from primary node %s: %s" %
6061                          (pnode,  msg))
6062       elif not isinstance(pninfo.payload.get('memory_free', None), int):
6063         self.warn.append("Node data from primary node %s doesn't contain"
6064                          " free memory information" % pnode)
6065       elif instance_info.fail_msg:
6066         self.warn.append("Can't get instance runtime information: %s" %
6067                         instance_info.fail_msg)
6068       else:
6069         if instance_info.payload:
6070           current_mem = int(instance_info.payload['memory'])
6071         else:
6072           # Assume instance not running
6073           # (there is a slight race condition here, but it's not very probable,
6074           # and we have no other way to check)
6075           current_mem = 0
6076         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6077                     pninfo.payload['memory_free'])
6078         if miss_mem > 0:
6079           raise errors.OpPrereqError("This change will prevent the instance"
6080                                      " from starting, due to %d MB of memory"
6081                                      " missing on its primary node" % miss_mem)
6082
6083       if be_new[constants.BE_AUTO_BALANCE]:
6084         for node, nres in nodeinfo.items():
6085           if node not in instance.secondary_nodes:
6086             continue
6087           msg = nres.fail_msg
6088           if msg:
6089             self.warn.append("Can't get info from secondary node %s: %s" %
6090                              (node, msg))
6091           elif not isinstance(nres.payload.get('memory_free', None), int):
6092             self.warn.append("Secondary node %s didn't return free"
6093                              " memory information" % node)
6094           elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
6095             self.warn.append("Not enough memory to failover instance to"
6096                              " secondary node %s" % node)
6097
6098     # NIC processing
6099     self.nic_pnew = {}
6100     self.nic_pinst = {}
6101     for nic_op, nic_dict in self.op.nics:
6102       if nic_op == constants.DDM_REMOVE:
6103         if not instance.nics:
6104           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6105         continue
6106       if nic_op != constants.DDM_ADD:
6107         # an existing nic
6108         if nic_op < 0 or nic_op >= len(instance.nics):
6109           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6110                                      " are 0 to %d" %
6111                                      (nic_op, len(instance.nics)))
6112         old_nic_params = instance.nics[nic_op].nicparams
6113         old_nic_ip = instance.nics[nic_op].ip
6114       else:
6115         old_nic_params = {}
6116         old_nic_ip = None
6117
6118       update_params_dict = dict([(key, nic_dict[key])
6119                                  for key in constants.NICS_PARAMETERS
6120                                  if key in nic_dict])
6121
6122       if 'bridge' in nic_dict:
6123         update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
6124
6125       new_nic_params, new_filled_nic_params = \
6126           self._GetUpdatedParams(old_nic_params, update_params_dict,
6127                                  cluster.nicparams[constants.PP_DEFAULT],
6128                                  constants.NICS_PARAMETER_TYPES)
6129       objects.NIC.CheckParameterSyntax(new_filled_nic_params)
6130       self.nic_pinst[nic_op] = new_nic_params
6131       self.nic_pnew[nic_op] = new_filled_nic_params
6132       new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
6133
6134       if new_nic_mode == constants.NIC_MODE_BRIDGED:
6135         nic_bridge = new_filled_nic_params[constants.NIC_LINK]
6136         msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
6137         if msg:
6138           msg = "Error checking bridges on node %s: %s" % (pnode, msg)
6139           if self.force:
6140             self.warn.append(msg)
6141           else:
6142             raise errors.OpPrereqError(msg)
6143       if new_nic_mode == constants.NIC_MODE_ROUTED:
6144         if 'ip' in nic_dict:
6145           nic_ip = nic_dict['ip']
6146         else:
6147           nic_ip = old_nic_ip
6148         if nic_ip is None:
6149           raise errors.OpPrereqError('Cannot set the nic ip to None'
6150                                      ' on a routed nic')
6151       if 'mac' in nic_dict:
6152         nic_mac = nic_dict['mac']
6153         if nic_mac is None:
6154           raise errors.OpPrereqError('Cannot set the nic mac to None')
6155         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6156           # otherwise generate the mac
6157           nic_dict['mac'] = self.cfg.GenerateMAC()
6158         else:
6159           # or validate/reserve the current one
6160           if self.cfg.IsMacInUse(nic_mac):
6161             raise errors.OpPrereqError("MAC address %s already in use"
6162                                        " in cluster" % nic_mac)
6163
6164     # DISK processing
6165     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6166       raise errors.OpPrereqError("Disk operations not supported for"
6167                                  " diskless instances")
6168     for disk_op, disk_dict in self.op.disks:
6169       if disk_op == constants.DDM_REMOVE:
6170         if len(instance.disks) == 1:
6171           raise errors.OpPrereqError("Cannot remove the last disk of"
6172                                      " an instance")
6173         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6174         ins_l = ins_l[pnode]
6175         msg = ins_l.fail_msg
6176         if msg:
6177           raise errors.OpPrereqError("Can't contact node %s: %s" %
6178                                      (pnode, msg))
6179         if instance.name in ins_l.payload:
6180           raise errors.OpPrereqError("Instance is running, can't remove"
6181                                      " disks.")
6182
6183       if (disk_op == constants.DDM_ADD and
6184           len(instance.nics) >= constants.MAX_DISKS):
6185         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6186                                    " add more" % constants.MAX_DISKS)
6187       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6188         # an existing disk
6189         if disk_op < 0 or disk_op >= len(instance.disks):
6190           raise errors.OpPrereqError("Invalid disk index %s, valid values"
6191                                      " are 0 to %d" %
6192                                      (disk_op, len(instance.disks)))
6193
6194     return
6195
6196   def Exec(self, feedback_fn):
6197     """Modifies an instance.
6198
6199     All parameters take effect only at the next restart of the instance.
6200
6201     """
6202     # Process here the warnings from CheckPrereq, as we don't have a
6203     # feedback_fn there.
6204     for warn in self.warn:
6205       feedback_fn("WARNING: %s" % warn)
6206
6207     result = []
6208     instance = self.instance
6209     cluster = self.cluster
6210     # disk changes
6211     for disk_op, disk_dict in self.op.disks:
6212       if disk_op == constants.DDM_REMOVE:
6213         # remove the last disk
6214         device = instance.disks.pop()
6215         device_idx = len(instance.disks)
6216         for node, disk in device.ComputeNodeTree(instance.primary_node):
6217           self.cfg.SetDiskID(disk, node)
6218           msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
6219           if msg:
6220             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6221                             " continuing anyway", device_idx, node, msg)
6222         result.append(("disk/%d" % device_idx, "remove"))
6223       elif disk_op == constants.DDM_ADD:
6224         # add a new disk
6225         if instance.disk_template == constants.DT_FILE:
6226           file_driver, file_path = instance.disks[0].logical_id
6227           file_path = os.path.dirname(file_path)
6228         else:
6229           file_driver = file_path = None
6230         disk_idx_base = len(instance.disks)
6231         new_disk = _GenerateDiskTemplate(self,
6232                                          instance.disk_template,
6233                                          instance.name, instance.primary_node,
6234                                          instance.secondary_nodes,
6235                                          [disk_dict],
6236                                          file_path,
6237                                          file_driver,
6238                                          disk_idx_base)[0]
6239         instance.disks.append(new_disk)
6240         info = _GetInstanceInfoText(instance)
6241
6242         logging.info("Creating volume %s for instance %s",
6243                      new_disk.iv_name, instance.name)
6244         # Note: this needs to be kept in sync with _CreateDisks
6245         #HARDCODE
6246         for node in instance.all_nodes:
6247           f_create = node == instance.primary_node
6248           try:
6249             _CreateBlockDev(self, node, instance, new_disk,
6250                             f_create, info, f_create)
6251           except errors.OpExecError, err:
6252             self.LogWarning("Failed to create volume %s (%s) on"
6253                             " node %s: %s",
6254                             new_disk.iv_name, new_disk, node, err)
6255         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6256                        (new_disk.size, new_disk.mode)))
6257       else:
6258         # change a given disk
6259         instance.disks[disk_op].mode = disk_dict['mode']
6260         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6261     # NIC changes
6262     for nic_op, nic_dict in self.op.nics:
6263       if nic_op == constants.DDM_REMOVE:
6264         # remove the last nic
6265         del instance.nics[-1]
6266         result.append(("nic.%d" % len(instance.nics), "remove"))
6267       elif nic_op == constants.DDM_ADD:
6268         # mac and bridge should be set, by now
6269         mac = nic_dict['mac']
6270         ip = nic_dict.get('ip', None)
6271         nicparams = self.nic_pinst[constants.DDM_ADD]
6272         new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
6273         instance.nics.append(new_nic)
6274         result.append(("nic.%d" % (len(instance.nics) - 1),
6275                        "add:mac=%s,ip=%s,mode=%s,link=%s" %
6276                        (new_nic.mac, new_nic.ip,
6277                         self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
6278                         self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
6279                        )))
6280       else:
6281         for key in 'mac', 'ip':
6282           if key in nic_dict:
6283             setattr(instance.nics[nic_op], key, nic_dict[key])
6284         if nic_op in self.nic_pnew:
6285           instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
6286         for key, val in nic_dict.iteritems():
6287           result.append(("nic.%s/%d" % (key, nic_op), val))
6288
6289     # hvparams changes
6290     if self.op.hvparams:
6291       instance.hvparams = self.hv_inst
6292       for key, val in self.op.hvparams.iteritems():
6293         result.append(("hv/%s" % key, val))
6294
6295     # beparams changes
6296     if self.op.beparams:
6297       instance.beparams = self.be_inst
6298       for key, val in self.op.beparams.iteritems():
6299         result.append(("be/%s" % key, val))
6300
6301     self.cfg.Update(instance)
6302
6303     return result
6304
6305
6306 class LUQueryExports(NoHooksLU):
6307   """Query the exports list
6308
6309   """
6310   _OP_REQP = ['nodes']
6311   REQ_BGL = False
6312
6313   def ExpandNames(self):
6314     self.needed_locks = {}
6315     self.share_locks[locking.LEVEL_NODE] = 1
6316     if not self.op.nodes:
6317       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6318     else:
6319       self.needed_locks[locking.LEVEL_NODE] = \
6320         _GetWantedNodes(self, self.op.nodes)
6321
6322   def CheckPrereq(self):
6323     """Check prerequisites.
6324
6325     """
6326     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6327
6328   def Exec(self, feedback_fn):
6329     """Compute the list of all the exported system images.
6330
6331     @rtype: dict
6332     @return: a dictionary with the structure node->(export-list)
6333         where export-list is a list of the instances exported on
6334         that node.
6335
6336     """
6337     rpcresult = self.rpc.call_export_list(self.nodes)
6338     result = {}
6339     for node in rpcresult:
6340       if rpcresult[node].fail_msg:
6341         result[node] = False
6342       else:
6343         result[node] = rpcresult[node].payload
6344
6345     return result
6346
6347
6348 class LUExportInstance(LogicalUnit):
6349   """Export an instance to an image in the cluster.
6350
6351   """
6352   HPATH = "instance-export"
6353   HTYPE = constants.HTYPE_INSTANCE
6354   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6355   REQ_BGL = False
6356
6357   def ExpandNames(self):
6358     self._ExpandAndLockInstance()
6359     # FIXME: lock only instance primary and destination node
6360     #
6361     # Sad but true, for now we have do lock all nodes, as we don't know where
6362     # the previous export might be, and and in this LU we search for it and
6363     # remove it from its current node. In the future we could fix this by:
6364     #  - making a tasklet to search (share-lock all), then create the new one,
6365     #    then one to remove, after
6366     #  - removing the removal operation altoghether
6367     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6368
6369   def DeclareLocks(self, level):
6370     """Last minute lock declaration."""
6371     # All nodes are locked anyway, so nothing to do here.
6372
6373   def BuildHooksEnv(self):
6374     """Build hooks env.
6375
6376     This will run on the master, primary node and target node.
6377
6378     """
6379     env = {
6380       "EXPORT_NODE": self.op.target_node,
6381       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6382       }
6383     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6384     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6385           self.op.target_node]
6386     return env, nl, nl
6387
6388   def CheckPrereq(self):
6389     """Check prerequisites.
6390
6391     This checks that the instance and node names are valid.
6392
6393     """
6394     instance_name = self.op.instance_name
6395     self.instance = self.cfg.GetInstanceInfo(instance_name)
6396     assert self.instance is not None, \
6397           "Cannot retrieve locked instance %s" % self.op.instance_name
6398     _CheckNodeOnline(self, self.instance.primary_node)
6399
6400     self.dst_node = self.cfg.GetNodeInfo(
6401       self.cfg.ExpandNodeName(self.op.target_node))
6402
6403     if self.dst_node is None:
6404       # This is wrong node name, not a non-locked node
6405       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6406     _CheckNodeOnline(self, self.dst_node.name)
6407     _CheckNodeNotDrained(self, self.dst_node.name)
6408
6409     # instance disk type verification
6410     for disk in self.instance.disks:
6411       if disk.dev_type == constants.LD_FILE:
6412         raise errors.OpPrereqError("Export not supported for instances with"
6413                                    " file-based disks")
6414
6415   def Exec(self, feedback_fn):
6416     """Export an instance to an image in the cluster.
6417
6418     """
6419     instance = self.instance
6420     dst_node = self.dst_node
6421     src_node = instance.primary_node
6422     if self.op.shutdown:
6423       # shutdown the instance, but not the disks
6424       result = self.rpc.call_instance_shutdown(src_node, instance)
6425       result.Raise("Could not shutdown instance %s on"
6426                    " node %s" % (instance.name, src_node))
6427
6428     vgname = self.cfg.GetVGName()
6429
6430     snap_disks = []
6431
6432     # set the disks ID correctly since call_instance_start needs the
6433     # correct drbd minor to create the symlinks
6434     for disk in instance.disks:
6435       self.cfg.SetDiskID(disk, src_node)
6436
6437     try:
6438       for idx, disk in enumerate(instance.disks):
6439         # result.payload will be a snapshot of an lvm leaf of the one we passed
6440         result = self.rpc.call_blockdev_snapshot(src_node, disk)
6441         msg = result.fail_msg
6442         if msg:
6443           self.LogWarning("Could not snapshot disk/%s on node %s: %s",
6444                           idx, src_node, msg)
6445           snap_disks.append(False)
6446         else:
6447           disk_id = (vgname, result.payload)
6448           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6449                                  logical_id=disk_id, physical_id=disk_id,
6450                                  iv_name=disk.iv_name)
6451           snap_disks.append(new_dev)
6452
6453     finally:
6454       if self.op.shutdown and instance.admin_up:
6455         result = self.rpc.call_instance_start(src_node, instance, None, None)
6456         msg = result.fail_msg
6457         if msg:
6458           _ShutdownInstanceDisks(self, instance)
6459           raise errors.OpExecError("Could not start instance: %s" % msg)
6460
6461     # TODO: check for size
6462
6463     cluster_name = self.cfg.GetClusterName()
6464     for idx, dev in enumerate(snap_disks):
6465       if dev:
6466         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6467                                                instance, cluster_name, idx)
6468         msg = result.fail_msg
6469         if msg:
6470           self.LogWarning("Could not export disk/%s from node %s to"
6471                           " node %s: %s", idx, src_node, dst_node.name, msg)
6472         msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
6473         if msg:
6474           self.LogWarning("Could not remove snapshot for disk/%d from node"
6475                           " %s: %s", idx, src_node, msg)
6476
6477     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6478     msg = result.fail_msg
6479     if msg:
6480       self.LogWarning("Could not finalize export for instance %s"
6481                       " on node %s: %s", instance.name, dst_node.name, msg)
6482
6483     nodelist = self.cfg.GetNodeList()
6484     nodelist.remove(dst_node.name)
6485
6486     # on one-node clusters nodelist will be empty after the removal
6487     # if we proceed the backup would be removed because OpQueryExports
6488     # substitutes an empty list with the full cluster node list.
6489     iname = instance.name
6490     if nodelist:
6491       exportlist = self.rpc.call_export_list(nodelist)
6492       for node in exportlist:
6493         if exportlist[node].fail_msg:
6494           continue
6495         if iname in exportlist[node].payload:
6496           msg = self.rpc.call_export_remove(node, iname).fail_msg
6497           if msg:
6498             self.LogWarning("Could not remove older export for instance %s"
6499                             " on node %s: %s", iname, node, msg)
6500
6501
6502 class LURemoveExport(NoHooksLU):
6503   """Remove exports related to the named instance.
6504
6505   """
6506   _OP_REQP = ["instance_name"]
6507   REQ_BGL = False
6508
6509   def ExpandNames(self):
6510     self.needed_locks = {}
6511     # We need all nodes to be locked in order for RemoveExport to work, but we
6512     # don't need to lock the instance itself, as nothing will happen to it (and
6513     # we can remove exports also for a removed instance)
6514     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6515
6516   def CheckPrereq(self):
6517     """Check prerequisites.
6518     """
6519     pass
6520
6521   def Exec(self, feedback_fn):
6522     """Remove any export.
6523
6524     """
6525     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6526     # If the instance was not found we'll try with the name that was passed in.
6527     # This will only work if it was an FQDN, though.
6528     fqdn_warn = False
6529     if not instance_name:
6530       fqdn_warn = True
6531       instance_name = self.op.instance_name
6532
6533     locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
6534     exportlist = self.rpc.call_export_list(locked_nodes)
6535     found = False
6536     for node in exportlist:
6537       msg = exportlist[node].fail_msg
6538       if msg:
6539         self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
6540         continue
6541       if instance_name in exportlist[node].payload:
6542         found = True
6543         result = self.rpc.call_export_remove(node, instance_name)
6544         msg = result.fail_msg
6545         if msg:
6546           logging.error("Could not remove export for instance %s"
6547                         " on node %s: %s", instance_name, node, msg)
6548
6549     if fqdn_warn and not found:
6550       feedback_fn("Export not found. If trying to remove an export belonging"
6551                   " to a deleted instance please use its Fully Qualified"
6552                   " Domain Name.")
6553
6554
6555 class TagsLU(NoHooksLU):
6556   """Generic tags LU.
6557
6558   This is an abstract class which is the parent of all the other tags LUs.
6559
6560   """
6561
6562   def ExpandNames(self):
6563     self.needed_locks = {}
6564     if self.op.kind == constants.TAG_NODE:
6565       name = self.cfg.ExpandNodeName(self.op.name)
6566       if name is None:
6567         raise errors.OpPrereqError("Invalid node name (%s)" %
6568                                    (self.op.name,))
6569       self.op.name = name
6570       self.needed_locks[locking.LEVEL_NODE] = name
6571     elif self.op.kind == constants.TAG_INSTANCE:
6572       name = self.cfg.ExpandInstanceName(self.op.name)
6573       if name is None:
6574         raise errors.OpPrereqError("Invalid instance name (%s)" %
6575                                    (self.op.name,))
6576       self.op.name = name
6577       self.needed_locks[locking.LEVEL_INSTANCE] = name
6578
6579   def CheckPrereq(self):
6580     """Check prerequisites.
6581
6582     """
6583     if self.op.kind == constants.TAG_CLUSTER:
6584       self.target = self.cfg.GetClusterInfo()
6585     elif self.op.kind == constants.TAG_NODE:
6586       self.target = self.cfg.GetNodeInfo(self.op.name)
6587     elif self.op.kind == constants.TAG_INSTANCE:
6588       self.target = self.cfg.GetInstanceInfo(self.op.name)
6589     else:
6590       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6591                                  str(self.op.kind))
6592
6593
6594 class LUGetTags(TagsLU):
6595   """Returns the tags of a given object.
6596
6597   """
6598   _OP_REQP = ["kind", "name"]
6599   REQ_BGL = False
6600
6601   def Exec(self, feedback_fn):
6602     """Returns the tag list.
6603
6604     """
6605     return list(self.target.GetTags())
6606
6607
6608 class LUSearchTags(NoHooksLU):
6609   """Searches the tags for a given pattern.
6610
6611   """
6612   _OP_REQP = ["pattern"]
6613   REQ_BGL = False
6614
6615   def ExpandNames(self):
6616     self.needed_locks = {}
6617
6618   def CheckPrereq(self):
6619     """Check prerequisites.
6620
6621     This checks the pattern passed for validity by compiling it.
6622
6623     """
6624     try:
6625       self.re = re.compile(self.op.pattern)
6626     except re.error, err:
6627       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6628                                  (self.op.pattern, err))
6629
6630   def Exec(self, feedback_fn):
6631     """Returns the tag list.
6632
6633     """
6634     cfg = self.cfg
6635     tgts = [("/cluster", cfg.GetClusterInfo())]
6636     ilist = cfg.GetAllInstancesInfo().values()
6637     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6638     nlist = cfg.GetAllNodesInfo().values()
6639     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6640     results = []
6641     for path, target in tgts:
6642       for tag in target.GetTags():
6643         if self.re.search(tag):
6644           results.append((path, tag))
6645     return results
6646
6647
6648 class LUAddTags(TagsLU):
6649   """Sets a tag on a given object.
6650
6651   """
6652   _OP_REQP = ["kind", "name", "tags"]
6653   REQ_BGL = False
6654
6655   def CheckPrereq(self):
6656     """Check prerequisites.
6657
6658     This checks the type and length of the tag name and value.
6659
6660     """
6661     TagsLU.CheckPrereq(self)
6662     for tag in self.op.tags:
6663       objects.TaggableObject.ValidateTag(tag)
6664
6665   def Exec(self, feedback_fn):
6666     """Sets the tag.
6667
6668     """
6669     try:
6670       for tag in self.op.tags:
6671         self.target.AddTag(tag)
6672     except errors.TagError, err:
6673       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6674     try:
6675       self.cfg.Update(self.target)
6676     except errors.ConfigurationError:
6677       raise errors.OpRetryError("There has been a modification to the"
6678                                 " config file and the operation has been"
6679                                 " aborted. Please retry.")
6680
6681
6682 class LUDelTags(TagsLU):
6683   """Delete a list of tags from a given object.
6684
6685   """
6686   _OP_REQP = ["kind", "name", "tags"]
6687   REQ_BGL = False
6688
6689   def CheckPrereq(self):
6690     """Check prerequisites.
6691
6692     This checks that we have the given tag.
6693
6694     """
6695     TagsLU.CheckPrereq(self)
6696     for tag in self.op.tags:
6697       objects.TaggableObject.ValidateTag(tag)
6698     del_tags = frozenset(self.op.tags)
6699     cur_tags = self.target.GetTags()
6700     if not del_tags <= cur_tags:
6701       diff_tags = del_tags - cur_tags
6702       diff_names = ["'%s'" % tag for tag in diff_tags]
6703       diff_names.sort()
6704       raise errors.OpPrereqError("Tag(s) %s not found" %
6705                                  (",".join(diff_names)))
6706
6707   def Exec(self, feedback_fn):
6708     """Remove the tag from the object.
6709
6710     """
6711     for tag in self.op.tags:
6712       self.target.RemoveTag(tag)
6713     try:
6714       self.cfg.Update(self.target)
6715     except errors.ConfigurationError:
6716       raise errors.OpRetryError("There has been a modification to the"
6717                                 " config file and the operation has been"
6718                                 " aborted. Please retry.")
6719
6720
6721 class LUTestDelay(NoHooksLU):
6722   """Sleep for a specified amount of time.
6723
6724   This LU sleeps on the master and/or nodes for a specified amount of
6725   time.
6726
6727   """
6728   _OP_REQP = ["duration", "on_master", "on_nodes"]
6729   REQ_BGL = False
6730
6731   def ExpandNames(self):
6732     """Expand names and set required locks.
6733
6734     This expands the node list, if any.
6735
6736     """
6737     self.needed_locks = {}
6738     if self.op.on_nodes:
6739       # _GetWantedNodes can be used here, but is not always appropriate to use
6740       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6741       # more information.
6742       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6743       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6744
6745   def CheckPrereq(self):
6746     """Check prerequisites.
6747
6748     """
6749
6750   def Exec(self, feedback_fn):
6751     """Do the actual sleep.
6752
6753     """
6754     if self.op.on_master:
6755       if not utils.TestDelay(self.op.duration):
6756         raise errors.OpExecError("Error during master delay test")
6757     if self.op.on_nodes:
6758       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6759       for node, node_result in result.items():
6760         node_result.Raise("Failure during rpc call to node %s" % node)
6761
6762
6763 class IAllocator(object):
6764   """IAllocator framework.
6765
6766   An IAllocator instance has three sets of attributes:
6767     - cfg that is needed to query the cluster
6768     - input data (all members of the _KEYS class attribute are required)
6769     - four buffer attributes (in|out_data|text), that represent the
6770       input (to the external script) in text and data structure format,
6771       and the output from it, again in two formats
6772     - the result variables from the script (success, info, nodes) for
6773       easy usage
6774
6775   """
6776   _ALLO_KEYS = [
6777     "mem_size", "disks", "disk_template",
6778     "os", "tags", "nics", "vcpus", "hypervisor",
6779     ]
6780   _RELO_KEYS = [
6781     "relocate_from",
6782     ]
6783
6784   def __init__(self, lu, mode, name, **kwargs):
6785     self.lu = lu
6786     # init buffer variables
6787     self.in_text = self.out_text = self.in_data = self.out_data = None
6788     # init all input fields so that pylint is happy
6789     self.mode = mode
6790     self.name = name
6791     self.mem_size = self.disks = self.disk_template = None
6792     self.os = self.tags = self.nics = self.vcpus = None
6793     self.hypervisor = None
6794     self.relocate_from = None
6795     # computed fields
6796     self.required_nodes = None
6797     # init result fields
6798     self.success = self.info = self.nodes = None
6799     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6800       keyset = self._ALLO_KEYS
6801     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6802       keyset = self._RELO_KEYS
6803     else:
6804       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6805                                    " IAllocator" % self.mode)
6806     for key in kwargs:
6807       if key not in keyset:
6808         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6809                                      " IAllocator" % key)
6810       setattr(self, key, kwargs[key])
6811     for key in keyset:
6812       if key not in kwargs:
6813         raise errors.ProgrammerError("Missing input parameter '%s' to"
6814                                      " IAllocator" % key)
6815     self._BuildInputData()
6816
6817   def _ComputeClusterData(self):
6818     """Compute the generic allocator input data.
6819
6820     This is the data that is independent of the actual operation.
6821
6822     """
6823     cfg = self.lu.cfg
6824     cluster_info = cfg.GetClusterInfo()
6825     # cluster data
6826     data = {
6827       "version": constants.IALLOCATOR_VERSION,
6828       "cluster_name": cfg.GetClusterName(),
6829       "cluster_tags": list(cluster_info.GetTags()),
6830       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6831       # we don't have job IDs
6832       }
6833     iinfo = cfg.GetAllInstancesInfo().values()
6834     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6835
6836     # node data
6837     node_results = {}
6838     node_list = cfg.GetNodeList()
6839
6840     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6841       hypervisor_name = self.hypervisor
6842     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6843       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6844
6845     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6846                                            hypervisor_name)
6847     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6848                        cluster_info.enabled_hypervisors)
6849     for nname, nresult in node_data.items():
6850       # first fill in static (config-based) values
6851       ninfo = cfg.GetNodeInfo(nname)
6852       pnr = {
6853         "tags": list(ninfo.GetTags()),
6854         "primary_ip": ninfo.primary_ip,
6855         "secondary_ip": ninfo.secondary_ip,
6856         "offline": ninfo.offline,
6857         "drained": ninfo.drained,
6858         "master_candidate": ninfo.master_candidate,
6859         }
6860
6861       if not ninfo.offline:
6862         nresult.Raise("Can't get data for node %s" % nname)
6863         node_iinfo[nname].Raise("Can't get node instance info from node %s" %
6864                                 nname)
6865         remote_info = nresult.payload
6866         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6867                      'vg_size', 'vg_free', 'cpu_total']:
6868           if attr not in remote_info:
6869             raise errors.OpExecError("Node '%s' didn't return attribute"
6870                                      " '%s'" % (nname, attr))
6871           if not isinstance(remote_info[attr], int):
6872             raise errors.OpExecError("Node '%s' returned invalid value"
6873                                      " for '%s': %s" %
6874                                      (nname, attr, remote_info[attr]))
6875         # compute memory used by primary instances
6876         i_p_mem = i_p_up_mem = 0
6877         for iinfo, beinfo in i_list:
6878           if iinfo.primary_node == nname:
6879             i_p_mem += beinfo[constants.BE_MEMORY]
6880             if iinfo.name not in node_iinfo[nname].payload:
6881               i_used_mem = 0
6882             else:
6883               i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
6884             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6885             remote_info['memory_free'] -= max(0, i_mem_diff)
6886
6887             if iinfo.admin_up:
6888               i_p_up_mem += beinfo[constants.BE_MEMORY]
6889
6890         # compute memory used by instances
6891         pnr_dyn = {
6892           "total_memory": remote_info['memory_total'],
6893           "reserved_memory": remote_info['memory_dom0'],
6894           "free_memory": remote_info['memory_free'],
6895           "total_disk": remote_info['vg_size'],
6896           "free_disk": remote_info['vg_free'],
6897           "total_cpus": remote_info['cpu_total'],
6898           "i_pri_memory": i_p_mem,
6899           "i_pri_up_memory": i_p_up_mem,
6900           }
6901         pnr.update(pnr_dyn)
6902
6903       node_results[nname] = pnr
6904     data["nodes"] = node_results
6905
6906     # instance data
6907     instance_data = {}
6908     for iinfo, beinfo in i_list:
6909       nic_data = []
6910       for nic in iinfo.nics:
6911         filled_params = objects.FillDict(
6912             cluster_info.nicparams[constants.PP_DEFAULT],
6913             nic.nicparams)
6914         nic_dict = {"mac": nic.mac,
6915                     "ip": nic.ip,
6916                     "mode": filled_params[constants.NIC_MODE],
6917                     "link": filled_params[constants.NIC_LINK],
6918                    }
6919         if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
6920           nic_dict["bridge"] = filled_params[constants.NIC_LINK]
6921         nic_data.append(nic_dict)
6922       pir = {
6923         "tags": list(iinfo.GetTags()),
6924         "admin_up": iinfo.admin_up,
6925         "vcpus": beinfo[constants.BE_VCPUS],
6926         "memory": beinfo[constants.BE_MEMORY],
6927         "os": iinfo.os,
6928         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6929         "nics": nic_data,
6930         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6931         "disk_template": iinfo.disk_template,
6932         "hypervisor": iinfo.hypervisor,
6933         }
6934       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
6935                                                  pir["disks"])
6936       instance_data[iinfo.name] = pir
6937
6938     data["instances"] = instance_data
6939
6940     self.in_data = data
6941
6942   def _AddNewInstance(self):
6943     """Add new instance data to allocator structure.
6944
6945     This in combination with _AllocatorGetClusterData will create the
6946     correct structure needed as input for the allocator.
6947
6948     The checks for the completeness of the opcode must have already been
6949     done.
6950
6951     """
6952     data = self.in_data
6953
6954     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6955
6956     if self.disk_template in constants.DTS_NET_MIRROR:
6957       self.required_nodes = 2
6958     else:
6959       self.required_nodes = 1
6960     request = {
6961       "type": "allocate",
6962       "name": self.name,
6963       "disk_template": self.disk_template,
6964       "tags": self.tags,
6965       "os": self.os,
6966       "vcpus": self.vcpus,
6967       "memory": self.mem_size,
6968       "disks": self.disks,
6969       "disk_space_total": disk_space,
6970       "nics": self.nics,
6971       "required_nodes": self.required_nodes,
6972       }
6973     data["request"] = request
6974
6975   def _AddRelocateInstance(self):
6976     """Add relocate instance data to allocator structure.
6977
6978     This in combination with _IAllocatorGetClusterData will create the
6979     correct structure needed as input for the allocator.
6980
6981     The checks for the completeness of the opcode must have already been
6982     done.
6983
6984     """
6985     instance = self.lu.cfg.GetInstanceInfo(self.name)
6986     if instance is None:
6987       raise errors.ProgrammerError("Unknown instance '%s' passed to"
6988                                    " IAllocator" % self.name)
6989
6990     if instance.disk_template not in constants.DTS_NET_MIRROR:
6991       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6992
6993     if len(instance.secondary_nodes) != 1:
6994       raise errors.OpPrereqError("Instance has not exactly one secondary node")
6995
6996     self.required_nodes = 1
6997     disk_sizes = [{'size': disk.size} for disk in instance.disks]
6998     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6999
7000     request = {
7001       "type": "relocate",
7002       "name": self.name,
7003       "disk_space_total": disk_space,
7004       "required_nodes": self.required_nodes,
7005       "relocate_from": self.relocate_from,
7006       }
7007     self.in_data["request"] = request
7008
7009   def _BuildInputData(self):
7010     """Build input data structures.
7011
7012     """
7013     self._ComputeClusterData()
7014
7015     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7016       self._AddNewInstance()
7017     else:
7018       self._AddRelocateInstance()
7019
7020     self.in_text = serializer.Dump(self.in_data)
7021
7022   def Run(self, name, validate=True, call_fn=None):
7023     """Run an instance allocator and return the results.
7024
7025     """
7026     if call_fn is None:
7027       call_fn = self.lu.rpc.call_iallocator_runner
7028     data = self.in_text
7029
7030     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
7031     result.Raise("Failure while running the iallocator script")
7032
7033     self.out_text = result.payload
7034     if validate:
7035       self._ValidateResult()
7036
7037   def _ValidateResult(self):
7038     """Process the allocator results.
7039
7040     This will process and if successful save the result in
7041     self.out_data and the other parameters.
7042
7043     """
7044     try:
7045       rdict = serializer.Load(self.out_text)
7046     except Exception, err:
7047       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
7048
7049     if not isinstance(rdict, dict):
7050       raise errors.OpExecError("Can't parse iallocator results: not a dict")
7051
7052     for key in "success", "info", "nodes":
7053       if key not in rdict:
7054         raise errors.OpExecError("Can't parse iallocator results:"
7055                                  " missing key '%s'" % key)
7056       setattr(self, key, rdict[key])
7057
7058     if not isinstance(rdict["nodes"], list):
7059       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
7060                                " is not a list")
7061     self.out_data = rdict
7062
7063
7064 class LUTestAllocator(NoHooksLU):
7065   """Run allocator tests.
7066
7067   This LU runs the allocator tests
7068
7069   """
7070   _OP_REQP = ["direction", "mode", "name"]
7071
7072   def CheckPrereq(self):
7073     """Check prerequisites.
7074
7075     This checks the opcode parameters depending on the director and mode test.
7076
7077     """
7078     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7079       for attr in ["name", "mem_size", "disks", "disk_template",
7080                    "os", "tags", "nics", "vcpus"]:
7081         if not hasattr(self.op, attr):
7082           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7083                                      attr)
7084       iname = self.cfg.ExpandInstanceName(self.op.name)
7085       if iname is not None:
7086         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7087                                    iname)
7088       if not isinstance(self.op.nics, list):
7089         raise errors.OpPrereqError("Invalid parameter 'nics'")
7090       for row in self.op.nics:
7091         if (not isinstance(row, dict) or
7092             "mac" not in row or
7093             "ip" not in row or
7094             "bridge" not in row):
7095           raise errors.OpPrereqError("Invalid contents of the"
7096                                      " 'nics' parameter")
7097       if not isinstance(self.op.disks, list):
7098         raise errors.OpPrereqError("Invalid parameter 'disks'")
7099       for row in self.op.disks:
7100         if (not isinstance(row, dict) or
7101             "size" not in row or
7102             not isinstance(row["size"], int) or
7103             "mode" not in row or
7104             row["mode"] not in ['r', 'w']):
7105           raise errors.OpPrereqError("Invalid contents of the"
7106                                      " 'disks' parameter")
7107       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7108         self.op.hypervisor = self.cfg.GetHypervisorType()
7109     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7110       if not hasattr(self.op, "name"):
7111         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7112       fname = self.cfg.ExpandInstanceName(self.op.name)
7113       if fname is None:
7114         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7115                                    self.op.name)
7116       self.op.name = fname
7117       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7118     else:
7119       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7120                                  self.op.mode)
7121
7122     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7123       if not hasattr(self.op, "allocator") or self.op.allocator is None:
7124         raise errors.OpPrereqError("Missing allocator name")
7125     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7126       raise errors.OpPrereqError("Wrong allocator test '%s'" %
7127                                  self.op.direction)
7128
7129   def Exec(self, feedback_fn):
7130     """Run the allocator test.
7131
7132     """
7133     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7134       ial = IAllocator(self,
7135                        mode=self.op.mode,
7136                        name=self.op.name,
7137                        mem_size=self.op.mem_size,
7138                        disks=self.op.disks,
7139                        disk_template=self.op.disk_template,
7140                        os=self.op.os,
7141                        tags=self.op.tags,
7142                        nics=self.op.nics,
7143                        vcpus=self.op.vcpus,
7144                        hypervisor=self.op.hypervisor,
7145                        )
7146     else:
7147       ial = IAllocator(self,
7148                        mode=self.op.mode,
7149                        name=self.op.name,
7150                        relocate_from=list(self.relocate_from),
7151                        )
7152
7153     if self.op.direction == constants.IALLOCATOR_DIR_IN:
7154       result = ial.in_text
7155     else:
7156       ial.Run(self.op.allocator, validate=False)
7157       result = ial.out_text
7158     return result