Cleanup config data when draining nodes
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import tempfile
30 import re
31 import platform
32 import logging
33 import copy
34 import random
35
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45 from ganeti import ssconf
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59
60   Note that all commands require root permissions.
61
62   """
63   HPATH = None
64   HTYPE = None
65   _OP_REQP = []
66   REQ_BGL = True
67
68   def __init__(self, processor, op, context, rpc):
69     """Constructor for LogicalUnit.
70
71     This needs to be overriden in derived classes in order to check op
72     validity.
73
74     """
75     self.proc = processor
76     self.op = op
77     self.cfg = context.cfg
78     self.context = context
79     self.rpc = rpc
80     # Dicts used to declare locking needs to mcpu
81     self.needed_locks = None
82     self.acquired_locks = {}
83     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
84     self.add_locks = {}
85     self.remove_locks = {}
86     # Used to force good behavior when calling helper functions
87     self.recalculate_locks = {}
88     self.__ssh = None
89     # logging
90     self.LogWarning = processor.LogWarning
91     self.LogInfo = processor.LogInfo
92
93     for attr_name in self._OP_REQP:
94       attr_val = getattr(op, attr_name, None)
95       if attr_val is None:
96         raise errors.OpPrereqError("Required parameter '%s' missing" %
97                                    attr_name)
98     self.CheckArguments()
99
100   def __GetSSH(self):
101     """Returns the SshRunner object
102
103     """
104     if not self.__ssh:
105       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
106     return self.__ssh
107
108   ssh = property(fget=__GetSSH)
109
110   def CheckArguments(self):
111     """Check syntactic validity for the opcode arguments.
112
113     This method is for doing a simple syntactic check and ensure
114     validity of opcode parameters, without any cluster-related
115     checks. While the same can be accomplished in ExpandNames and/or
116     CheckPrereq, doing these separate is better because:
117
118       - ExpandNames is left as as purely a lock-related function
119       - CheckPrereq is run after we have aquired locks (and possible
120         waited for them)
121
122     The function is allowed to change the self.op attribute so that
123     later methods can no longer worry about missing parameters.
124
125     """
126     pass
127
128   def ExpandNames(self):
129     """Expand names for this LU.
130
131     This method is called before starting to execute the opcode, and it should
132     update all the parameters of the opcode to their canonical form (e.g. a
133     short node name must be fully expanded after this method has successfully
134     completed). This way locking, hooks, logging, ecc. can work correctly.
135
136     LUs which implement this method must also populate the self.needed_locks
137     member, as a dict with lock levels as keys, and a list of needed lock names
138     as values. Rules:
139
140       - use an empty dict if you don't need any lock
141       - if you don't need any lock at a particular level omit that level
142       - don't put anything for the BGL level
143       - if you want all locks at a level use locking.ALL_SET as a value
144
145     If you need to share locks (rather than acquire them exclusively) at one
146     level you can modify self.share_locks, setting a true value (usually 1) for
147     that level. By default locks are not shared.
148
149     Examples::
150
151       # Acquire all nodes and one instance
152       self.needed_locks = {
153         locking.LEVEL_NODE: locking.ALL_SET,
154         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
155       }
156       # Acquire just two nodes
157       self.needed_locks = {
158         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
159       }
160       # Acquire no locks
161       self.needed_locks = {} # No, you can't leave it to the default value None
162
163     """
164     # The implementation of this method is mandatory only if the new LU is
165     # concurrent, so that old LUs don't need to be changed all at the same
166     # time.
167     if self.REQ_BGL:
168       self.needed_locks = {} # Exclusive LUs don't need locks.
169     else:
170       raise NotImplementedError
171
172   def DeclareLocks(self, level):
173     """Declare LU locking needs for a level
174
175     While most LUs can just declare their locking needs at ExpandNames time,
176     sometimes there's the need to calculate some locks after having acquired
177     the ones before. This function is called just before acquiring locks at a
178     particular level, but after acquiring the ones at lower levels, and permits
179     such calculations. It can be used to modify self.needed_locks, and by
180     default it does nothing.
181
182     This function is only called if you have something already set in
183     self.needed_locks for the level.
184
185     @param level: Locking level which is going to be locked
186     @type level: member of ganeti.locking.LEVELS
187
188     """
189
190   def CheckPrereq(self):
191     """Check prerequisites for this LU.
192
193     This method should check that the prerequisites for the execution
194     of this LU are fulfilled. It can do internode communication, but
195     it should be idempotent - no cluster or system changes are
196     allowed.
197
198     The method should raise errors.OpPrereqError in case something is
199     not fulfilled. Its return value is ignored.
200
201     This method should also update all the parameters of the opcode to
202     their canonical form if it hasn't been done by ExpandNames before.
203
204     """
205     raise NotImplementedError
206
207   def Exec(self, feedback_fn):
208     """Execute the LU.
209
210     This method should implement the actual work. It should raise
211     errors.OpExecError for failures that are somewhat dealt with in
212     code, or expected.
213
214     """
215     raise NotImplementedError
216
217   def BuildHooksEnv(self):
218     """Build hooks environment for this LU.
219
220     This method should return a three-node tuple consisting of: a dict
221     containing the environment that will be used for running the
222     specific hook for this LU, a list of node names on which the hook
223     should run before the execution, and a list of node names on which
224     the hook should run after the execution.
225
226     The keys of the dict must not have 'GANETI_' prefixed as this will
227     be handled in the hooks runner. Also note additional keys will be
228     added by the hooks runner. If the LU doesn't define any
229     environment, an empty dict (and not None) should be returned.
230
231     No nodes should be returned as an empty list (and not None).
232
233     Note that if the HPATH for a LU class is None, this function will
234     not be called.
235
236     """
237     raise NotImplementedError
238
239   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240     """Notify the LU about the results of its hooks.
241
242     This method is called every time a hooks phase is executed, and notifies
243     the Logical Unit about the hooks' result. The LU can then use it to alter
244     its result based on the hooks.  By default the method does nothing and the
245     previous result is passed back unchanged but any LU can define it if it
246     wants to use the local cluster hook-scripts somehow.
247
248     @param phase: one of L{constants.HOOKS_PHASE_POST} or
249         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250     @param hook_results: the results of the multi-node hooks rpc call
251     @param feedback_fn: function used send feedback back to the caller
252     @param lu_result: the previous Exec result this LU had, or None
253         in the PRE phase
254     @return: the new Exec result, based on the previous result
255         and hook results
256
257     """
258     return lu_result
259
260   def _ExpandAndLockInstance(self):
261     """Helper function to expand and lock an instance.
262
263     Many LUs that work on an instance take its name in self.op.instance_name
264     and need to expand it and then declare the expanded name for locking. This
265     function does it, and then updates self.op.instance_name to the expanded
266     name. It also initializes needed_locks as a dict, if this hasn't been done
267     before.
268
269     """
270     if self.needed_locks is None:
271       self.needed_locks = {}
272     else:
273       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274         "_ExpandAndLockInstance called with instance-level locks set"
275     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276     if expanded_name is None:
277       raise errors.OpPrereqError("Instance '%s' not known" %
278                                   self.op.instance_name)
279     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280     self.op.instance_name = expanded_name
281
282   def _LockInstancesNodes(self, primary_only=False):
283     """Helper function to declare instances' nodes for locking.
284
285     This function should be called after locking one or more instances to lock
286     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287     with all primary or secondary nodes for instances already locked and
288     present in self.needed_locks[locking.LEVEL_INSTANCE].
289
290     It should be called from DeclareLocks, and for safety only works if
291     self.recalculate_locks[locking.LEVEL_NODE] is set.
292
293     In the future it may grow parameters to just lock some instance's nodes, or
294     to just lock primaries or secondary nodes, if needed.
295
296     If should be called in DeclareLocks in a way similar to::
297
298       if level == locking.LEVEL_NODE:
299         self._LockInstancesNodes()
300
301     @type primary_only: boolean
302     @param primary_only: only lock primary nodes of locked instances
303
304     """
305     assert locking.LEVEL_NODE in self.recalculate_locks, \
306       "_LockInstancesNodes helper function called with no nodes to recalculate"
307
308     # TODO: check if we're really been called with the instance locks held
309
310     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311     # future we might want to have different behaviors depending on the value
312     # of self.recalculate_locks[locking.LEVEL_NODE]
313     wanted_nodes = []
314     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315       instance = self.context.cfg.GetInstanceInfo(instance_name)
316       wanted_nodes.append(instance.primary_node)
317       if not primary_only:
318         wanted_nodes.extend(instance.secondary_nodes)
319
320     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
324
325     del self.recalculate_locks[locking.LEVEL_NODE]
326
327
328 class NoHooksLU(LogicalUnit):
329   """Simple LU which runs no hooks.
330
331   This LU is intended as a parent for other LogicalUnits which will
332   run no hooks, in order to reduce duplicate code.
333
334   """
335   HPATH = None
336   HTYPE = None
337
338
339 def _GetWantedNodes(lu, nodes):
340   """Returns list of checked and expanded node names.
341
342   @type lu: L{LogicalUnit}
343   @param lu: the logical unit on whose behalf we execute
344   @type nodes: list
345   @param nodes: list of node names or None for all nodes
346   @rtype: list
347   @return: the list of nodes, sorted
348   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
349
350   """
351   if not isinstance(nodes, list):
352     raise errors.OpPrereqError("Invalid argument type 'nodes'")
353
354   if not nodes:
355     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356       " non-empty list of nodes whose name is to be expanded.")
357
358   wanted = []
359   for name in nodes:
360     node = lu.cfg.ExpandNodeName(name)
361     if node is None:
362       raise errors.OpPrereqError("No such node name '%s'" % name)
363     wanted.append(node)
364
365   return utils.NiceSort(wanted)
366
367
368 def _GetWantedInstances(lu, instances):
369   """Returns list of checked and expanded instance names.
370
371   @type lu: L{LogicalUnit}
372   @param lu: the logical unit on whose behalf we execute
373   @type instances: list
374   @param instances: list of instance names or None for all instances
375   @rtype: list
376   @return: the list of instances, sorted
377   @raise errors.OpPrereqError: if the instances parameter is wrong type
378   @raise errors.OpPrereqError: if any of the passed instances is not found
379
380   """
381   if not isinstance(instances, list):
382     raise errors.OpPrereqError("Invalid argument type 'instances'")
383
384   if instances:
385     wanted = []
386
387     for name in instances:
388       instance = lu.cfg.ExpandInstanceName(name)
389       if instance is None:
390         raise errors.OpPrereqError("No such instance name '%s'" % name)
391       wanted.append(instance)
392
393   else:
394     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
395   return wanted
396
397
398 def _CheckOutputFields(static, dynamic, selected):
399   """Checks whether all selected fields are valid.
400
401   @type static: L{utils.FieldSet}
402   @param static: static fields set
403   @type dynamic: L{utils.FieldSet}
404   @param dynamic: dynamic fields set
405
406   """
407   f = utils.FieldSet()
408   f.Extend(static)
409   f.Extend(dynamic)
410
411   delta = f.NonMatching(selected)
412   if delta:
413     raise errors.OpPrereqError("Unknown output fields selected: %s"
414                                % ",".join(delta))
415
416
417 def _CheckBooleanOpField(op, name):
418   """Validates boolean opcode parameters.
419
420   This will ensure that an opcode parameter is either a boolean value,
421   or None (but that it always exists).
422
423   """
424   val = getattr(op, name, None)
425   if not (val is None or isinstance(val, bool)):
426     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
427                                (name, str(val)))
428   setattr(op, name, val)
429
430
431 def _CheckNodeOnline(lu, node):
432   """Ensure that a given node is online.
433
434   @param lu: the LU on behalf of which we make the check
435   @param node: the node to check
436   @raise errors.OpPrereqError: if the node is offline
437
438   """
439   if lu.cfg.GetNodeInfo(node).offline:
440     raise errors.OpPrereqError("Can't use offline node %s" % node)
441
442
443 def _CheckNodeNotDrained(lu, node):
444   """Ensure that a given node is not drained.
445
446   @param lu: the LU on behalf of which we make the check
447   @param node: the node to check
448   @raise errors.OpPrereqError: if the node is drained
449
450   """
451   if lu.cfg.GetNodeInfo(node).drained:
452     raise errors.OpPrereqError("Can't use drained node %s" % node)
453
454
455 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456                           memory, vcpus, nics, disk_template, disks,
457                           bep, hvp, hypervisor):
458   """Builds instance related env variables for hooks
459
460   This builds the hook environment from individual variables.
461
462   @type name: string
463   @param name: the name of the instance
464   @type primary_node: string
465   @param primary_node: the name of the instance's primary node
466   @type secondary_nodes: list
467   @param secondary_nodes: list of secondary nodes as strings
468   @type os_type: string
469   @param os_type: the name of the instance's OS
470   @type status: boolean
471   @param status: the should_run status of the instance
472   @type memory: string
473   @param memory: the memory size of the instance
474   @type vcpus: string
475   @param vcpus: the count of VCPUs the instance has
476   @type nics: list
477   @param nics: list of tuples (ip, bridge, mac) representing
478       the NICs the instance  has
479   @type disk_template: string
480   @param disk_template: the distk template of the instance
481   @type disks: list
482   @param disks: the list of (size, mode) pairs
483   @type bep: dict
484   @param bep: the backend parameters for the instance
485   @type hvp: dict
486   @param hvp: the hypervisor parameters for the instance
487   @type hypervisor: string
488   @param hypervisor: the hypervisor for the instance
489   @rtype: dict
490   @return: the hook environment for this instance
491
492   """
493   if status:
494     str_status = "up"
495   else:
496     str_status = "down"
497   env = {
498     "OP_TARGET": name,
499     "INSTANCE_NAME": name,
500     "INSTANCE_PRIMARY": primary_node,
501     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
502     "INSTANCE_OS_TYPE": os_type,
503     "INSTANCE_STATUS": str_status,
504     "INSTANCE_MEMORY": memory,
505     "INSTANCE_VCPUS": vcpus,
506     "INSTANCE_DISK_TEMPLATE": disk_template,
507     "INSTANCE_HYPERVISOR": hypervisor,
508   }
509
510   if nics:
511     nic_count = len(nics)
512     for idx, (ip, bridge, mac) in enumerate(nics):
513       if ip is None:
514         ip = ""
515       env["INSTANCE_NIC%d_IP" % idx] = ip
516       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
517       env["INSTANCE_NIC%d_MAC" % idx] = mac
518   else:
519     nic_count = 0
520
521   env["INSTANCE_NIC_COUNT"] = nic_count
522
523   if disks:
524     disk_count = len(disks)
525     for idx, (size, mode) in enumerate(disks):
526       env["INSTANCE_DISK%d_SIZE" % idx] = size
527       env["INSTANCE_DISK%d_MODE" % idx] = mode
528   else:
529     disk_count = 0
530
531   env["INSTANCE_DISK_COUNT"] = disk_count
532
533   for source, kind in [(bep, "BE"), (hvp, "HV")]:
534     for key, value in source.items():
535       env["INSTANCE_%s_%s" % (kind, key)] = value
536
537   return env
538
539
540 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
541   """Builds instance related env variables for hooks from an object.
542
543   @type lu: L{LogicalUnit}
544   @param lu: the logical unit on whose behalf we execute
545   @type instance: L{objects.Instance}
546   @param instance: the instance for which we should build the
547       environment
548   @type override: dict
549   @param override: dictionary with key/values that will override
550       our values
551   @rtype: dict
552   @return: the hook environment dictionary
553
554   """
555   cluster = lu.cfg.GetClusterInfo()
556   bep = cluster.FillBE(instance)
557   hvp = cluster.FillHV(instance)
558   args = {
559     'name': instance.name,
560     'primary_node': instance.primary_node,
561     'secondary_nodes': instance.secondary_nodes,
562     'os_type': instance.os,
563     'status': instance.admin_up,
564     'memory': bep[constants.BE_MEMORY],
565     'vcpus': bep[constants.BE_VCPUS],
566     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
567     'disk_template': instance.disk_template,
568     'disks': [(disk.size, disk.mode) for disk in instance.disks],
569     'bep': bep,
570     'hvp': hvp,
571     'hypervisor': instance.hypervisor,
572   }
573   if override:
574     args.update(override)
575   return _BuildInstanceHookEnv(**args)
576
577
578 def _AdjustCandidatePool(lu):
579   """Adjust the candidate pool after node operations.
580
581   """
582   mod_list = lu.cfg.MaintainCandidatePool()
583   if mod_list:
584     lu.LogInfo("Promoted nodes to master candidate role: %s",
585                ", ".join(node.name for node in mod_list))
586     for name in mod_list:
587       lu.context.ReaddNode(name)
588   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
589   if mc_now > mc_max:
590     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
591                (mc_now, mc_max))
592
593
594 def _CheckInstanceBridgesExist(lu, instance):
595   """Check that the brigdes needed by an instance exist.
596
597   """
598   # check bridges existance
599   brlist = [nic.bridge for nic in instance.nics]
600   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
601   result.Raise()
602   if not result.data:
603     raise errors.OpPrereqError("One or more target bridges %s does not"
604                                " exist on destination node '%s'" %
605                                (brlist, instance.primary_node))
606
607
608 class LUDestroyCluster(NoHooksLU):
609   """Logical unit for destroying the cluster.
610
611   """
612   _OP_REQP = []
613
614   def CheckPrereq(self):
615     """Check prerequisites.
616
617     This checks whether the cluster is empty.
618
619     Any errors are signalled by raising errors.OpPrereqError.
620
621     """
622     master = self.cfg.GetMasterNode()
623
624     nodelist = self.cfg.GetNodeList()
625     if len(nodelist) != 1 or nodelist[0] != master:
626       raise errors.OpPrereqError("There are still %d node(s) in"
627                                  " this cluster." % (len(nodelist) - 1))
628     instancelist = self.cfg.GetInstanceList()
629     if instancelist:
630       raise errors.OpPrereqError("There are still %d instance(s) in"
631                                  " this cluster." % len(instancelist))
632
633   def Exec(self, feedback_fn):
634     """Destroys the cluster.
635
636     """
637     master = self.cfg.GetMasterNode()
638     result = self.rpc.call_node_stop_master(master, False)
639     result.Raise()
640     if not result.data:
641       raise errors.OpExecError("Could not disable the master role")
642     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
643     utils.CreateBackup(priv_key)
644     utils.CreateBackup(pub_key)
645     return master
646
647
648 class LUVerifyCluster(LogicalUnit):
649   """Verifies the cluster status.
650
651   """
652   HPATH = "cluster-verify"
653   HTYPE = constants.HTYPE_CLUSTER
654   _OP_REQP = ["skip_checks"]
655   REQ_BGL = False
656
657   def ExpandNames(self):
658     self.needed_locks = {
659       locking.LEVEL_NODE: locking.ALL_SET,
660       locking.LEVEL_INSTANCE: locking.ALL_SET,
661     }
662     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
663
664   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
665                   node_result, feedback_fn, master_files,
666                   drbd_map, vg_name):
667     """Run multiple tests against a node.
668
669     Test list:
670
671       - compares ganeti version
672       - checks vg existance and size > 20G
673       - checks config file checksum
674       - checks ssh to other nodes
675
676     @type nodeinfo: L{objects.Node}
677     @param nodeinfo: the node to check
678     @param file_list: required list of files
679     @param local_cksum: dictionary of local files and their checksums
680     @param node_result: the results from the node
681     @param feedback_fn: function used to accumulate results
682     @param master_files: list of files that only masters should have
683     @param drbd_map: the useddrbd minors for this node, in
684         form of minor: (instance, must_exist) which correspond to instances
685         and their running status
686     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
687
688     """
689     node = nodeinfo.name
690
691     # main result, node_result should be a non-empty dict
692     if not node_result or not isinstance(node_result, dict):
693       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
694       return True
695
696     # compares ganeti version
697     local_version = constants.PROTOCOL_VERSION
698     remote_version = node_result.get('version', None)
699     if not (remote_version and isinstance(remote_version, (list, tuple)) and
700             len(remote_version) == 2):
701       feedback_fn("  - ERROR: connection to %s failed" % (node))
702       return True
703
704     if local_version != remote_version[0]:
705       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
706                   " node %s %s" % (local_version, node, remote_version[0]))
707       return True
708
709     # node seems compatible, we can actually try to look into its results
710
711     bad = False
712
713     # full package version
714     if constants.RELEASE_VERSION != remote_version[1]:
715       feedback_fn("  - WARNING: software version mismatch: master %s,"
716                   " node %s %s" %
717                   (constants.RELEASE_VERSION, node, remote_version[1]))
718
719     # checks vg existence and size > 20G
720     if vg_name is not None:
721       vglist = node_result.get(constants.NV_VGLIST, None)
722       if not vglist:
723         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
724                         (node,))
725         bad = True
726       else:
727         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
728                                               constants.MIN_VG_SIZE)
729         if vgstatus:
730           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
731           bad = True
732
733     # checks config file checksum
734
735     remote_cksum = node_result.get(constants.NV_FILELIST, None)
736     if not isinstance(remote_cksum, dict):
737       bad = True
738       feedback_fn("  - ERROR: node hasn't returned file checksum data")
739     else:
740       for file_name in file_list:
741         node_is_mc = nodeinfo.master_candidate
742         must_have_file = file_name not in master_files
743         if file_name not in remote_cksum:
744           if node_is_mc or must_have_file:
745             bad = True
746             feedback_fn("  - ERROR: file '%s' missing" % file_name)
747         elif remote_cksum[file_name] != local_cksum[file_name]:
748           if node_is_mc or must_have_file:
749             bad = True
750             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
751           else:
752             # not candidate and this is not a must-have file
753             bad = True
754             feedback_fn("  - ERROR: file '%s' should not exist on non master"
755                         " candidates (and the file is outdated)" % file_name)
756         else:
757           # all good, except non-master/non-must have combination
758           if not node_is_mc and not must_have_file:
759             feedback_fn("  - ERROR: file '%s' should not exist on non master"
760                         " candidates" % file_name)
761
762     # checks ssh to any
763
764     if constants.NV_NODELIST not in node_result:
765       bad = True
766       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
767     else:
768       if node_result[constants.NV_NODELIST]:
769         bad = True
770         for node in node_result[constants.NV_NODELIST]:
771           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
772                           (node, node_result[constants.NV_NODELIST][node]))
773
774     if constants.NV_NODENETTEST not in node_result:
775       bad = True
776       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
777     else:
778       if node_result[constants.NV_NODENETTEST]:
779         bad = True
780         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
781         for node in nlist:
782           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
783                           (node, node_result[constants.NV_NODENETTEST][node]))
784
785     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
786     if isinstance(hyp_result, dict):
787       for hv_name, hv_result in hyp_result.iteritems():
788         if hv_result is not None:
789           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
790                       (hv_name, hv_result))
791
792     # check used drbd list
793     if vg_name is not None:
794       used_minors = node_result.get(constants.NV_DRBDLIST, [])
795       if not isinstance(used_minors, (tuple, list)):
796         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
797                     str(used_minors))
798       else:
799         for minor, (iname, must_exist) in drbd_map.items():
800           if minor not in used_minors and must_exist:
801             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
802                         " not active" % (minor, iname))
803             bad = True
804         for minor in used_minors:
805           if minor not in drbd_map:
806             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
807                         minor)
808             bad = True
809
810     return bad
811
812   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
813                       node_instance, feedback_fn, n_offline):
814     """Verify an instance.
815
816     This function checks to see if the required block devices are
817     available on the instance's node.
818
819     """
820     bad = False
821
822     node_current = instanceconfig.primary_node
823
824     node_vol_should = {}
825     instanceconfig.MapLVsByNode(node_vol_should)
826
827     for node in node_vol_should:
828       if node in n_offline:
829         # ignore missing volumes on offline nodes
830         continue
831       for volume in node_vol_should[node]:
832         if node not in node_vol_is or volume not in node_vol_is[node]:
833           feedback_fn("  - ERROR: volume %s missing on node %s" %
834                           (volume, node))
835           bad = True
836
837     if instanceconfig.admin_up:
838       if ((node_current not in node_instance or
839           not instance in node_instance[node_current]) and
840           node_current not in n_offline):
841         feedback_fn("  - ERROR: instance %s not running on node %s" %
842                         (instance, node_current))
843         bad = True
844
845     for node in node_instance:
846       if (not node == node_current):
847         if instance in node_instance[node]:
848           feedback_fn("  - ERROR: instance %s should not run on node %s" %
849                           (instance, node))
850           bad = True
851
852     return bad
853
854   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
855     """Verify if there are any unknown volumes in the cluster.
856
857     The .os, .swap and backup volumes are ignored. All other volumes are
858     reported as unknown.
859
860     """
861     bad = False
862
863     for node in node_vol_is:
864       for volume in node_vol_is[node]:
865         if node not in node_vol_should or volume not in node_vol_should[node]:
866           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
867                       (volume, node))
868           bad = True
869     return bad
870
871   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
872     """Verify the list of running instances.
873
874     This checks what instances are running but unknown to the cluster.
875
876     """
877     bad = False
878     for node in node_instance:
879       for runninginstance in node_instance[node]:
880         if runninginstance not in instancelist:
881           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
882                           (runninginstance, node))
883           bad = True
884     return bad
885
886   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
887     """Verify N+1 Memory Resilience.
888
889     Check that if one single node dies we can still start all the instances it
890     was primary for.
891
892     """
893     bad = False
894
895     for node, nodeinfo in node_info.iteritems():
896       # This code checks that every node which is now listed as secondary has
897       # enough memory to host all instances it is supposed to should a single
898       # other node in the cluster fail.
899       # FIXME: not ready for failover to an arbitrary node
900       # FIXME: does not support file-backed instances
901       # WARNING: we currently take into account down instances as well as up
902       # ones, considering that even if they're down someone might want to start
903       # them even in the event of a node failure.
904       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
905         needed_mem = 0
906         for instance in instances:
907           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
908           if bep[constants.BE_AUTO_BALANCE]:
909             needed_mem += bep[constants.BE_MEMORY]
910         if nodeinfo['mfree'] < needed_mem:
911           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
912                       " failovers should node %s fail" % (node, prinode))
913           bad = True
914     return bad
915
916   def CheckPrereq(self):
917     """Check prerequisites.
918
919     Transform the list of checks we're going to skip into a set and check that
920     all its members are valid.
921
922     """
923     self.skip_set = frozenset(self.op.skip_checks)
924     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
925       raise errors.OpPrereqError("Invalid checks to be skipped specified")
926
927   def BuildHooksEnv(self):
928     """Build hooks env.
929
930     Cluster-Verify hooks just rone in the post phase and their failure makes
931     the output be logged in the verify output and the verification to fail.
932
933     """
934     all_nodes = self.cfg.GetNodeList()
935     env = {
936       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
937       }
938     for node in self.cfg.GetAllNodesInfo().values():
939       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
940
941     return env, [], all_nodes
942
943   def Exec(self, feedback_fn):
944     """Verify integrity of cluster, performing various test on nodes.
945
946     """
947     bad = False
948     feedback_fn("* Verifying global settings")
949     for msg in self.cfg.VerifyConfig():
950       feedback_fn("  - ERROR: %s" % msg)
951
952     vg_name = self.cfg.GetVGName()
953     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
954     nodelist = utils.NiceSort(self.cfg.GetNodeList())
955     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
956     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
957     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
958                         for iname in instancelist)
959     i_non_redundant = [] # Non redundant instances
960     i_non_a_balanced = [] # Non auto-balanced instances
961     n_offline = [] # List of offline nodes
962     n_drained = [] # List of nodes being drained
963     node_volume = {}
964     node_instance = {}
965     node_info = {}
966     instance_cfg = {}
967
968     # FIXME: verify OS list
969     # do local checksums
970     master_files = [constants.CLUSTER_CONF_FILE]
971
972     file_names = ssconf.SimpleStore().GetFileList()
973     file_names.append(constants.SSL_CERT_FILE)
974     file_names.append(constants.RAPI_CERT_FILE)
975     file_names.extend(master_files)
976
977     local_checksums = utils.FingerprintFiles(file_names)
978
979     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
980     node_verify_param = {
981       constants.NV_FILELIST: file_names,
982       constants.NV_NODELIST: [node.name for node in nodeinfo
983                               if not node.offline],
984       constants.NV_HYPERVISOR: hypervisors,
985       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
986                                   node.secondary_ip) for node in nodeinfo
987                                  if not node.offline],
988       constants.NV_INSTANCELIST: hypervisors,
989       constants.NV_VERSION: None,
990       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
991       }
992     if vg_name is not None:
993       node_verify_param[constants.NV_VGLIST] = None
994       node_verify_param[constants.NV_LVLIST] = vg_name
995       node_verify_param[constants.NV_DRBDLIST] = None
996     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
997                                            self.cfg.GetClusterName())
998
999     cluster = self.cfg.GetClusterInfo()
1000     master_node = self.cfg.GetMasterNode()
1001     all_drbd_map = self.cfg.ComputeDRBDMap()
1002
1003     for node_i in nodeinfo:
1004       node = node_i.name
1005       nresult = all_nvinfo[node].data
1006
1007       if node_i.offline:
1008         feedback_fn("* Skipping offline node %s" % (node,))
1009         n_offline.append(node)
1010         continue
1011
1012       if node == master_node:
1013         ntype = "master"
1014       elif node_i.master_candidate:
1015         ntype = "master candidate"
1016       elif node_i.drained:
1017         ntype = "drained"
1018         n_drained.append(node)
1019       else:
1020         ntype = "regular"
1021       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1022
1023       if all_nvinfo[node].failed or not isinstance(nresult, dict):
1024         feedback_fn("  - ERROR: connection to %s failed" % (node,))
1025         bad = True
1026         continue
1027
1028       node_drbd = {}
1029       for minor, instance in all_drbd_map[node].items():
1030         if instance not in instanceinfo:
1031           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1032                       instance)
1033           # ghost instance should not be running, but otherwise we
1034           # don't give double warnings (both ghost instance and
1035           # unallocated minor in use)
1036           node_drbd[minor] = (instance, False)
1037         else:
1038           instance = instanceinfo[instance]
1039           node_drbd[minor] = (instance.name, instance.admin_up)
1040       result = self._VerifyNode(node_i, file_names, local_checksums,
1041                                 nresult, feedback_fn, master_files,
1042                                 node_drbd, vg_name)
1043       bad = bad or result
1044
1045       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1046       if vg_name is None:
1047         node_volume[node] = {}
1048       elif isinstance(lvdata, basestring):
1049         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1050                     (node, utils.SafeEncode(lvdata)))
1051         bad = True
1052         node_volume[node] = {}
1053       elif not isinstance(lvdata, dict):
1054         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1055         bad = True
1056         continue
1057       else:
1058         node_volume[node] = lvdata
1059
1060       # node_instance
1061       idata = nresult.get(constants.NV_INSTANCELIST, None)
1062       if not isinstance(idata, list):
1063         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1064                     (node,))
1065         bad = True
1066         continue
1067
1068       node_instance[node] = idata
1069
1070       # node_info
1071       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1072       if not isinstance(nodeinfo, dict):
1073         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1074         bad = True
1075         continue
1076
1077       try:
1078         node_info[node] = {
1079           "mfree": int(nodeinfo['memory_free']),
1080           "pinst": [],
1081           "sinst": [],
1082           # dictionary holding all instances this node is secondary for,
1083           # grouped by their primary node. Each key is a cluster node, and each
1084           # value is a list of instances which have the key as primary and the
1085           # current node as secondary.  this is handy to calculate N+1 memory
1086           # availability if you can only failover from a primary to its
1087           # secondary.
1088           "sinst-by-pnode": {},
1089         }
1090         # FIXME: devise a free space model for file based instances as well
1091         if vg_name is not None:
1092           if (constants.NV_VGLIST not in nresult or
1093               vg_name not in nresult[constants.NV_VGLIST]):
1094             feedback_fn("  - ERROR: node %s didn't return data for the"
1095                         " volume group '%s' - it is either missing or broken" %
1096                         (node, vg_name))
1097             bad = True
1098             continue
1099           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1100       except (ValueError, KeyError):
1101         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1102                     " from node %s" % (node,))
1103         bad = True
1104         continue
1105
1106     node_vol_should = {}
1107
1108     for instance in instancelist:
1109       feedback_fn("* Verifying instance %s" % instance)
1110       inst_config = instanceinfo[instance]
1111       result =  self._VerifyInstance(instance, inst_config, node_volume,
1112                                      node_instance, feedback_fn, n_offline)
1113       bad = bad or result
1114       inst_nodes_offline = []
1115
1116       inst_config.MapLVsByNode(node_vol_should)
1117
1118       instance_cfg[instance] = inst_config
1119
1120       pnode = inst_config.primary_node
1121       if pnode in node_info:
1122         node_info[pnode]['pinst'].append(instance)
1123       elif pnode not in n_offline:
1124         feedback_fn("  - ERROR: instance %s, connection to primary node"
1125                     " %s failed" % (instance, pnode))
1126         bad = True
1127
1128       if pnode in n_offline:
1129         inst_nodes_offline.append(pnode)
1130
1131       # If the instance is non-redundant we cannot survive losing its primary
1132       # node, so we are not N+1 compliant. On the other hand we have no disk
1133       # templates with more than one secondary so that situation is not well
1134       # supported either.
1135       # FIXME: does not support file-backed instances
1136       if len(inst_config.secondary_nodes) == 0:
1137         i_non_redundant.append(instance)
1138       elif len(inst_config.secondary_nodes) > 1:
1139         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1140                     % instance)
1141
1142       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1143         i_non_a_balanced.append(instance)
1144
1145       for snode in inst_config.secondary_nodes:
1146         if snode in node_info:
1147           node_info[snode]['sinst'].append(instance)
1148           if pnode not in node_info[snode]['sinst-by-pnode']:
1149             node_info[snode]['sinst-by-pnode'][pnode] = []
1150           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1151         elif snode not in n_offline:
1152           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1153                       " %s failed" % (instance, snode))
1154           bad = True
1155         if snode in n_offline:
1156           inst_nodes_offline.append(snode)
1157
1158       if inst_nodes_offline:
1159         # warn that the instance lives on offline nodes, and set bad=True
1160         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1161                     ", ".join(inst_nodes_offline))
1162         bad = True
1163
1164     feedback_fn("* Verifying orphan volumes")
1165     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1166                                        feedback_fn)
1167     bad = bad or result
1168
1169     feedback_fn("* Verifying remaining instances")
1170     result = self._VerifyOrphanInstances(instancelist, node_instance,
1171                                          feedback_fn)
1172     bad = bad or result
1173
1174     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1175       feedback_fn("* Verifying N+1 Memory redundancy")
1176       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1177       bad = bad or result
1178
1179     feedback_fn("* Other Notes")
1180     if i_non_redundant:
1181       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1182                   % len(i_non_redundant))
1183
1184     if i_non_a_balanced:
1185       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1186                   % len(i_non_a_balanced))
1187
1188     if n_offline:
1189       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1190
1191     if n_drained:
1192       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1193
1194     return not bad
1195
1196   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1197     """Analize the post-hooks' result
1198
1199     This method analyses the hook result, handles it, and sends some
1200     nicely-formatted feedback back to the user.
1201
1202     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1203         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1204     @param hooks_results: the results of the multi-node hooks rpc call
1205     @param feedback_fn: function used send feedback back to the caller
1206     @param lu_result: previous Exec result
1207     @return: the new Exec result, based on the previous result
1208         and hook results
1209
1210     """
1211     # We only really run POST phase hooks, and are only interested in
1212     # their results
1213     if phase == constants.HOOKS_PHASE_POST:
1214       # Used to change hooks' output to proper indentation
1215       indent_re = re.compile('^', re.M)
1216       feedback_fn("* Hooks Results")
1217       if not hooks_results:
1218         feedback_fn("  - ERROR: general communication failure")
1219         lu_result = 1
1220       else:
1221         for node_name in hooks_results:
1222           show_node_header = True
1223           res = hooks_results[node_name]
1224           if res.failed or res.data is False or not isinstance(res.data, list):
1225             if res.offline:
1226               # no need to warn or set fail return value
1227               continue
1228             feedback_fn("    Communication failure in hooks execution")
1229             lu_result = 1
1230             continue
1231           for script, hkr, output in res.data:
1232             if hkr == constants.HKR_FAIL:
1233               # The node header is only shown once, if there are
1234               # failing hooks on that node
1235               if show_node_header:
1236                 feedback_fn("  Node %s:" % node_name)
1237                 show_node_header = False
1238               feedback_fn("    ERROR: Script %s failed, output:" % script)
1239               output = indent_re.sub('      ', output)
1240               feedback_fn("%s" % output)
1241               lu_result = 1
1242
1243       return lu_result
1244
1245
1246 class LUVerifyDisks(NoHooksLU):
1247   """Verifies the cluster disks status.
1248
1249   """
1250   _OP_REQP = []
1251   REQ_BGL = False
1252
1253   def ExpandNames(self):
1254     self.needed_locks = {
1255       locking.LEVEL_NODE: locking.ALL_SET,
1256       locking.LEVEL_INSTANCE: locking.ALL_SET,
1257     }
1258     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1259
1260   def CheckPrereq(self):
1261     """Check prerequisites.
1262
1263     This has no prerequisites.
1264
1265     """
1266     pass
1267
1268   def Exec(self, feedback_fn):
1269     """Verify integrity of cluster disks.
1270
1271     """
1272     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1273
1274     vg_name = self.cfg.GetVGName()
1275     nodes = utils.NiceSort(self.cfg.GetNodeList())
1276     instances = [self.cfg.GetInstanceInfo(name)
1277                  for name in self.cfg.GetInstanceList()]
1278
1279     nv_dict = {}
1280     for inst in instances:
1281       inst_lvs = {}
1282       if (not inst.admin_up or
1283           inst.disk_template not in constants.DTS_NET_MIRROR):
1284         continue
1285       inst.MapLVsByNode(inst_lvs)
1286       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1287       for node, vol_list in inst_lvs.iteritems():
1288         for vol in vol_list:
1289           nv_dict[(node, vol)] = inst
1290
1291     if not nv_dict:
1292       return result
1293
1294     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1295
1296     to_act = set()
1297     for node in nodes:
1298       # node_volume
1299       lvs = node_lvs[node]
1300       if lvs.failed:
1301         if not lvs.offline:
1302           self.LogWarning("Connection to node %s failed: %s" %
1303                           (node, lvs.data))
1304         continue
1305       lvs = lvs.data
1306       if isinstance(lvs, basestring):
1307         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1308         res_nlvm[node] = lvs
1309         continue
1310       elif not isinstance(lvs, dict):
1311         logging.warning("Connection to node %s failed or invalid data"
1312                         " returned", node)
1313         res_nodes.append(node)
1314         continue
1315
1316       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1317         inst = nv_dict.pop((node, lv_name), None)
1318         if (not lv_online and inst is not None
1319             and inst.name not in res_instances):
1320           res_instances.append(inst.name)
1321
1322     # any leftover items in nv_dict are missing LVs, let's arrange the
1323     # data better
1324     for key, inst in nv_dict.iteritems():
1325       if inst.name not in res_missing:
1326         res_missing[inst.name] = []
1327       res_missing[inst.name].append(key)
1328
1329     return result
1330
1331
1332 class LURenameCluster(LogicalUnit):
1333   """Rename the cluster.
1334
1335   """
1336   HPATH = "cluster-rename"
1337   HTYPE = constants.HTYPE_CLUSTER
1338   _OP_REQP = ["name"]
1339
1340   def BuildHooksEnv(self):
1341     """Build hooks env.
1342
1343     """
1344     env = {
1345       "OP_TARGET": self.cfg.GetClusterName(),
1346       "NEW_NAME": self.op.name,
1347       }
1348     mn = self.cfg.GetMasterNode()
1349     return env, [mn], [mn]
1350
1351   def CheckPrereq(self):
1352     """Verify that the passed name is a valid one.
1353
1354     """
1355     hostname = utils.HostInfo(self.op.name)
1356
1357     new_name = hostname.name
1358     self.ip = new_ip = hostname.ip
1359     old_name = self.cfg.GetClusterName()
1360     old_ip = self.cfg.GetMasterIP()
1361     if new_name == old_name and new_ip == old_ip:
1362       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1363                                  " cluster has changed")
1364     if new_ip != old_ip:
1365       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1366         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1367                                    " reachable on the network. Aborting." %
1368                                    new_ip)
1369
1370     self.op.name = new_name
1371
1372   def Exec(self, feedback_fn):
1373     """Rename the cluster.
1374
1375     """
1376     clustername = self.op.name
1377     ip = self.ip
1378
1379     # shutdown the master IP
1380     master = self.cfg.GetMasterNode()
1381     result = self.rpc.call_node_stop_master(master, False)
1382     if result.failed or not result.data:
1383       raise errors.OpExecError("Could not disable the master role")
1384
1385     try:
1386       cluster = self.cfg.GetClusterInfo()
1387       cluster.cluster_name = clustername
1388       cluster.master_ip = ip
1389       self.cfg.Update(cluster)
1390
1391       # update the known hosts file
1392       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1393       node_list = self.cfg.GetNodeList()
1394       try:
1395         node_list.remove(master)
1396       except ValueError:
1397         pass
1398       result = self.rpc.call_upload_file(node_list,
1399                                          constants.SSH_KNOWN_HOSTS_FILE)
1400       for to_node, to_result in result.iteritems():
1401         if to_result.failed or not to_result.data:
1402           logging.error("Copy of file %s to node %s failed",
1403                         constants.SSH_KNOWN_HOSTS_FILE, to_node)
1404
1405     finally:
1406       result = self.rpc.call_node_start_master(master, False)
1407       if result.failed or not result.data:
1408         self.LogWarning("Could not re-enable the master role on"
1409                         " the master, please restart manually.")
1410
1411
1412 def _RecursiveCheckIfLVMBased(disk):
1413   """Check if the given disk or its children are lvm-based.
1414
1415   @type disk: L{objects.Disk}
1416   @param disk: the disk to check
1417   @rtype: booleean
1418   @return: boolean indicating whether a LD_LV dev_type was found or not
1419
1420   """
1421   if disk.children:
1422     for chdisk in disk.children:
1423       if _RecursiveCheckIfLVMBased(chdisk):
1424         return True
1425   return disk.dev_type == constants.LD_LV
1426
1427
1428 class LUSetClusterParams(LogicalUnit):
1429   """Change the parameters of the cluster.
1430
1431   """
1432   HPATH = "cluster-modify"
1433   HTYPE = constants.HTYPE_CLUSTER
1434   _OP_REQP = []
1435   REQ_BGL = False
1436
1437   def CheckArguments(self):
1438     """Check parameters
1439
1440     """
1441     if not hasattr(self.op, "candidate_pool_size"):
1442       self.op.candidate_pool_size = None
1443     if self.op.candidate_pool_size is not None:
1444       try:
1445         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1446       except (ValueError, TypeError), err:
1447         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1448                                    str(err))
1449       if self.op.candidate_pool_size < 1:
1450         raise errors.OpPrereqError("At least one master candidate needed")
1451
1452   def ExpandNames(self):
1453     # FIXME: in the future maybe other cluster params won't require checking on
1454     # all nodes to be modified.
1455     self.needed_locks = {
1456       locking.LEVEL_NODE: locking.ALL_SET,
1457     }
1458     self.share_locks[locking.LEVEL_NODE] = 1
1459
1460   def BuildHooksEnv(self):
1461     """Build hooks env.
1462
1463     """
1464     env = {
1465       "OP_TARGET": self.cfg.GetClusterName(),
1466       "NEW_VG_NAME": self.op.vg_name,
1467       }
1468     mn = self.cfg.GetMasterNode()
1469     return env, [mn], [mn]
1470
1471   def CheckPrereq(self):
1472     """Check prerequisites.
1473
1474     This checks whether the given params don't conflict and
1475     if the given volume group is valid.
1476
1477     """
1478     if self.op.vg_name is not None and not self.op.vg_name:
1479       instances = self.cfg.GetAllInstancesInfo().values()
1480       for inst in instances:
1481         for disk in inst.disks:
1482           if _RecursiveCheckIfLVMBased(disk):
1483             raise errors.OpPrereqError("Cannot disable lvm storage while"
1484                                        " lvm-based instances exist")
1485
1486     node_list = self.acquired_locks[locking.LEVEL_NODE]
1487
1488     # if vg_name not None, checks given volume group on all nodes
1489     if self.op.vg_name:
1490       vglist = self.rpc.call_vg_list(node_list)
1491       for node in node_list:
1492         if vglist[node].failed:
1493           # ignoring down node
1494           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1495           continue
1496         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1497                                               self.op.vg_name,
1498                                               constants.MIN_VG_SIZE)
1499         if vgstatus:
1500           raise errors.OpPrereqError("Error on node '%s': %s" %
1501                                      (node, vgstatus))
1502
1503     self.cluster = cluster = self.cfg.GetClusterInfo()
1504     # validate beparams changes
1505     if self.op.beparams:
1506       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1507       self.new_beparams = cluster.FillDict(
1508         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1509
1510     # hypervisor list/parameters
1511     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1512     if self.op.hvparams:
1513       if not isinstance(self.op.hvparams, dict):
1514         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1515       for hv_name, hv_dict in self.op.hvparams.items():
1516         if hv_name not in self.new_hvparams:
1517           self.new_hvparams[hv_name] = hv_dict
1518         else:
1519           self.new_hvparams[hv_name].update(hv_dict)
1520
1521     if self.op.enabled_hypervisors is not None:
1522       self.hv_list = self.op.enabled_hypervisors
1523     else:
1524       self.hv_list = cluster.enabled_hypervisors
1525
1526     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1527       # either the enabled list has changed, or the parameters have, validate
1528       for hv_name, hv_params in self.new_hvparams.items():
1529         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1530             (self.op.enabled_hypervisors and
1531              hv_name in self.op.enabled_hypervisors)):
1532           # either this is a new hypervisor, or its parameters have changed
1533           hv_class = hypervisor.GetHypervisor(hv_name)
1534           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1535           hv_class.CheckParameterSyntax(hv_params)
1536           _CheckHVParams(self, node_list, hv_name, hv_params)
1537
1538   def Exec(self, feedback_fn):
1539     """Change the parameters of the cluster.
1540
1541     """
1542     if self.op.vg_name is not None:
1543       new_volume = self.op.vg_name
1544       if not new_volume:
1545         new_volume = None
1546       if new_volume != self.cfg.GetVGName():
1547         self.cfg.SetVGName(new_volume)
1548       else:
1549         feedback_fn("Cluster LVM configuration already in desired"
1550                     " state, not changing")
1551     if self.op.hvparams:
1552       self.cluster.hvparams = self.new_hvparams
1553     if self.op.enabled_hypervisors is not None:
1554       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1555     if self.op.beparams:
1556       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1557     if self.op.candidate_pool_size is not None:
1558       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1559       # we need to update the pool size here, otherwise the save will fail
1560       _AdjustCandidatePool(self)
1561
1562     self.cfg.Update(self.cluster)
1563
1564
1565 class LURedistributeConfig(NoHooksLU):
1566   """Force the redistribution of cluster configuration.
1567
1568   This is a very simple LU.
1569
1570   """
1571   _OP_REQP = []
1572   REQ_BGL = False
1573
1574   def ExpandNames(self):
1575     self.needed_locks = {
1576       locking.LEVEL_NODE: locking.ALL_SET,
1577     }
1578     self.share_locks[locking.LEVEL_NODE] = 1
1579
1580   def CheckPrereq(self):
1581     """Check prerequisites.
1582
1583     """
1584
1585   def Exec(self, feedback_fn):
1586     """Redistribute the configuration.
1587
1588     """
1589     self.cfg.Update(self.cfg.GetClusterInfo())
1590
1591
1592 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1593   """Sleep and poll for an instance's disk to sync.
1594
1595   """
1596   if not instance.disks:
1597     return True
1598
1599   if not oneshot:
1600     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1601
1602   node = instance.primary_node
1603
1604   for dev in instance.disks:
1605     lu.cfg.SetDiskID(dev, node)
1606
1607   retries = 0
1608   degr_retries = 10 # in seconds, as we sleep 1 second each time
1609   while True:
1610     max_time = 0
1611     done = True
1612     cumul_degraded = False
1613     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1614     if rstats.failed or not rstats.data:
1615       lu.LogWarning("Can't get any data from node %s", node)
1616       retries += 1
1617       if retries >= 10:
1618         raise errors.RemoteError("Can't contact node %s for mirror data,"
1619                                  " aborting." % node)
1620       time.sleep(6)
1621       continue
1622     rstats = rstats.data
1623     retries = 0
1624     for i, mstat in enumerate(rstats):
1625       if mstat is None:
1626         lu.LogWarning("Can't compute data for node %s/%s",
1627                            node, instance.disks[i].iv_name)
1628         continue
1629       # we ignore the ldisk parameter
1630       perc_done, est_time, is_degraded, _ = mstat
1631       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1632       if perc_done is not None:
1633         done = False
1634         if est_time is not None:
1635           rem_time = "%d estimated seconds remaining" % est_time
1636           max_time = est_time
1637         else:
1638           rem_time = "no time estimate"
1639         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1640                         (instance.disks[i].iv_name, perc_done, rem_time))
1641
1642     # if we're done but degraded, let's do a few small retries, to
1643     # make sure we see a stable and not transient situation; therefore
1644     # we force restart of the loop
1645     if (done or oneshot) and cumul_degraded and degr_retries > 0:
1646       logging.info("Degraded disks found, %d retries left", degr_retries)
1647       degr_retries -= 1
1648       time.sleep(1)
1649       continue
1650
1651     if done or oneshot:
1652       break
1653
1654     time.sleep(min(60, max_time))
1655
1656   if done:
1657     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1658   return not cumul_degraded
1659
1660
1661 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1662   """Check that mirrors are not degraded.
1663
1664   The ldisk parameter, if True, will change the test from the
1665   is_degraded attribute (which represents overall non-ok status for
1666   the device(s)) to the ldisk (representing the local storage status).
1667
1668   """
1669   lu.cfg.SetDiskID(dev, node)
1670   if ldisk:
1671     idx = 6
1672   else:
1673     idx = 5
1674
1675   result = True
1676   if on_primary or dev.AssembleOnSecondary():
1677     rstats = lu.rpc.call_blockdev_find(node, dev)
1678     msg = rstats.RemoteFailMsg()
1679     if msg:
1680       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1681       result = False
1682     elif not rstats.payload:
1683       lu.LogWarning("Can't find disk on node %s", node)
1684       result = False
1685     else:
1686       result = result and (not rstats.payload[idx])
1687   if dev.children:
1688     for child in dev.children:
1689       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1690
1691   return result
1692
1693
1694 class LUDiagnoseOS(NoHooksLU):
1695   """Logical unit for OS diagnose/query.
1696
1697   """
1698   _OP_REQP = ["output_fields", "names"]
1699   REQ_BGL = False
1700   _FIELDS_STATIC = utils.FieldSet()
1701   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1702
1703   def ExpandNames(self):
1704     if self.op.names:
1705       raise errors.OpPrereqError("Selective OS query not supported")
1706
1707     _CheckOutputFields(static=self._FIELDS_STATIC,
1708                        dynamic=self._FIELDS_DYNAMIC,
1709                        selected=self.op.output_fields)
1710
1711     # Lock all nodes, in shared mode
1712     # Temporary removal of locks, should be reverted later
1713     # TODO: reintroduce locks when they are lighter-weight
1714     self.needed_locks = {}
1715     #self.share_locks[locking.LEVEL_NODE] = 1
1716     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1717
1718   def CheckPrereq(self):
1719     """Check prerequisites.
1720
1721     """
1722
1723   @staticmethod
1724   def _DiagnoseByOS(node_list, rlist):
1725     """Remaps a per-node return list into an a per-os per-node dictionary
1726
1727     @param node_list: a list with the names of all nodes
1728     @param rlist: a map with node names as keys and OS objects as values
1729
1730     @rtype: dict
1731     @return: a dictionary with osnames as keys and as value another map, with
1732         nodes as keys and list of OS objects as values, eg::
1733
1734           {"debian-etch": {"node1": [<object>,...],
1735                            "node2": [<object>,]}
1736           }
1737
1738     """
1739     all_os = {}
1740     # we build here the list of nodes that didn't fail the RPC (at RPC
1741     # level), so that nodes with a non-responding node daemon don't
1742     # make all OSes invalid
1743     good_nodes = [node_name for node_name in rlist
1744                   if not rlist[node_name].failed]
1745     for node_name, nr in rlist.iteritems():
1746       if nr.failed or not nr.data:
1747         continue
1748       for os_obj in nr.data:
1749         if os_obj.name not in all_os:
1750           # build a list of nodes for this os containing empty lists
1751           # for each node in node_list
1752           all_os[os_obj.name] = {}
1753           for nname in good_nodes:
1754             all_os[os_obj.name][nname] = []
1755         all_os[os_obj.name][node_name].append(os_obj)
1756     return all_os
1757
1758   def Exec(self, feedback_fn):
1759     """Compute the list of OSes.
1760
1761     """
1762     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1763     node_data = self.rpc.call_os_diagnose(valid_nodes)
1764     if node_data == False:
1765       raise errors.OpExecError("Can't gather the list of OSes")
1766     pol = self._DiagnoseByOS(valid_nodes, node_data)
1767     output = []
1768     for os_name, os_data in pol.iteritems():
1769       row = []
1770       for field in self.op.output_fields:
1771         if field == "name":
1772           val = os_name
1773         elif field == "valid":
1774           val = utils.all([osl and osl[0] for osl in os_data.values()])
1775         elif field == "node_status":
1776           val = {}
1777           for node_name, nos_list in os_data.iteritems():
1778             val[node_name] = [(v.status, v.path) for v in nos_list]
1779         else:
1780           raise errors.ParameterError(field)
1781         row.append(val)
1782       output.append(row)
1783
1784     return output
1785
1786
1787 class LURemoveNode(LogicalUnit):
1788   """Logical unit for removing a node.
1789
1790   """
1791   HPATH = "node-remove"
1792   HTYPE = constants.HTYPE_NODE
1793   _OP_REQP = ["node_name"]
1794
1795   def BuildHooksEnv(self):
1796     """Build hooks env.
1797
1798     This doesn't run on the target node in the pre phase as a failed
1799     node would then be impossible to remove.
1800
1801     """
1802     env = {
1803       "OP_TARGET": self.op.node_name,
1804       "NODE_NAME": self.op.node_name,
1805       }
1806     all_nodes = self.cfg.GetNodeList()
1807     all_nodes.remove(self.op.node_name)
1808     return env, all_nodes, all_nodes
1809
1810   def CheckPrereq(self):
1811     """Check prerequisites.
1812
1813     This checks:
1814      - the node exists in the configuration
1815      - it does not have primary or secondary instances
1816      - it's not the master
1817
1818     Any errors are signalled by raising errors.OpPrereqError.
1819
1820     """
1821     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1822     if node is None:
1823       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1824
1825     instance_list = self.cfg.GetInstanceList()
1826
1827     masternode = self.cfg.GetMasterNode()
1828     if node.name == masternode:
1829       raise errors.OpPrereqError("Node is the master node,"
1830                                  " you need to failover first.")
1831
1832     for instance_name in instance_list:
1833       instance = self.cfg.GetInstanceInfo(instance_name)
1834       if node.name in instance.all_nodes:
1835         raise errors.OpPrereqError("Instance %s is still running on the node,"
1836                                    " please remove first." % instance_name)
1837     self.op.node_name = node.name
1838     self.node = node
1839
1840   def Exec(self, feedback_fn):
1841     """Removes the node from the cluster.
1842
1843     """
1844     node = self.node
1845     logging.info("Stopping the node daemon and removing configs from node %s",
1846                  node.name)
1847
1848     self.context.RemoveNode(node.name)
1849
1850     self.rpc.call_node_leave_cluster(node.name)
1851
1852     # Promote nodes to master candidate as needed
1853     _AdjustCandidatePool(self)
1854
1855
1856 class LUQueryNodes(NoHooksLU):
1857   """Logical unit for querying nodes.
1858
1859   """
1860   _OP_REQP = ["output_fields", "names", "use_locking"]
1861   REQ_BGL = False
1862   _FIELDS_DYNAMIC = utils.FieldSet(
1863     "dtotal", "dfree",
1864     "mtotal", "mnode", "mfree",
1865     "bootid",
1866     "ctotal", "cnodes", "csockets",
1867     )
1868
1869   _FIELDS_STATIC = utils.FieldSet(
1870     "name", "pinst_cnt", "sinst_cnt",
1871     "pinst_list", "sinst_list",
1872     "pip", "sip", "tags",
1873     "serial_no",
1874     "master_candidate",
1875     "master",
1876     "offline",
1877     "drained",
1878     "role",
1879     )
1880
1881   def ExpandNames(self):
1882     _CheckOutputFields(static=self._FIELDS_STATIC,
1883                        dynamic=self._FIELDS_DYNAMIC,
1884                        selected=self.op.output_fields)
1885
1886     self.needed_locks = {}
1887     self.share_locks[locking.LEVEL_NODE] = 1
1888
1889     if self.op.names:
1890       self.wanted = _GetWantedNodes(self, self.op.names)
1891     else:
1892       self.wanted = locking.ALL_SET
1893
1894     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1895     self.do_locking = self.do_node_query and self.op.use_locking
1896     if self.do_locking:
1897       # if we don't request only static fields, we need to lock the nodes
1898       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1899
1900
1901   def CheckPrereq(self):
1902     """Check prerequisites.
1903
1904     """
1905     # The validation of the node list is done in the _GetWantedNodes,
1906     # if non empty, and if empty, there's no validation to do
1907     pass
1908
1909   def Exec(self, feedback_fn):
1910     """Computes the list of nodes and their attributes.
1911
1912     """
1913     all_info = self.cfg.GetAllNodesInfo()
1914     if self.do_locking:
1915       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1916     elif self.wanted != locking.ALL_SET:
1917       nodenames = self.wanted
1918       missing = set(nodenames).difference(all_info.keys())
1919       if missing:
1920         raise errors.OpExecError(
1921           "Some nodes were removed before retrieving their data: %s" % missing)
1922     else:
1923       nodenames = all_info.keys()
1924
1925     nodenames = utils.NiceSort(nodenames)
1926     nodelist = [all_info[name] for name in nodenames]
1927
1928     # begin data gathering
1929
1930     if self.do_node_query:
1931       live_data = {}
1932       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1933                                           self.cfg.GetHypervisorType())
1934       for name in nodenames:
1935         nodeinfo = node_data[name]
1936         if not nodeinfo.failed and nodeinfo.data:
1937           nodeinfo = nodeinfo.data
1938           fn = utils.TryConvert
1939           live_data[name] = {
1940             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1941             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1942             "mfree": fn(int, nodeinfo.get('memory_free', None)),
1943             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1944             "dfree": fn(int, nodeinfo.get('vg_free', None)),
1945             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1946             "bootid": nodeinfo.get('bootid', None),
1947             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1948             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1949             }
1950         else:
1951           live_data[name] = {}
1952     else:
1953       live_data = dict.fromkeys(nodenames, {})
1954
1955     node_to_primary = dict([(name, set()) for name in nodenames])
1956     node_to_secondary = dict([(name, set()) for name in nodenames])
1957
1958     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1959                              "sinst_cnt", "sinst_list"))
1960     if inst_fields & frozenset(self.op.output_fields):
1961       instancelist = self.cfg.GetInstanceList()
1962
1963       for instance_name in instancelist:
1964         inst = self.cfg.GetInstanceInfo(instance_name)
1965         if inst.primary_node in node_to_primary:
1966           node_to_primary[inst.primary_node].add(inst.name)
1967         for secnode in inst.secondary_nodes:
1968           if secnode in node_to_secondary:
1969             node_to_secondary[secnode].add(inst.name)
1970
1971     master_node = self.cfg.GetMasterNode()
1972
1973     # end data gathering
1974
1975     output = []
1976     for node in nodelist:
1977       node_output = []
1978       for field in self.op.output_fields:
1979         if field == "name":
1980           val = node.name
1981         elif field == "pinst_list":
1982           val = list(node_to_primary[node.name])
1983         elif field == "sinst_list":
1984           val = list(node_to_secondary[node.name])
1985         elif field == "pinst_cnt":
1986           val = len(node_to_primary[node.name])
1987         elif field == "sinst_cnt":
1988           val = len(node_to_secondary[node.name])
1989         elif field == "pip":
1990           val = node.primary_ip
1991         elif field == "sip":
1992           val = node.secondary_ip
1993         elif field == "tags":
1994           val = list(node.GetTags())
1995         elif field == "serial_no":
1996           val = node.serial_no
1997         elif field == "master_candidate":
1998           val = node.master_candidate
1999         elif field == "master":
2000           val = node.name == master_node
2001         elif field == "offline":
2002           val = node.offline
2003         elif field == "drained":
2004           val = node.drained
2005         elif self._FIELDS_DYNAMIC.Matches(field):
2006           val = live_data[node.name].get(field, None)
2007         elif field == "role":
2008           if node.name == master_node:
2009             val = "M"
2010           elif node.master_candidate:
2011             val = "C"
2012           elif node.drained:
2013             val = "D"
2014           elif node.offline:
2015             val = "O"
2016           else:
2017             val = "R"
2018         else:
2019           raise errors.ParameterError(field)
2020         node_output.append(val)
2021       output.append(node_output)
2022
2023     return output
2024
2025
2026 class LUQueryNodeVolumes(NoHooksLU):
2027   """Logical unit for getting volumes on node(s).
2028
2029   """
2030   _OP_REQP = ["nodes", "output_fields"]
2031   REQ_BGL = False
2032   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2033   _FIELDS_STATIC = utils.FieldSet("node")
2034
2035   def ExpandNames(self):
2036     _CheckOutputFields(static=self._FIELDS_STATIC,
2037                        dynamic=self._FIELDS_DYNAMIC,
2038                        selected=self.op.output_fields)
2039
2040     self.needed_locks = {}
2041     self.share_locks[locking.LEVEL_NODE] = 1
2042     if not self.op.nodes:
2043       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2044     else:
2045       self.needed_locks[locking.LEVEL_NODE] = \
2046         _GetWantedNodes(self, self.op.nodes)
2047
2048   def CheckPrereq(self):
2049     """Check prerequisites.
2050
2051     This checks that the fields required are valid output fields.
2052
2053     """
2054     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2055
2056   def Exec(self, feedback_fn):
2057     """Computes the list of nodes and their attributes.
2058
2059     """
2060     nodenames = self.nodes
2061     volumes = self.rpc.call_node_volumes(nodenames)
2062
2063     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2064              in self.cfg.GetInstanceList()]
2065
2066     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2067
2068     output = []
2069     for node in nodenames:
2070       if node not in volumes or volumes[node].failed or not volumes[node].data:
2071         continue
2072
2073       node_vols = volumes[node].data[:]
2074       node_vols.sort(key=lambda vol: vol['dev'])
2075
2076       for vol in node_vols:
2077         node_output = []
2078         for field in self.op.output_fields:
2079           if field == "node":
2080             val = node
2081           elif field == "phys":
2082             val = vol['dev']
2083           elif field == "vg":
2084             val = vol['vg']
2085           elif field == "name":
2086             val = vol['name']
2087           elif field == "size":
2088             val = int(float(vol['size']))
2089           elif field == "instance":
2090             for inst in ilist:
2091               if node not in lv_by_node[inst]:
2092                 continue
2093               if vol['name'] in lv_by_node[inst][node]:
2094                 val = inst.name
2095                 break
2096             else:
2097               val = '-'
2098           else:
2099             raise errors.ParameterError(field)
2100           node_output.append(str(val))
2101
2102         output.append(node_output)
2103
2104     return output
2105
2106
2107 class LUAddNode(LogicalUnit):
2108   """Logical unit for adding node to the cluster.
2109
2110   """
2111   HPATH = "node-add"
2112   HTYPE = constants.HTYPE_NODE
2113   _OP_REQP = ["node_name"]
2114
2115   def BuildHooksEnv(self):
2116     """Build hooks env.
2117
2118     This will run on all nodes before, and on all nodes + the new node after.
2119
2120     """
2121     env = {
2122       "OP_TARGET": self.op.node_name,
2123       "NODE_NAME": self.op.node_name,
2124       "NODE_PIP": self.op.primary_ip,
2125       "NODE_SIP": self.op.secondary_ip,
2126       }
2127     nodes_0 = self.cfg.GetNodeList()
2128     nodes_1 = nodes_0 + [self.op.node_name, ]
2129     return env, nodes_0, nodes_1
2130
2131   def CheckPrereq(self):
2132     """Check prerequisites.
2133
2134     This checks:
2135      - the new node is not already in the config
2136      - it is resolvable
2137      - its parameters (single/dual homed) matches the cluster
2138
2139     Any errors are signalled by raising errors.OpPrereqError.
2140
2141     """
2142     node_name = self.op.node_name
2143     cfg = self.cfg
2144
2145     dns_data = utils.HostInfo(node_name)
2146
2147     node = dns_data.name
2148     primary_ip = self.op.primary_ip = dns_data.ip
2149     secondary_ip = getattr(self.op, "secondary_ip", None)
2150     if secondary_ip is None:
2151       secondary_ip = primary_ip
2152     if not utils.IsValidIP(secondary_ip):
2153       raise errors.OpPrereqError("Invalid secondary IP given")
2154     self.op.secondary_ip = secondary_ip
2155
2156     node_list = cfg.GetNodeList()
2157     if not self.op.readd and node in node_list:
2158       raise errors.OpPrereqError("Node %s is already in the configuration" %
2159                                  node)
2160     elif self.op.readd and node not in node_list:
2161       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2162
2163     for existing_node_name in node_list:
2164       existing_node = cfg.GetNodeInfo(existing_node_name)
2165
2166       if self.op.readd and node == existing_node_name:
2167         if (existing_node.primary_ip != primary_ip or
2168             existing_node.secondary_ip != secondary_ip):
2169           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2170                                      " address configuration as before")
2171         continue
2172
2173       if (existing_node.primary_ip == primary_ip or
2174           existing_node.secondary_ip == primary_ip or
2175           existing_node.primary_ip == secondary_ip or
2176           existing_node.secondary_ip == secondary_ip):
2177         raise errors.OpPrereqError("New node ip address(es) conflict with"
2178                                    " existing node %s" % existing_node.name)
2179
2180     # check that the type of the node (single versus dual homed) is the
2181     # same as for the master
2182     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2183     master_singlehomed = myself.secondary_ip == myself.primary_ip
2184     newbie_singlehomed = secondary_ip == primary_ip
2185     if master_singlehomed != newbie_singlehomed:
2186       if master_singlehomed:
2187         raise errors.OpPrereqError("The master has no private ip but the"
2188                                    " new node has one")
2189       else:
2190         raise errors.OpPrereqError("The master has a private ip but the"
2191                                    " new node doesn't have one")
2192
2193     # checks reachablity
2194     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2195       raise errors.OpPrereqError("Node not reachable by ping")
2196
2197     if not newbie_singlehomed:
2198       # check reachability from my secondary ip to newbie's secondary ip
2199       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2200                            source=myself.secondary_ip):
2201         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2202                                    " based ping to noded port")
2203
2204     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2205     if self.op.readd:
2206       exceptions = [node]
2207     else:
2208       exceptions = []
2209     mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
2210     # the new node will increase mc_max with one, so:
2211     mc_max = min(mc_max + 1, cp_size)
2212     self.master_candidate = mc_now < mc_max
2213
2214     if self.op.readd:
2215       self.new_node = self.cfg.GetNodeInfo(node)
2216       assert self.new_node is not None, "Can't retrieve locked node %s" % node
2217     else:
2218       self.new_node = objects.Node(name=node,
2219                                    primary_ip=primary_ip,
2220                                    secondary_ip=secondary_ip,
2221                                    master_candidate=self.master_candidate,
2222                                    offline=False, drained=False)
2223
2224   def Exec(self, feedback_fn):
2225     """Adds the new node to the cluster.
2226
2227     """
2228     new_node = self.new_node
2229     node = new_node.name
2230
2231     # for re-adds, reset the offline/drained/master-candidate flags;
2232     # we need to reset here, otherwise offline would prevent RPC calls
2233     # later in the procedure; this also means that if the re-add
2234     # fails, we are left with a non-offlined, broken node
2235     if self.op.readd:
2236       new_node.drained = new_node.offline = False
2237       self.LogInfo("Readding a node, the offline/drained flags were reset")
2238       # if we demote the node, we do cleanup later in the procedure
2239       new_node.master_candidate = self.master_candidate
2240
2241     # notify the user about any possible mc promotion
2242     if new_node.master_candidate:
2243       self.LogInfo("Node will be a master candidate")
2244
2245     # check connectivity
2246     result = self.rpc.call_version([node])[node]
2247     result.Raise()
2248     if result.data:
2249       if constants.PROTOCOL_VERSION == result.data:
2250         logging.info("Communication to node %s fine, sw version %s match",
2251                      node, result.data)
2252       else:
2253         raise errors.OpExecError("Version mismatch master version %s,"
2254                                  " node version %s" %
2255                                  (constants.PROTOCOL_VERSION, result.data))
2256     else:
2257       raise errors.OpExecError("Cannot get version from the new node")
2258
2259     # setup ssh on node
2260     logging.info("Copy ssh key to node %s", node)
2261     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2262     keyarray = []
2263     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2264                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2265                 priv_key, pub_key]
2266
2267     for i in keyfiles:
2268       f = open(i, 'r')
2269       try:
2270         keyarray.append(f.read())
2271       finally:
2272         f.close()
2273
2274     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2275                                     keyarray[2],
2276                                     keyarray[3], keyarray[4], keyarray[5])
2277
2278     msg = result.RemoteFailMsg()
2279     if msg:
2280       raise errors.OpExecError("Cannot transfer ssh keys to the"
2281                                " new node: %s" % msg)
2282
2283     # Add node to our /etc/hosts, and add key to known_hosts
2284     utils.AddHostToEtcHosts(new_node.name)
2285
2286     if new_node.secondary_ip != new_node.primary_ip:
2287       result = self.rpc.call_node_has_ip_address(new_node.name,
2288                                                  new_node.secondary_ip)
2289       if result.failed or not result.data:
2290         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2291                                  " you gave (%s). Please fix and re-run this"
2292                                  " command." % new_node.secondary_ip)
2293
2294     node_verify_list = [self.cfg.GetMasterNode()]
2295     node_verify_param = {
2296       'nodelist': [node],
2297       # TODO: do a node-net-test as well?
2298     }
2299
2300     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2301                                        self.cfg.GetClusterName())
2302     for verifier in node_verify_list:
2303       if result[verifier].failed or not result[verifier].data:
2304         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2305                                  " for remote verification" % verifier)
2306       if result[verifier].data['nodelist']:
2307         for failed in result[verifier].data['nodelist']:
2308           feedback_fn("ssh/hostname verification failed %s -> %s" %
2309                       (verifier, result[verifier].data['nodelist'][failed]))
2310         raise errors.OpExecError("ssh/hostname verification failed.")
2311
2312     # Distribute updated /etc/hosts and known_hosts to all nodes,
2313     # including the node just added
2314     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2315     dist_nodes = self.cfg.GetNodeList()
2316     if not self.op.readd:
2317       dist_nodes.append(node)
2318     if myself.name in dist_nodes:
2319       dist_nodes.remove(myself.name)
2320
2321     logging.debug("Copying hosts and known_hosts to all nodes")
2322     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2323       result = self.rpc.call_upload_file(dist_nodes, fname)
2324       for to_node, to_result in result.iteritems():
2325         if to_result.failed or not to_result.data:
2326           logging.error("Copy of file %s to node %s failed", fname, to_node)
2327
2328     to_copy = []
2329     enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2330     if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
2331       to_copy.append(constants.VNC_PASSWORD_FILE)
2332
2333     for fname in to_copy:
2334       result = self.rpc.call_upload_file([node], fname)
2335       if result[node].failed or not result[node]:
2336         logging.error("Could not copy file %s to node %s", fname, node)
2337
2338     if self.op.readd:
2339       self.context.ReaddNode(new_node)
2340       # make sure we redistribute the config
2341       self.cfg.Update(new_node)
2342       # and make sure the new node will not have old files around
2343       if not new_node.master_candidate:
2344         result = self.rpc.call_node_demote_from_mc(new_node.name)
2345         msg = result.RemoteFailMsg()
2346         if msg:
2347           self.LogWarning("Node failed to demote itself from master"
2348                           " candidate status: %s" % msg)
2349     else:
2350       self.context.AddNode(new_node)
2351
2352
2353 class LUSetNodeParams(LogicalUnit):
2354   """Modifies the parameters of a node.
2355
2356   """
2357   HPATH = "node-modify"
2358   HTYPE = constants.HTYPE_NODE
2359   _OP_REQP = ["node_name"]
2360   REQ_BGL = False
2361
2362   def CheckArguments(self):
2363     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2364     if node_name is None:
2365       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2366     self.op.node_name = node_name
2367     _CheckBooleanOpField(self.op, 'master_candidate')
2368     _CheckBooleanOpField(self.op, 'offline')
2369     _CheckBooleanOpField(self.op, 'drained')
2370     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2371     if all_mods.count(None) == 3:
2372       raise errors.OpPrereqError("Please pass at least one modification")
2373     if all_mods.count(True) > 1:
2374       raise errors.OpPrereqError("Can't set the node into more than one"
2375                                  " state at the same time")
2376
2377   def ExpandNames(self):
2378     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2379
2380   def BuildHooksEnv(self):
2381     """Build hooks env.
2382
2383     This runs on the master node.
2384
2385     """
2386     env = {
2387       "OP_TARGET": self.op.node_name,
2388       "MASTER_CANDIDATE": str(self.op.master_candidate),
2389       "OFFLINE": str(self.op.offline),
2390       "DRAINED": str(self.op.drained),
2391       }
2392     nl = [self.cfg.GetMasterNode(),
2393           self.op.node_name]
2394     return env, nl, nl
2395
2396   def CheckPrereq(self):
2397     """Check prerequisites.
2398
2399     This only checks the instance list against the existing names.
2400
2401     """
2402     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2403
2404     if ((self.op.master_candidate == False or self.op.offline == True or
2405          self.op.drained == True) and node.master_candidate):
2406       # we will demote the node from master_candidate
2407       if self.op.node_name == self.cfg.GetMasterNode():
2408         raise errors.OpPrereqError("The master node has to be a"
2409                                    " master candidate, online and not drained")
2410       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2411       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2412       if num_candidates <= cp_size:
2413         msg = ("Not enough master candidates (desired"
2414                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2415         if self.op.force:
2416           self.LogWarning(msg)
2417         else:
2418           raise errors.OpPrereqError(msg)
2419
2420     if (self.op.master_candidate == True and
2421         ((node.offline and not self.op.offline == False) or
2422          (node.drained and not self.op.drained == False))):
2423       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2424                                  " to master_candidate" % node.name)
2425
2426     return
2427
2428   def Exec(self, feedback_fn):
2429     """Modifies a node.
2430
2431     """
2432     node = self.node
2433
2434     result = []
2435     changed_mc = False
2436
2437     if self.op.offline is not None:
2438       node.offline = self.op.offline
2439       result.append(("offline", str(self.op.offline)))
2440       if self.op.offline == True:
2441         if node.master_candidate:
2442           node.master_candidate = False
2443           changed_mc = True
2444           result.append(("master_candidate", "auto-demotion due to offline"))
2445         if node.drained:
2446           node.drained = False
2447           result.append(("drained", "clear drained status due to offline"))
2448
2449     if self.op.master_candidate is not None:
2450       node.master_candidate = self.op.master_candidate
2451       changed_mc = True
2452       result.append(("master_candidate", str(self.op.master_candidate)))
2453       if self.op.master_candidate == False:
2454         rrc = self.rpc.call_node_demote_from_mc(node.name)
2455         msg = rrc.RemoteFailMsg()
2456         if msg:
2457           self.LogWarning("Node failed to demote itself: %s" % msg)
2458
2459     if self.op.drained is not None:
2460       node.drained = self.op.drained
2461       result.append(("drained", str(self.op.drained)))
2462       if self.op.drained == True:
2463         if node.master_candidate:
2464           node.master_candidate = False
2465           changed_mc = True
2466           result.append(("master_candidate", "auto-demotion due to drain"))
2467           rrc = self.rpc.call_node_demote_from_mc(node.name)
2468           msg = rrc.RemoteFailMsg()
2469           if msg:
2470             self.LogWarning("Node failed to demote itself: %s" % msg)
2471         if node.offline:
2472           node.offline = False
2473           result.append(("offline", "clear offline status due to drain"))
2474
2475     # this will trigger configuration file update, if needed
2476     self.cfg.Update(node)
2477     # this will trigger job queue propagation or cleanup
2478     if changed_mc:
2479       self.context.ReaddNode(node)
2480
2481     return result
2482
2483
2484 class LUQueryClusterInfo(NoHooksLU):
2485   """Query cluster configuration.
2486
2487   """
2488   _OP_REQP = []
2489   REQ_BGL = False
2490
2491   def ExpandNames(self):
2492     self.needed_locks = {}
2493
2494   def CheckPrereq(self):
2495     """No prerequsites needed for this LU.
2496
2497     """
2498     pass
2499
2500   def Exec(self, feedback_fn):
2501     """Return cluster config.
2502
2503     """
2504     cluster = self.cfg.GetClusterInfo()
2505     result = {
2506       "software_version": constants.RELEASE_VERSION,
2507       "protocol_version": constants.PROTOCOL_VERSION,
2508       "config_version": constants.CONFIG_VERSION,
2509       "os_api_version": constants.OS_API_VERSION,
2510       "export_version": constants.EXPORT_VERSION,
2511       "architecture": (platform.architecture()[0], platform.machine()),
2512       "name": cluster.cluster_name,
2513       "master": cluster.master_node,
2514       "default_hypervisor": cluster.default_hypervisor,
2515       "enabled_hypervisors": cluster.enabled_hypervisors,
2516       "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2517                         for hypervisor in cluster.enabled_hypervisors]),
2518       "beparams": cluster.beparams,
2519       "candidate_pool_size": cluster.candidate_pool_size,
2520       "default_bridge": cluster.default_bridge,
2521       "master_netdev": cluster.master_netdev,
2522       "volume_group_name": cluster.volume_group_name,
2523       "file_storage_dir": cluster.file_storage_dir,
2524       }
2525
2526     return result
2527
2528
2529 class LUQueryConfigValues(NoHooksLU):
2530   """Return configuration values.
2531
2532   """
2533   _OP_REQP = []
2534   REQ_BGL = False
2535   _FIELDS_DYNAMIC = utils.FieldSet()
2536   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2537
2538   def ExpandNames(self):
2539     self.needed_locks = {}
2540
2541     _CheckOutputFields(static=self._FIELDS_STATIC,
2542                        dynamic=self._FIELDS_DYNAMIC,
2543                        selected=self.op.output_fields)
2544
2545   def CheckPrereq(self):
2546     """No prerequisites.
2547
2548     """
2549     pass
2550
2551   def Exec(self, feedback_fn):
2552     """Dump a representation of the cluster config to the standard output.
2553
2554     """
2555     values = []
2556     for field in self.op.output_fields:
2557       if field == "cluster_name":
2558         entry = self.cfg.GetClusterName()
2559       elif field == "master_node":
2560         entry = self.cfg.GetMasterNode()
2561       elif field == "drain_flag":
2562         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2563       else:
2564         raise errors.ParameterError(field)
2565       values.append(entry)
2566     return values
2567
2568
2569 class LUActivateInstanceDisks(NoHooksLU):
2570   """Bring up an instance's disks.
2571
2572   """
2573   _OP_REQP = ["instance_name"]
2574   REQ_BGL = False
2575
2576   def ExpandNames(self):
2577     self._ExpandAndLockInstance()
2578     self.needed_locks[locking.LEVEL_NODE] = []
2579     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2580
2581   def DeclareLocks(self, level):
2582     if level == locking.LEVEL_NODE:
2583       self._LockInstancesNodes()
2584
2585   def CheckPrereq(self):
2586     """Check prerequisites.
2587
2588     This checks that the instance is in the cluster.
2589
2590     """
2591     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2592     assert self.instance is not None, \
2593       "Cannot retrieve locked instance %s" % self.op.instance_name
2594     _CheckNodeOnline(self, self.instance.primary_node)
2595
2596   def Exec(self, feedback_fn):
2597     """Activate the disks.
2598
2599     """
2600     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2601     if not disks_ok:
2602       raise errors.OpExecError("Cannot activate block devices")
2603
2604     return disks_info
2605
2606
2607 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2608   """Prepare the block devices for an instance.
2609
2610   This sets up the block devices on all nodes.
2611
2612   @type lu: L{LogicalUnit}
2613   @param lu: the logical unit on whose behalf we execute
2614   @type instance: L{objects.Instance}
2615   @param instance: the instance for whose disks we assemble
2616   @type ignore_secondaries: boolean
2617   @param ignore_secondaries: if true, errors on secondary nodes
2618       won't result in an error return from the function
2619   @return: False if the operation failed, otherwise a list of
2620       (host, instance_visible_name, node_visible_name)
2621       with the mapping from node devices to instance devices
2622
2623   """
2624   device_info = []
2625   disks_ok = True
2626   iname = instance.name
2627   # With the two passes mechanism we try to reduce the window of
2628   # opportunity for the race condition of switching DRBD to primary
2629   # before handshaking occured, but we do not eliminate it
2630
2631   # The proper fix would be to wait (with some limits) until the
2632   # connection has been made and drbd transitions from WFConnection
2633   # into any other network-connected state (Connected, SyncTarget,
2634   # SyncSource, etc.)
2635
2636   # 1st pass, assemble on all nodes in secondary mode
2637   for inst_disk in instance.disks:
2638     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2639       lu.cfg.SetDiskID(node_disk, node)
2640       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2641       msg = result.RemoteFailMsg()
2642       if msg:
2643         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2644                            " (is_primary=False, pass=1): %s",
2645                            inst_disk.iv_name, node, msg)
2646         if not ignore_secondaries:
2647           disks_ok = False
2648
2649   # FIXME: race condition on drbd migration to primary
2650
2651   # 2nd pass, do only the primary node
2652   for inst_disk in instance.disks:
2653     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2654       if node != instance.primary_node:
2655         continue
2656       lu.cfg.SetDiskID(node_disk, node)
2657       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2658       msg = result.RemoteFailMsg()
2659       if msg:
2660         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2661                            " (is_primary=True, pass=2): %s",
2662                            inst_disk.iv_name, node, msg)
2663         disks_ok = False
2664     device_info.append((instance.primary_node, inst_disk.iv_name,
2665                         result.payload))
2666
2667   # leave the disks configured for the primary node
2668   # this is a workaround that would be fixed better by
2669   # improving the logical/physical id handling
2670   for disk in instance.disks:
2671     lu.cfg.SetDiskID(disk, instance.primary_node)
2672
2673   return disks_ok, device_info
2674
2675
2676 def _StartInstanceDisks(lu, instance, force):
2677   """Start the disks of an instance.
2678
2679   """
2680   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2681                                            ignore_secondaries=force)
2682   if not disks_ok:
2683     _ShutdownInstanceDisks(lu, instance)
2684     if force is not None and not force:
2685       lu.proc.LogWarning("", hint="If the message above refers to a"
2686                          " secondary node,"
2687                          " you can retry the operation using '--force'.")
2688     raise errors.OpExecError("Disk consistency error")
2689
2690
2691 class LUDeactivateInstanceDisks(NoHooksLU):
2692   """Shutdown an instance's disks.
2693
2694   """
2695   _OP_REQP = ["instance_name"]
2696   REQ_BGL = False
2697
2698   def ExpandNames(self):
2699     self._ExpandAndLockInstance()
2700     self.needed_locks[locking.LEVEL_NODE] = []
2701     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2702
2703   def DeclareLocks(self, level):
2704     if level == locking.LEVEL_NODE:
2705       self._LockInstancesNodes()
2706
2707   def CheckPrereq(self):
2708     """Check prerequisites.
2709
2710     This checks that the instance is in the cluster.
2711
2712     """
2713     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2714     assert self.instance is not None, \
2715       "Cannot retrieve locked instance %s" % self.op.instance_name
2716
2717   def Exec(self, feedback_fn):
2718     """Deactivate the disks
2719
2720     """
2721     instance = self.instance
2722     _SafeShutdownInstanceDisks(self, instance)
2723
2724
2725 def _SafeShutdownInstanceDisks(lu, instance):
2726   """Shutdown block devices of an instance.
2727
2728   This function checks if an instance is running, before calling
2729   _ShutdownInstanceDisks.
2730
2731   """
2732   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2733                                       [instance.hypervisor])
2734   ins_l = ins_l[instance.primary_node]
2735   if ins_l.failed or not isinstance(ins_l.data, list):
2736     raise errors.OpExecError("Can't contact node '%s'" %
2737                              instance.primary_node)
2738
2739   if instance.name in ins_l.data:
2740     raise errors.OpExecError("Instance is running, can't shutdown"
2741                              " block devices.")
2742
2743   _ShutdownInstanceDisks(lu, instance)
2744
2745
2746 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2747   """Shutdown block devices of an instance.
2748
2749   This does the shutdown on all nodes of the instance.
2750
2751   If the ignore_primary is false, errors on the primary node are
2752   ignored.
2753
2754   """
2755   all_result = True
2756   for disk in instance.disks:
2757     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2758       lu.cfg.SetDiskID(top_disk, node)
2759       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2760       msg = result.RemoteFailMsg()
2761       if msg:
2762         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2763                       disk.iv_name, node, msg)
2764         if not ignore_primary or node != instance.primary_node:
2765           all_result = False
2766   return all_result
2767
2768
2769 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2770   """Checks if a node has enough free memory.
2771
2772   This function check if a given node has the needed amount of free
2773   memory. In case the node has less memory or we cannot get the
2774   information from the node, this function raise an OpPrereqError
2775   exception.
2776
2777   @type lu: C{LogicalUnit}
2778   @param lu: a logical unit from which we get configuration data
2779   @type node: C{str}
2780   @param node: the node to check
2781   @type reason: C{str}
2782   @param reason: string to use in the error message
2783   @type requested: C{int}
2784   @param requested: the amount of memory in MiB to check for
2785   @type hypervisor_name: C{str}
2786   @param hypervisor_name: the hypervisor to ask for memory stats
2787   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2788       we cannot check the node
2789
2790   """
2791   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2792   nodeinfo[node].Raise()
2793   free_mem = nodeinfo[node].data.get('memory_free')
2794   if not isinstance(free_mem, int):
2795     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2796                              " was '%s'" % (node, free_mem))
2797   if requested > free_mem:
2798     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2799                              " needed %s MiB, available %s MiB" %
2800                              (node, reason, requested, free_mem))
2801
2802
2803 class LUStartupInstance(LogicalUnit):
2804   """Starts an instance.
2805
2806   """
2807   HPATH = "instance-start"
2808   HTYPE = constants.HTYPE_INSTANCE
2809   _OP_REQP = ["instance_name", "force"]
2810   REQ_BGL = False
2811
2812   def ExpandNames(self):
2813     self._ExpandAndLockInstance()
2814
2815   def BuildHooksEnv(self):
2816     """Build hooks env.
2817
2818     This runs on master, primary and secondary nodes of the instance.
2819
2820     """
2821     env = {
2822       "FORCE": self.op.force,
2823       }
2824     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2825     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2826     return env, nl, nl
2827
2828   def CheckPrereq(self):
2829     """Check prerequisites.
2830
2831     This checks that the instance is in the cluster.
2832
2833     """
2834     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2835     assert self.instance is not None, \
2836       "Cannot retrieve locked instance %s" % self.op.instance_name
2837
2838     # extra beparams
2839     self.beparams = getattr(self.op, "beparams", {})
2840     if self.beparams:
2841       if not isinstance(self.beparams, dict):
2842         raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2843                                    " dict" % (type(self.beparams), ))
2844       # fill the beparams dict
2845       utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2846       self.op.beparams = self.beparams
2847
2848     # extra hvparams
2849     self.hvparams = getattr(self.op, "hvparams", {})
2850     if self.hvparams:
2851       if not isinstance(self.hvparams, dict):
2852         raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2853                                    " dict" % (type(self.hvparams), ))
2854
2855       # check hypervisor parameter syntax (locally)
2856       cluster = self.cfg.GetClusterInfo()
2857       utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2858       filled_hvp = cluster.FillDict(cluster.hvparams[instance.hypervisor],
2859                                     instance.hvparams)
2860       filled_hvp.update(self.hvparams)
2861       hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2862       hv_type.CheckParameterSyntax(filled_hvp)
2863       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2864       self.op.hvparams = self.hvparams
2865
2866     _CheckNodeOnline(self, instance.primary_node)
2867
2868     bep = self.cfg.GetClusterInfo().FillBE(instance)
2869     # check bridges existance
2870     _CheckInstanceBridgesExist(self, instance)
2871
2872     remote_info = self.rpc.call_instance_info(instance.primary_node,
2873                                               instance.name,
2874                                               instance.hypervisor)
2875     remote_info.Raise()
2876     if not remote_info.data:
2877       _CheckNodeFreeMemory(self, instance.primary_node,
2878                            "starting instance %s" % instance.name,
2879                            bep[constants.BE_MEMORY], instance.hypervisor)
2880
2881   def Exec(self, feedback_fn):
2882     """Start the instance.
2883
2884     """
2885     instance = self.instance
2886     force = self.op.force
2887
2888     self.cfg.MarkInstanceUp(instance.name)
2889
2890     node_current = instance.primary_node
2891
2892     _StartInstanceDisks(self, instance, force)
2893
2894     result = self.rpc.call_instance_start(node_current, instance,
2895                                           self.hvparams, self.beparams)
2896     msg = result.RemoteFailMsg()
2897     if msg:
2898       _ShutdownInstanceDisks(self, instance)
2899       raise errors.OpExecError("Could not start instance: %s" % msg)
2900
2901
2902 class LURebootInstance(LogicalUnit):
2903   """Reboot an instance.
2904
2905   """
2906   HPATH = "instance-reboot"
2907   HTYPE = constants.HTYPE_INSTANCE
2908   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2909   REQ_BGL = False
2910
2911   def ExpandNames(self):
2912     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2913                                    constants.INSTANCE_REBOOT_HARD,
2914                                    constants.INSTANCE_REBOOT_FULL]:
2915       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2916                                   (constants.INSTANCE_REBOOT_SOFT,
2917                                    constants.INSTANCE_REBOOT_HARD,
2918                                    constants.INSTANCE_REBOOT_FULL))
2919     self._ExpandAndLockInstance()
2920
2921   def BuildHooksEnv(self):
2922     """Build hooks env.
2923
2924     This runs on master, primary and secondary nodes of the instance.
2925
2926     """
2927     env = {
2928       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2929       "REBOOT_TYPE": self.op.reboot_type,
2930       }
2931     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2932     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2933     return env, nl, nl
2934
2935   def CheckPrereq(self):
2936     """Check prerequisites.
2937
2938     This checks that the instance is in the cluster.
2939
2940     """
2941     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2942     assert self.instance is not None, \
2943       "Cannot retrieve locked instance %s" % self.op.instance_name
2944
2945     _CheckNodeOnline(self, instance.primary_node)
2946
2947     # check bridges existance
2948     _CheckInstanceBridgesExist(self, instance)
2949
2950   def Exec(self, feedback_fn):
2951     """Reboot the instance.
2952
2953     """
2954     instance = self.instance
2955     ignore_secondaries = self.op.ignore_secondaries
2956     reboot_type = self.op.reboot_type
2957
2958     node_current = instance.primary_node
2959
2960     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2961                        constants.INSTANCE_REBOOT_HARD]:
2962       for disk in instance.disks:
2963         self.cfg.SetDiskID(disk, node_current)
2964       result = self.rpc.call_instance_reboot(node_current, instance,
2965                                              reboot_type)
2966       msg = result.RemoteFailMsg()
2967       if msg:
2968         raise errors.OpExecError("Could not reboot instance: %s" % msg)
2969     else:
2970       result = self.rpc.call_instance_shutdown(node_current, instance)
2971       msg = result.RemoteFailMsg()
2972       if msg:
2973         raise errors.OpExecError("Could not shutdown instance for"
2974                                  " full reboot: %s" % msg)
2975       _ShutdownInstanceDisks(self, instance)
2976       _StartInstanceDisks(self, instance, ignore_secondaries)
2977       result = self.rpc.call_instance_start(node_current, instance, None, None)
2978       msg = result.RemoteFailMsg()
2979       if msg:
2980         _ShutdownInstanceDisks(self, instance)
2981         raise errors.OpExecError("Could not start instance for"
2982                                  " full reboot: %s" % msg)
2983
2984     self.cfg.MarkInstanceUp(instance.name)
2985
2986
2987 class LUShutdownInstance(LogicalUnit):
2988   """Shutdown an instance.
2989
2990   """
2991   HPATH = "instance-stop"
2992   HTYPE = constants.HTYPE_INSTANCE
2993   _OP_REQP = ["instance_name"]
2994   REQ_BGL = False
2995
2996   def ExpandNames(self):
2997     self._ExpandAndLockInstance()
2998
2999   def BuildHooksEnv(self):
3000     """Build hooks env.
3001
3002     This runs on master, primary and secondary nodes of the instance.
3003
3004     """
3005     env = _BuildInstanceHookEnvByObject(self, self.instance)
3006     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3007     return env, nl, nl
3008
3009   def CheckPrereq(self):
3010     """Check prerequisites.
3011
3012     This checks that the instance is in the cluster.
3013
3014     """
3015     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3016     assert self.instance is not None, \
3017       "Cannot retrieve locked instance %s" % self.op.instance_name
3018     _CheckNodeOnline(self, self.instance.primary_node)
3019
3020   def Exec(self, feedback_fn):
3021     """Shutdown the instance.
3022
3023     """
3024     instance = self.instance
3025     node_current = instance.primary_node
3026     self.cfg.MarkInstanceDown(instance.name)
3027     result = self.rpc.call_instance_shutdown(node_current, instance)
3028     msg = result.RemoteFailMsg()
3029     if msg:
3030       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3031
3032     _ShutdownInstanceDisks(self, instance)
3033
3034
3035 class LUReinstallInstance(LogicalUnit):
3036   """Reinstall an instance.
3037
3038   """
3039   HPATH = "instance-reinstall"
3040   HTYPE = constants.HTYPE_INSTANCE
3041   _OP_REQP = ["instance_name"]
3042   REQ_BGL = False
3043
3044   def ExpandNames(self):
3045     self._ExpandAndLockInstance()
3046
3047   def BuildHooksEnv(self):
3048     """Build hooks env.
3049
3050     This runs on master, primary and secondary nodes of the instance.
3051
3052     """
3053     env = _BuildInstanceHookEnvByObject(self, self.instance)
3054     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3055     return env, nl, nl
3056
3057   def CheckPrereq(self):
3058     """Check prerequisites.
3059
3060     This checks that the instance is in the cluster and is not running.
3061
3062     """
3063     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3064     assert instance is not None, \
3065       "Cannot retrieve locked instance %s" % self.op.instance_name
3066     _CheckNodeOnline(self, instance.primary_node)
3067
3068     if instance.disk_template == constants.DT_DISKLESS:
3069       raise errors.OpPrereqError("Instance '%s' has no disks" %
3070                                  self.op.instance_name)
3071     if instance.admin_up:
3072       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3073                                  self.op.instance_name)
3074     remote_info = self.rpc.call_instance_info(instance.primary_node,
3075                                               instance.name,
3076                                               instance.hypervisor)
3077     remote_info.Raise()
3078     if remote_info.data:
3079       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3080                                  (self.op.instance_name,
3081                                   instance.primary_node))
3082
3083     self.op.os_type = getattr(self.op, "os_type", None)
3084     if self.op.os_type is not None:
3085       # OS verification
3086       pnode = self.cfg.GetNodeInfo(
3087         self.cfg.ExpandNodeName(instance.primary_node))
3088       if pnode is None:
3089         raise errors.OpPrereqError("Primary node '%s' is unknown" %
3090                                    self.op.pnode)
3091       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3092       result.Raise()
3093       if not isinstance(result.data, objects.OS):
3094         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3095                                    " primary node"  % self.op.os_type)
3096
3097     self.instance = instance
3098
3099   def Exec(self, feedback_fn):
3100     """Reinstall the instance.
3101
3102     """
3103     inst = self.instance
3104
3105     if self.op.os_type is not None:
3106       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3107       inst.os = self.op.os_type
3108       self.cfg.Update(inst)
3109
3110     _StartInstanceDisks(self, inst, None)
3111     try:
3112       feedback_fn("Running the instance OS create scripts...")
3113       result = self.rpc.call_instance_os_add(inst.primary_node, inst)
3114       msg = result.RemoteFailMsg()
3115       if msg:
3116         raise errors.OpExecError("Could not install OS for instance %s"
3117                                  " on node %s: %s" %
3118                                  (inst.name, inst.primary_node, msg))
3119     finally:
3120       _ShutdownInstanceDisks(self, inst)
3121
3122
3123 class LURenameInstance(LogicalUnit):
3124   """Rename an instance.
3125
3126   """
3127   HPATH = "instance-rename"
3128   HTYPE = constants.HTYPE_INSTANCE
3129   _OP_REQP = ["instance_name", "new_name"]
3130
3131   def BuildHooksEnv(self):
3132     """Build hooks env.
3133
3134     This runs on master, primary and secondary nodes of the instance.
3135
3136     """
3137     env = _BuildInstanceHookEnvByObject(self, self.instance)
3138     env["INSTANCE_NEW_NAME"] = self.op.new_name
3139     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3140     return env, nl, nl
3141
3142   def CheckPrereq(self):
3143     """Check prerequisites.
3144
3145     This checks that the instance is in the cluster and is not running.
3146
3147     """
3148     instance = self.cfg.GetInstanceInfo(
3149       self.cfg.ExpandInstanceName(self.op.instance_name))
3150     if instance is None:
3151       raise errors.OpPrereqError("Instance '%s' not known" %
3152                                  self.op.instance_name)
3153     _CheckNodeOnline(self, instance.primary_node)
3154
3155     if instance.admin_up:
3156       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3157                                  self.op.instance_name)
3158     remote_info = self.rpc.call_instance_info(instance.primary_node,
3159                                               instance.name,
3160                                               instance.hypervisor)
3161     remote_info.Raise()
3162     if remote_info.data:
3163       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3164                                  (self.op.instance_name,
3165                                   instance.primary_node))
3166     self.instance = instance
3167
3168     # new name verification
3169     name_info = utils.HostInfo(self.op.new_name)
3170
3171     self.op.new_name = new_name = name_info.name
3172     instance_list = self.cfg.GetInstanceList()
3173     if new_name in instance_list:
3174       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3175                                  new_name)
3176
3177     if not getattr(self.op, "ignore_ip", False):
3178       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3179         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3180                                    (name_info.ip, new_name))
3181
3182
3183   def Exec(self, feedback_fn):
3184     """Reinstall the instance.
3185
3186     """
3187     inst = self.instance
3188     old_name = inst.name
3189
3190     if inst.disk_template == constants.DT_FILE:
3191       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3192
3193     self.cfg.RenameInstance(inst.name, self.op.new_name)
3194     # Change the instance lock. This is definitely safe while we hold the BGL
3195     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3196     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3197
3198     # re-read the instance from the configuration after rename
3199     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3200
3201     if inst.disk_template == constants.DT_FILE:
3202       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3203       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3204                                                      old_file_storage_dir,
3205                                                      new_file_storage_dir)
3206       result.Raise()
3207       if not result.data:
3208         raise errors.OpExecError("Could not connect to node '%s' to rename"
3209                                  " directory '%s' to '%s' (but the instance"
3210                                  " has been renamed in Ganeti)" % (
3211                                  inst.primary_node, old_file_storage_dir,
3212                                  new_file_storage_dir))
3213
3214       if not result.data[0]:
3215         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3216                                  " (but the instance has been renamed in"
3217                                  " Ganeti)" % (old_file_storage_dir,
3218                                                new_file_storage_dir))
3219
3220     _StartInstanceDisks(self, inst, None)
3221     try:
3222       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3223                                                  old_name)
3224       msg = result.RemoteFailMsg()
3225       if msg:
3226         msg = ("Could not run OS rename script for instance %s on node %s"
3227                " (but the instance has been renamed in Ganeti): %s" %
3228                (inst.name, inst.primary_node, msg))
3229         self.proc.LogWarning(msg)
3230     finally:
3231       _ShutdownInstanceDisks(self, inst)
3232
3233
3234 class LURemoveInstance(LogicalUnit):
3235   """Remove an instance.
3236
3237   """
3238   HPATH = "instance-remove"
3239   HTYPE = constants.HTYPE_INSTANCE
3240   _OP_REQP = ["instance_name", "ignore_failures"]
3241   REQ_BGL = False
3242
3243   def ExpandNames(self):
3244     self._ExpandAndLockInstance()
3245     self.needed_locks[locking.LEVEL_NODE] = []
3246     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3247
3248   def DeclareLocks(self, level):
3249     if level == locking.LEVEL_NODE:
3250       self._LockInstancesNodes()
3251
3252   def BuildHooksEnv(self):
3253     """Build hooks env.
3254
3255     This runs on master, primary and secondary nodes of the instance.
3256
3257     """
3258     env = _BuildInstanceHookEnvByObject(self, self.instance)
3259     nl = [self.cfg.GetMasterNode()]
3260     return env, nl, nl
3261
3262   def CheckPrereq(self):
3263     """Check prerequisites.
3264
3265     This checks that the instance is in the cluster.
3266
3267     """
3268     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3269     assert self.instance is not None, \
3270       "Cannot retrieve locked instance %s" % self.op.instance_name
3271
3272   def Exec(self, feedback_fn):
3273     """Remove the instance.
3274
3275     """
3276     instance = self.instance
3277     logging.info("Shutting down instance %s on node %s",
3278                  instance.name, instance.primary_node)
3279
3280     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3281     msg = result.RemoteFailMsg()
3282     if msg:
3283       if self.op.ignore_failures:
3284         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3285       else:
3286         raise errors.OpExecError("Could not shutdown instance %s on"
3287                                  " node %s: %s" %
3288                                  (instance.name, instance.primary_node, msg))
3289
3290     logging.info("Removing block devices for instance %s", instance.name)
3291
3292     if not _RemoveDisks(self, instance):
3293       if self.op.ignore_failures:
3294         feedback_fn("Warning: can't remove instance's disks")
3295       else:
3296         raise errors.OpExecError("Can't remove instance's disks")
3297
3298     logging.info("Removing instance %s out of cluster config", instance.name)
3299
3300     self.cfg.RemoveInstance(instance.name)
3301     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3302
3303
3304 class LUQueryInstances(NoHooksLU):
3305   """Logical unit for querying instances.
3306
3307   """
3308   _OP_REQP = ["output_fields", "names", "use_locking"]
3309   REQ_BGL = False
3310   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3311                                     "admin_state",
3312                                     "disk_template", "ip", "mac", "bridge",
3313                                     "sda_size", "sdb_size", "vcpus", "tags",
3314                                     "network_port", "beparams",
3315                                     r"(disk)\.(size)/([0-9]+)",
3316                                     r"(disk)\.(sizes)", "disk_usage",
3317                                     r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3318                                     r"(nic)\.(macs|ips|bridges)",
3319                                     r"(disk|nic)\.(count)",
3320                                     "serial_no", "hypervisor", "hvparams",] +
3321                                   ["hv/%s" % name
3322                                    for name in constants.HVS_PARAMETERS] +
3323                                   ["be/%s" % name
3324                                    for name in constants.BES_PARAMETERS])
3325   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3326
3327
3328   def ExpandNames(self):
3329     _CheckOutputFields(static=self._FIELDS_STATIC,
3330                        dynamic=self._FIELDS_DYNAMIC,
3331                        selected=self.op.output_fields)
3332
3333     self.needed_locks = {}
3334     self.share_locks[locking.LEVEL_INSTANCE] = 1
3335     self.share_locks[locking.LEVEL_NODE] = 1
3336
3337     if self.op.names:
3338       self.wanted = _GetWantedInstances(self, self.op.names)
3339     else:
3340       self.wanted = locking.ALL_SET
3341
3342     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3343     self.do_locking = self.do_node_query and self.op.use_locking
3344     if self.do_locking:
3345       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3346       self.needed_locks[locking.LEVEL_NODE] = []
3347       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3348
3349   def DeclareLocks(self, level):
3350     if level == locking.LEVEL_NODE and self.do_locking:
3351       self._LockInstancesNodes()
3352
3353   def CheckPrereq(self):
3354     """Check prerequisites.
3355
3356     """
3357     pass
3358
3359   def Exec(self, feedback_fn):
3360     """Computes the list of nodes and their attributes.
3361
3362     """
3363     all_info = self.cfg.GetAllInstancesInfo()
3364     if self.wanted == locking.ALL_SET:
3365       # caller didn't specify instance names, so ordering is not important
3366       if self.do_locking:
3367         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3368       else:
3369         instance_names = all_info.keys()
3370       instance_names = utils.NiceSort(instance_names)
3371     else:
3372       # caller did specify names, so we must keep the ordering
3373       if self.do_locking:
3374         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3375       else:
3376         tgt_set = all_info.keys()
3377       missing = set(self.wanted).difference(tgt_set)
3378       if missing:
3379         raise errors.OpExecError("Some instances were removed before"
3380                                  " retrieving their data: %s" % missing)
3381       instance_names = self.wanted
3382
3383     instance_list = [all_info[iname] for iname in instance_names]
3384
3385     # begin data gathering
3386
3387     nodes = frozenset([inst.primary_node for inst in instance_list])
3388     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3389
3390     bad_nodes = []
3391     off_nodes = []
3392     if self.do_node_query:
3393       live_data = {}
3394       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3395       for name in nodes:
3396         result = node_data[name]
3397         if result.offline:
3398           # offline nodes will be in both lists
3399           off_nodes.append(name)
3400         if result.failed:
3401           bad_nodes.append(name)
3402         else:
3403           if result.data:
3404             live_data.update(result.data)
3405             # else no instance is alive
3406     else:
3407       live_data = dict([(name, {}) for name in instance_names])
3408
3409     # end data gathering
3410
3411     HVPREFIX = "hv/"
3412     BEPREFIX = "be/"
3413     output = []
3414     for instance in instance_list:
3415       iout = []
3416       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3417       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3418       for field in self.op.output_fields:
3419         st_match = self._FIELDS_STATIC.Matches(field)
3420         if field == "name":
3421           val = instance.name
3422         elif field == "os":
3423           val = instance.os
3424         elif field == "pnode":
3425           val = instance.primary_node
3426         elif field == "snodes":
3427           val = list(instance.secondary_nodes)
3428         elif field == "admin_state":
3429           val = instance.admin_up
3430         elif field == "oper_state":
3431           if instance.primary_node in bad_nodes:
3432             val = None
3433           else:
3434             val = bool(live_data.get(instance.name))
3435         elif field == "status":
3436           if instance.primary_node in off_nodes:
3437             val = "ERROR_nodeoffline"
3438           elif instance.primary_node in bad_nodes:
3439             val = "ERROR_nodedown"
3440           else:
3441             running = bool(live_data.get(instance.name))
3442             if running:
3443               if instance.admin_up:
3444                 val = "running"
3445               else:
3446                 val = "ERROR_up"
3447             else:
3448               if instance.admin_up:
3449                 val = "ERROR_down"
3450               else:
3451                 val = "ADMIN_down"
3452         elif field == "oper_ram":
3453           if instance.primary_node in bad_nodes:
3454             val = None
3455           elif instance.name in live_data:
3456             val = live_data[instance.name].get("memory", "?")
3457           else:
3458             val = "-"
3459         elif field == "vcpus":
3460           val = i_be[constants.BE_VCPUS]
3461         elif field == "disk_template":
3462           val = instance.disk_template
3463         elif field == "ip":
3464           if instance.nics:
3465             val = instance.nics[0].ip
3466           else:
3467             val = None
3468         elif field == "bridge":
3469           if instance.nics:
3470             val = instance.nics[0].bridge
3471           else:
3472             val = None
3473         elif field == "mac":
3474           if instance.nics:
3475             val = instance.nics[0].mac
3476           else:
3477             val = None
3478         elif field == "sda_size" or field == "sdb_size":
3479           idx = ord(field[2]) - ord('a')
3480           try:
3481             val = instance.FindDisk(idx).size
3482           except errors.OpPrereqError:
3483             val = None
3484         elif field == "disk_usage": # total disk usage per node
3485           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3486           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3487         elif field == "tags":
3488           val = list(instance.GetTags())
3489         elif field == "serial_no":
3490           val = instance.serial_no
3491         elif field == "network_port":
3492           val = instance.network_port
3493         elif field == "hypervisor":
3494           val = instance.hypervisor
3495         elif field == "hvparams":
3496           val = i_hv
3497         elif (field.startswith(HVPREFIX) and
3498               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3499           val = i_hv.get(field[len(HVPREFIX):], None)
3500         elif field == "beparams":
3501           val = i_be
3502         elif (field.startswith(BEPREFIX) and
3503               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3504           val = i_be.get(field[len(BEPREFIX):], None)
3505         elif st_match and st_match.groups():
3506           # matches a variable list
3507           st_groups = st_match.groups()
3508           if st_groups and st_groups[0] == "disk":
3509             if st_groups[1] == "count":
3510               val = len(instance.disks)
3511             elif st_groups[1] == "sizes":
3512               val = [disk.size for disk in instance.disks]
3513             elif st_groups[1] == "size":
3514               try:
3515                 val = instance.FindDisk(st_groups[2]).size
3516               except errors.OpPrereqError:
3517                 val = None
3518             else:
3519               assert False, "Unhandled disk parameter"
3520           elif st_groups[0] == "nic":
3521             if st_groups[1] == "count":
3522               val = len(instance.nics)
3523             elif st_groups[1] == "macs":
3524               val = [nic.mac for nic in instance.nics]
3525             elif st_groups[1] == "ips":
3526               val = [nic.ip for nic in instance.nics]
3527             elif st_groups[1] == "bridges":
3528               val = [nic.bridge for nic in instance.nics]
3529             else:
3530               # index-based item
3531               nic_idx = int(st_groups[2])
3532               if nic_idx >= len(instance.nics):
3533                 val = None
3534               else:
3535                 if st_groups[1] == "mac":
3536                   val = instance.nics[nic_idx].mac
3537                 elif st_groups[1] == "ip":
3538                   val = instance.nics[nic_idx].ip
3539                 elif st_groups[1] == "bridge":
3540                   val = instance.nics[nic_idx].bridge
3541                 else:
3542                   assert False, "Unhandled NIC parameter"
3543           else:
3544             assert False, ("Declared but unhandled variable parameter '%s'" %
3545                            field)
3546         else:
3547           assert False, "Declared but unhandled parameter '%s'" % field
3548         iout.append(val)
3549       output.append(iout)
3550
3551     return output
3552
3553
3554 class LUFailoverInstance(LogicalUnit):
3555   """Failover an instance.
3556
3557   """
3558   HPATH = "instance-failover"
3559   HTYPE = constants.HTYPE_INSTANCE
3560   _OP_REQP = ["instance_name", "ignore_consistency"]
3561   REQ_BGL = False
3562
3563   def ExpandNames(self):
3564     self._ExpandAndLockInstance()
3565     self.needed_locks[locking.LEVEL_NODE] = []
3566     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3567
3568   def DeclareLocks(self, level):
3569     if level == locking.LEVEL_NODE:
3570       self._LockInstancesNodes()
3571
3572   def BuildHooksEnv(self):
3573     """Build hooks env.
3574
3575     This runs on master, primary and secondary nodes of the instance.
3576
3577     """
3578     env = {
3579       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3580       }
3581     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3582     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3583     return env, nl, nl
3584
3585   def CheckPrereq(self):
3586     """Check prerequisites.
3587
3588     This checks that the instance is in the cluster.
3589
3590     """
3591     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3592     assert self.instance is not None, \
3593       "Cannot retrieve locked instance %s" % self.op.instance_name
3594
3595     bep = self.cfg.GetClusterInfo().FillBE(instance)
3596     if instance.disk_template not in constants.DTS_NET_MIRROR:
3597       raise errors.OpPrereqError("Instance's disk layout is not"
3598                                  " network mirrored, cannot failover.")
3599
3600     secondary_nodes = instance.secondary_nodes
3601     if not secondary_nodes:
3602       raise errors.ProgrammerError("no secondary node but using "
3603                                    "a mirrored disk template")
3604
3605     target_node = secondary_nodes[0]
3606     _CheckNodeOnline(self, target_node)
3607     _CheckNodeNotDrained(self, target_node)
3608
3609     if instance.admin_up:
3610       # check memory requirements on the secondary node
3611       _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3612                            instance.name, bep[constants.BE_MEMORY],
3613                            instance.hypervisor)
3614     else:
3615       self.LogInfo("Not checking memory on the secondary node as"
3616                    " instance will not be started")
3617
3618     # check bridge existance
3619     brlist = [nic.bridge for nic in instance.nics]
3620     result = self.rpc.call_bridges_exist(target_node, brlist)
3621     result.Raise()
3622     if not result.data:
3623       raise errors.OpPrereqError("One or more target bridges %s does not"
3624                                  " exist on destination node '%s'" %
3625                                  (brlist, target_node))
3626
3627   def Exec(self, feedback_fn):
3628     """Failover an instance.
3629
3630     The failover is done by shutting it down on its present node and
3631     starting it on the secondary.
3632
3633     """
3634     instance = self.instance
3635
3636     source_node = instance.primary_node
3637     target_node = instance.secondary_nodes[0]
3638
3639     feedback_fn("* checking disk consistency between source and target")
3640     for dev in instance.disks:
3641       # for drbd, these are drbd over lvm
3642       if not _CheckDiskConsistency(self, dev, target_node, False):
3643         if instance.admin_up and not self.op.ignore_consistency:
3644           raise errors.OpExecError("Disk %s is degraded on target node,"
3645                                    " aborting failover." % dev.iv_name)
3646
3647     feedback_fn("* shutting down instance on source node")
3648     logging.info("Shutting down instance %s on node %s",
3649                  instance.name, source_node)
3650
3651     result = self.rpc.call_instance_shutdown(source_node, instance)
3652     msg = result.RemoteFailMsg()
3653     if msg:
3654       if self.op.ignore_consistency:
3655         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3656                              " Proceeding anyway. Please make sure node"
3657                              " %s is down. Error details: %s",
3658                              instance.name, source_node, source_node, msg)
3659       else:
3660         raise errors.OpExecError("Could not shutdown instance %s on"
3661                                  " node %s: %s" %
3662                                  (instance.name, source_node, msg))
3663
3664     feedback_fn("* deactivating the instance's disks on source node")
3665     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3666       raise errors.OpExecError("Can't shut down the instance's disks.")
3667
3668     instance.primary_node = target_node
3669     # distribute new instance config to the other nodes
3670     self.cfg.Update(instance)
3671
3672     # Only start the instance if it's marked as up
3673     if instance.admin_up:
3674       feedback_fn("* activating the instance's disks on target node")
3675       logging.info("Starting instance %s on node %s",
3676                    instance.name, target_node)
3677
3678       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3679                                                ignore_secondaries=True)
3680       if not disks_ok:
3681         _ShutdownInstanceDisks(self, instance)
3682         raise errors.OpExecError("Can't activate the instance's disks")
3683
3684       feedback_fn("* starting the instance on the target node")
3685       result = self.rpc.call_instance_start(target_node, instance, None, None)
3686       msg = result.RemoteFailMsg()
3687       if msg:
3688         _ShutdownInstanceDisks(self, instance)
3689         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3690                                  (instance.name, target_node, msg))
3691
3692
3693 class LUMigrateInstance(LogicalUnit):
3694   """Migrate an instance.
3695
3696   This is migration without shutting down, compared to the failover,
3697   which is done with shutdown.
3698
3699   """
3700   HPATH = "instance-migrate"
3701   HTYPE = constants.HTYPE_INSTANCE
3702   _OP_REQP = ["instance_name", "live", "cleanup"]
3703
3704   REQ_BGL = False
3705
3706   def ExpandNames(self):
3707     self._ExpandAndLockInstance()
3708     self.needed_locks[locking.LEVEL_NODE] = []
3709     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3710
3711   def DeclareLocks(self, level):
3712     if level == locking.LEVEL_NODE:
3713       self._LockInstancesNodes()
3714
3715   def BuildHooksEnv(self):
3716     """Build hooks env.
3717
3718     This runs on master, primary and secondary nodes of the instance.
3719
3720     """
3721     env = _BuildInstanceHookEnvByObject(self, self.instance)
3722     env["MIGRATE_LIVE"] = self.op.live
3723     env["MIGRATE_CLEANUP"] = self.op.cleanup
3724     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3725     return env, nl, nl
3726
3727   def CheckPrereq(self):
3728     """Check prerequisites.
3729
3730     This checks that the instance is in the cluster.
3731
3732     """
3733     instance = self.cfg.GetInstanceInfo(
3734       self.cfg.ExpandInstanceName(self.op.instance_name))
3735     if instance is None:
3736       raise errors.OpPrereqError("Instance '%s' not known" %
3737                                  self.op.instance_name)
3738
3739     if instance.disk_template != constants.DT_DRBD8:
3740       raise errors.OpPrereqError("Instance's disk layout is not"
3741                                  " drbd8, cannot migrate.")
3742
3743     secondary_nodes = instance.secondary_nodes
3744     if not secondary_nodes:
3745       raise errors.ConfigurationError("No secondary node but using"
3746                                       " drbd8 disk template")
3747
3748     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3749
3750     target_node = secondary_nodes[0]
3751     # check memory requirements on the secondary node
3752     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3753                          instance.name, i_be[constants.BE_MEMORY],
3754                          instance.hypervisor)
3755
3756     # check bridge existance
3757     brlist = [nic.bridge for nic in instance.nics]
3758     result = self.rpc.call_bridges_exist(target_node, brlist)
3759     if result.failed or not result.data:
3760       raise errors.OpPrereqError("One or more target bridges %s does not"
3761                                  " exist on destination node '%s'" %
3762                                  (brlist, target_node))
3763
3764     if not self.op.cleanup:
3765       _CheckNodeNotDrained(self, target_node)
3766       result = self.rpc.call_instance_migratable(instance.primary_node,
3767                                                  instance)
3768       msg = result.RemoteFailMsg()
3769       if msg:
3770         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3771                                    msg)
3772
3773     self.instance = instance
3774
3775   def _WaitUntilSync(self):
3776     """Poll with custom rpc for disk sync.
3777
3778     This uses our own step-based rpc call.
3779
3780     """
3781     self.feedback_fn("* wait until resync is done")
3782     all_done = False
3783     while not all_done:
3784       all_done = True
3785       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3786                                             self.nodes_ip,
3787                                             self.instance.disks)
3788       min_percent = 100
3789       for node, nres in result.items():
3790         msg = nres.RemoteFailMsg()
3791         if msg:
3792           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3793                                    (node, msg))
3794         node_done, node_percent = nres.payload
3795         all_done = all_done and node_done
3796         if node_percent is not None:
3797           min_percent = min(min_percent, node_percent)
3798       if not all_done:
3799         if min_percent < 100:
3800           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3801         time.sleep(2)
3802
3803   def _EnsureSecondary(self, node):
3804     """Demote a node to secondary.
3805
3806     """
3807     self.feedback_fn("* switching node %s to secondary mode" % node)
3808
3809     for dev in self.instance.disks:
3810       self.cfg.SetDiskID(dev, node)
3811
3812     result = self.rpc.call_blockdev_close(node, self.instance.name,
3813                                           self.instance.disks)
3814     msg = result.RemoteFailMsg()
3815     if msg:
3816       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3817                                " error %s" % (node, msg))
3818
3819   def _GoStandalone(self):
3820     """Disconnect from the network.
3821
3822     """
3823     self.feedback_fn("* changing into standalone mode")
3824     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3825                                                self.instance.disks)
3826     for node, nres in result.items():
3827       msg = nres.RemoteFailMsg()
3828       if msg:
3829         raise errors.OpExecError("Cannot disconnect disks node %s,"
3830                                  " error %s" % (node, msg))
3831
3832   def _GoReconnect(self, multimaster):
3833     """Reconnect to the network.
3834
3835     """
3836     if multimaster:
3837       msg = "dual-master"
3838     else:
3839       msg = "single-master"
3840     self.feedback_fn("* changing disks into %s mode" % msg)
3841     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3842                                            self.instance.disks,
3843                                            self.instance.name, multimaster)
3844     for node, nres in result.items():
3845       msg = nres.RemoteFailMsg()
3846       if msg:
3847         raise errors.OpExecError("Cannot change disks config on node %s,"
3848                                  " error: %s" % (node, msg))
3849
3850   def _ExecCleanup(self):
3851     """Try to cleanup after a failed migration.
3852
3853     The cleanup is done by:
3854       - check that the instance is running only on one node
3855         (and update the config if needed)
3856       - change disks on its secondary node to secondary
3857       - wait until disks are fully synchronized
3858       - disconnect from the network
3859       - change disks into single-master mode
3860       - wait again until disks are fully synchronized
3861
3862     """
3863     instance = self.instance
3864     target_node = self.target_node
3865     source_node = self.source_node
3866
3867     # check running on only one node
3868     self.feedback_fn("* checking where the instance actually runs"
3869                      " (if this hangs, the hypervisor might be in"
3870                      " a bad state)")
3871     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3872     for node, result in ins_l.items():
3873       result.Raise()
3874       if not isinstance(result.data, list):
3875         raise errors.OpExecError("Can't contact node '%s'" % node)
3876
3877     runningon_source = instance.name in ins_l[source_node].data
3878     runningon_target = instance.name in ins_l[target_node].data
3879
3880     if runningon_source and runningon_target:
3881       raise errors.OpExecError("Instance seems to be running on two nodes,"
3882                                " or the hypervisor is confused. You will have"
3883                                " to ensure manually that it runs only on one"
3884                                " and restart this operation.")
3885
3886     if not (runningon_source or runningon_target):
3887       raise errors.OpExecError("Instance does not seem to be running at all."
3888                                " In this case, it's safer to repair by"
3889                                " running 'gnt-instance stop' to ensure disk"
3890                                " shutdown, and then restarting it.")
3891
3892     if runningon_target:
3893       # the migration has actually succeeded, we need to update the config
3894       self.feedback_fn("* instance running on secondary node (%s),"
3895                        " updating config" % target_node)
3896       instance.primary_node = target_node
3897       self.cfg.Update(instance)
3898       demoted_node = source_node
3899     else:
3900       self.feedback_fn("* instance confirmed to be running on its"
3901                        " primary node (%s)" % source_node)
3902       demoted_node = target_node
3903
3904     self._EnsureSecondary(demoted_node)
3905     try:
3906       self._WaitUntilSync()
3907     except errors.OpExecError:
3908       # we ignore here errors, since if the device is standalone, it
3909       # won't be able to sync
3910       pass
3911     self._GoStandalone()
3912     self._GoReconnect(False)
3913     self._WaitUntilSync()
3914
3915     self.feedback_fn("* done")
3916
3917   def _RevertDiskStatus(self):
3918     """Try to revert the disk status after a failed migration.
3919
3920     """
3921     target_node = self.target_node
3922     try:
3923       self._EnsureSecondary(target_node)
3924       self._GoStandalone()
3925       self._GoReconnect(False)
3926       self._WaitUntilSync()
3927     except errors.OpExecError, err:
3928       self.LogWarning("Migration failed and I can't reconnect the"
3929                       " drives: error '%s'\n"
3930                       "Please look and recover the instance status" %
3931                       str(err))
3932
3933   def _AbortMigration(self):
3934     """Call the hypervisor code to abort a started migration.
3935
3936     """
3937     instance = self.instance
3938     target_node = self.target_node
3939     migration_info = self.migration_info
3940
3941     abort_result = self.rpc.call_finalize_migration(target_node,
3942                                                     instance,
3943                                                     migration_info,
3944                                                     False)
3945     abort_msg = abort_result.RemoteFailMsg()
3946     if abort_msg:
3947       logging.error("Aborting migration failed on target node %s: %s" %
3948                     (target_node, abort_msg))
3949       # Don't raise an exception here, as we stil have to try to revert the
3950       # disk status, even if this step failed.
3951
3952   def _ExecMigration(self):
3953     """Migrate an instance.
3954
3955     The migrate is done by:
3956       - change the disks into dual-master mode
3957       - wait until disks are fully synchronized again
3958       - migrate the instance
3959       - change disks on the new secondary node (the old primary) to secondary
3960       - wait until disks are fully synchronized
3961       - change disks into single-master mode
3962
3963     """
3964     instance = self.instance
3965     target_node = self.target_node
3966     source_node = self.source_node
3967
3968     self.feedback_fn("* checking disk consistency between source and target")
3969     for dev in instance.disks:
3970       if not _CheckDiskConsistency(self, dev, target_node, False):
3971         raise errors.OpExecError("Disk %s is degraded or not fully"
3972                                  " synchronized on target node,"
3973                                  " aborting migrate." % dev.iv_name)
3974
3975     # First get the migration information from the remote node
3976     result = self.rpc.call_migration_info(source_node, instance)
3977     msg = result.RemoteFailMsg()
3978     if msg:
3979       log_err = ("Failed fetching source migration information from %s: %s" %
3980                  (source_node, msg))
3981       logging.error(log_err)
3982       raise errors.OpExecError(log_err)
3983
3984     self.migration_info = migration_info = result.payload
3985
3986     # Then switch the disks to master/master mode
3987     self._EnsureSecondary(target_node)
3988     self._GoStandalone()
3989     self._GoReconnect(True)
3990     self._WaitUntilSync()
3991
3992     self.feedback_fn("* preparing %s to accept the instance" % target_node)
3993     result = self.rpc.call_accept_instance(target_node,
3994                                            instance,
3995                                            migration_info,
3996                                            self.nodes_ip[target_node])
3997
3998     msg = result.RemoteFailMsg()
3999     if msg:
4000       logging.error("Instance pre-migration failed, trying to revert"
4001                     " disk status: %s", msg)
4002       self._AbortMigration()
4003       self._RevertDiskStatus()
4004       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4005                                (instance.name, msg))
4006
4007     self.feedback_fn("* migrating instance to %s" % target_node)
4008     time.sleep(10)
4009     result = self.rpc.call_instance_migrate(source_node, instance,
4010                                             self.nodes_ip[target_node],
4011                                             self.op.live)
4012     msg = result.RemoteFailMsg()
4013     if msg:
4014       logging.error("Instance migration failed, trying to revert"
4015                     " disk status: %s", msg)
4016       self._AbortMigration()
4017       self._RevertDiskStatus()
4018       raise errors.OpExecError("Could not migrate instance %s: %s" %
4019                                (instance.name, msg))
4020     time.sleep(10)
4021
4022     instance.primary_node = target_node
4023     # distribute new instance config to the other nodes
4024     self.cfg.Update(instance)
4025
4026     result = self.rpc.call_finalize_migration(target_node,
4027                                               instance,
4028                                               migration_info,
4029                                               True)
4030     msg = result.RemoteFailMsg()
4031     if msg:
4032       logging.error("Instance migration succeeded, but finalization failed:"
4033                     " %s" % msg)
4034       raise errors.OpExecError("Could not finalize instance migration: %s" %
4035                                msg)
4036
4037     self._EnsureSecondary(source_node)
4038     self._WaitUntilSync()
4039     self._GoStandalone()
4040     self._GoReconnect(False)
4041     self._WaitUntilSync()
4042
4043     self.feedback_fn("* done")
4044
4045   def Exec(self, feedback_fn):
4046     """Perform the migration.
4047
4048     """
4049     self.feedback_fn = feedback_fn
4050
4051     self.source_node = self.instance.primary_node
4052     self.target_node = self.instance.secondary_nodes[0]
4053     self.all_nodes = [self.source_node, self.target_node]
4054     self.nodes_ip = {
4055       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4056       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4057       }
4058     if self.op.cleanup:
4059       return self._ExecCleanup()
4060     else:
4061       return self._ExecMigration()
4062
4063
4064 def _CreateBlockDev(lu, node, instance, device, force_create,
4065                     info, force_open):
4066   """Create a tree of block devices on a given node.
4067
4068   If this device type has to be created on secondaries, create it and
4069   all its children.
4070
4071   If not, just recurse to children keeping the same 'force' value.
4072
4073   @param lu: the lu on whose behalf we execute
4074   @param node: the node on which to create the device
4075   @type instance: L{objects.Instance}
4076   @param instance: the instance which owns the device
4077   @type device: L{objects.Disk}
4078   @param device: the device to create
4079   @type force_create: boolean
4080   @param force_create: whether to force creation of this device; this
4081       will be change to True whenever we find a device which has
4082       CreateOnSecondary() attribute
4083   @param info: the extra 'metadata' we should attach to the device
4084       (this will be represented as a LVM tag)
4085   @type force_open: boolean
4086   @param force_open: this parameter will be passes to the
4087       L{backend.BlockdevCreate} function where it specifies
4088       whether we run on primary or not, and it affects both
4089       the child assembly and the device own Open() execution
4090
4091   """
4092   if device.CreateOnSecondary():
4093     force_create = True
4094
4095   if device.children:
4096     for child in device.children:
4097       _CreateBlockDev(lu, node, instance, child, force_create,
4098                       info, force_open)
4099
4100   if not force_create:
4101     return
4102
4103   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4104
4105
4106 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4107   """Create a single block device on a given node.
4108
4109   This will not recurse over children of the device, so they must be
4110   created in advance.
4111
4112   @param lu: the lu on whose behalf we execute
4113   @param node: the node on which to create the device
4114   @type instance: L{objects.Instance}
4115   @param instance: the instance which owns the device
4116   @type device: L{objects.Disk}
4117   @param device: the device to create
4118   @param info: the extra 'metadata' we should attach to the device
4119       (this will be represented as a LVM tag)
4120   @type force_open: boolean
4121   @param force_open: this parameter will be passes to the
4122       L{backend.BlockdevCreate} function where it specifies
4123       whether we run on primary or not, and it affects both
4124       the child assembly and the device own Open() execution
4125
4126   """
4127   lu.cfg.SetDiskID(device, node)
4128   result = lu.rpc.call_blockdev_create(node, device, device.size,
4129                                        instance.name, force_open, info)
4130   msg = result.RemoteFailMsg()
4131   if msg:
4132     raise errors.OpExecError("Can't create block device %s on"
4133                              " node %s for instance %s: %s" %
4134                              (device, node, instance.name, msg))
4135   if device.physical_id is None:
4136     device.physical_id = result.payload
4137
4138
4139 def _GenerateUniqueNames(lu, exts):
4140   """Generate a suitable LV name.
4141
4142   This will generate a logical volume name for the given instance.
4143
4144   """
4145   results = []
4146   for val in exts:
4147     new_id = lu.cfg.GenerateUniqueID()
4148     results.append("%s%s" % (new_id, val))
4149   return results
4150
4151
4152 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4153                          p_minor, s_minor):
4154   """Generate a drbd8 device complete with its children.
4155
4156   """
4157   port = lu.cfg.AllocatePort()
4158   vgname = lu.cfg.GetVGName()
4159   shared_secret = lu.cfg.GenerateDRBDSecret()
4160   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4161                           logical_id=(vgname, names[0]))
4162   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4163                           logical_id=(vgname, names[1]))
4164   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4165                           logical_id=(primary, secondary, port,
4166                                       p_minor, s_minor,
4167                                       shared_secret),
4168                           children=[dev_data, dev_meta],
4169                           iv_name=iv_name)
4170   return drbd_dev
4171
4172
4173 def _GenerateDiskTemplate(lu, template_name,
4174                           instance_name, primary_node,
4175                           secondary_nodes, disk_info,
4176                           file_storage_dir, file_driver,
4177                           base_index):
4178   """Generate the entire disk layout for a given template type.
4179
4180   """
4181   #TODO: compute space requirements
4182
4183   vgname = lu.cfg.GetVGName()
4184   disk_count = len(disk_info)
4185   disks = []
4186   if template_name == constants.DT_DISKLESS:
4187     pass
4188   elif template_name == constants.DT_PLAIN:
4189     if len(secondary_nodes) != 0:
4190       raise errors.ProgrammerError("Wrong template configuration")
4191
4192     names = _GenerateUniqueNames(lu, [".disk%d" % i
4193                                       for i in range(disk_count)])
4194     for idx, disk in enumerate(disk_info):
4195       disk_index = idx + base_index
4196       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4197                               logical_id=(vgname, names[idx]),
4198                               iv_name="disk/%d" % disk_index,
4199                               mode=disk["mode"])
4200       disks.append(disk_dev)
4201   elif template_name == constants.DT_DRBD8:
4202     if len(secondary_nodes) != 1:
4203       raise errors.ProgrammerError("Wrong template configuration")
4204     remote_node = secondary_nodes[0]
4205     minors = lu.cfg.AllocateDRBDMinor(
4206       [primary_node, remote_node] * len(disk_info), instance_name)
4207
4208     names = []
4209     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4210                                                for i in range(disk_count)]):
4211       names.append(lv_prefix + "_data")
4212       names.append(lv_prefix + "_meta")
4213     for idx, disk in enumerate(disk_info):
4214       disk_index = idx + base_index
4215       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4216                                       disk["size"], names[idx*2:idx*2+2],
4217                                       "disk/%d" % disk_index,
4218                                       minors[idx*2], minors[idx*2+1])
4219       disk_dev.mode = disk["mode"]
4220       disks.append(disk_dev)
4221   elif template_name == constants.DT_FILE:
4222     if len(secondary_nodes) != 0:
4223       raise errors.ProgrammerError("Wrong template configuration")
4224
4225     for idx, disk in enumerate(disk_info):
4226       disk_index = idx + base_index
4227       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4228                               iv_name="disk/%d" % disk_index,
4229                               logical_id=(file_driver,
4230                                           "%s/disk%d" % (file_storage_dir,
4231                                                          disk_index)),
4232                               mode=disk["mode"])
4233       disks.append(disk_dev)
4234   else:
4235     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4236   return disks
4237
4238
4239 def _GetInstanceInfoText(instance):
4240   """Compute that text that should be added to the disk's metadata.
4241
4242   """
4243   return "originstname+%s" % instance.name
4244
4245
4246 def _CreateDisks(lu, instance):
4247   """Create all disks for an instance.
4248
4249   This abstracts away some work from AddInstance.
4250
4251   @type lu: L{LogicalUnit}
4252   @param lu: the logical unit on whose behalf we execute
4253   @type instance: L{objects.Instance}
4254   @param instance: the instance whose disks we should create
4255   @rtype: boolean
4256   @return: the success of the creation
4257
4258   """
4259   info = _GetInstanceInfoText(instance)
4260   pnode = instance.primary_node
4261
4262   if instance.disk_template == constants.DT_FILE:
4263     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4264     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4265
4266     if result.failed or not result.data:
4267       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4268
4269     if not result.data[0]:
4270       raise errors.OpExecError("Failed to create directory '%s'" %
4271                                file_storage_dir)
4272
4273   # Note: this needs to be kept in sync with adding of disks in
4274   # LUSetInstanceParams
4275   for device in instance.disks:
4276     logging.info("Creating volume %s for instance %s",
4277                  device.iv_name, instance.name)
4278     #HARDCODE
4279     for node in instance.all_nodes:
4280       f_create = node == pnode
4281       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4282
4283
4284 def _RemoveDisks(lu, instance):
4285   """Remove all disks for an instance.
4286
4287   This abstracts away some work from `AddInstance()` and
4288   `RemoveInstance()`. Note that in case some of the devices couldn't
4289   be removed, the removal will continue with the other ones (compare
4290   with `_CreateDisks()`).
4291
4292   @type lu: L{LogicalUnit}
4293   @param lu: the logical unit on whose behalf we execute
4294   @type instance: L{objects.Instance}
4295   @param instance: the instance whose disks we should remove
4296   @rtype: boolean
4297   @return: the success of the removal
4298
4299   """
4300   logging.info("Removing block devices for instance %s", instance.name)
4301
4302   all_result = True
4303   for device in instance.disks:
4304     for node, disk in device.ComputeNodeTree(instance.primary_node):
4305       lu.cfg.SetDiskID(disk, node)
4306       msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4307       if msg:
4308         lu.LogWarning("Could not remove block device %s on node %s,"
4309                       " continuing anyway: %s", device.iv_name, node, msg)
4310         all_result = False
4311
4312   if instance.disk_template == constants.DT_FILE:
4313     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4314     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4315                                                  file_storage_dir)
4316     if result.failed or not result.data:
4317       logging.error("Could not remove directory '%s'", file_storage_dir)
4318       all_result = False
4319
4320   return all_result
4321
4322
4323 def _ComputeDiskSize(disk_template, disks):
4324   """Compute disk size requirements in the volume group
4325
4326   """
4327   # Required free disk space as a function of disk and swap space
4328   req_size_dict = {
4329     constants.DT_DISKLESS: None,
4330     constants.DT_PLAIN: sum(d["size"] for d in disks),
4331     # 128 MB are added for drbd metadata for each disk
4332     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4333     constants.DT_FILE: None,
4334   }
4335
4336   if disk_template not in req_size_dict:
4337     raise errors.ProgrammerError("Disk template '%s' size requirement"
4338                                  " is unknown" %  disk_template)
4339
4340   return req_size_dict[disk_template]
4341
4342
4343 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4344   """Hypervisor parameter validation.
4345
4346   This function abstract the hypervisor parameter validation to be
4347   used in both instance create and instance modify.
4348
4349   @type lu: L{LogicalUnit}
4350   @param lu: the logical unit for which we check
4351   @type nodenames: list
4352   @param nodenames: the list of nodes on which we should check
4353   @type hvname: string
4354   @param hvname: the name of the hypervisor we should use
4355   @type hvparams: dict
4356   @param hvparams: the parameters which we need to check
4357   @raise errors.OpPrereqError: if the parameters are not valid
4358
4359   """
4360   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4361                                                   hvname,
4362                                                   hvparams)
4363   for node in nodenames:
4364     info = hvinfo[node]
4365     if info.offline:
4366       continue
4367     msg = info.RemoteFailMsg()
4368     if msg:
4369       raise errors.OpPrereqError("Hypervisor parameter validation"
4370                                  " failed on node %s: %s" % (node, msg))
4371
4372
4373 class LUCreateInstance(LogicalUnit):
4374   """Create an instance.
4375
4376   """
4377   HPATH = "instance-add"
4378   HTYPE = constants.HTYPE_INSTANCE
4379   _OP_REQP = ["instance_name", "disks", "disk_template",
4380               "mode", "start",
4381               "wait_for_sync", "ip_check", "nics",
4382               "hvparams", "beparams"]
4383   REQ_BGL = False
4384
4385   def _ExpandNode(self, node):
4386     """Expands and checks one node name.
4387
4388     """
4389     node_full = self.cfg.ExpandNodeName(node)
4390     if node_full is None:
4391       raise errors.OpPrereqError("Unknown node %s" % node)
4392     return node_full
4393
4394   def ExpandNames(self):
4395     """ExpandNames for CreateInstance.
4396
4397     Figure out the right locks for instance creation.
4398
4399     """
4400     self.needed_locks = {}
4401
4402     # set optional parameters to none if they don't exist
4403     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4404       if not hasattr(self.op, attr):
4405         setattr(self.op, attr, None)
4406
4407     # cheap checks, mostly valid constants given
4408
4409     # verify creation mode
4410     if self.op.mode not in (constants.INSTANCE_CREATE,
4411                             constants.INSTANCE_IMPORT):
4412       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4413                                  self.op.mode)
4414
4415     # disk template and mirror node verification
4416     if self.op.disk_template not in constants.DISK_TEMPLATES:
4417       raise errors.OpPrereqError("Invalid disk template name")
4418
4419     if self.op.hypervisor is None:
4420       self.op.hypervisor = self.cfg.GetHypervisorType()
4421
4422     cluster = self.cfg.GetClusterInfo()
4423     enabled_hvs = cluster.enabled_hypervisors
4424     if self.op.hypervisor not in enabled_hvs:
4425       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4426                                  " cluster (%s)" % (self.op.hypervisor,
4427                                   ",".join(enabled_hvs)))
4428
4429     # check hypervisor parameter syntax (locally)
4430     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4431     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4432                                   self.op.hvparams)
4433     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4434     hv_type.CheckParameterSyntax(filled_hvp)
4435     self.hv_full = filled_hvp
4436
4437     # fill and remember the beparams dict
4438     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4439     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4440                                     self.op.beparams)
4441
4442     #### instance parameters check
4443
4444     # instance name verification
4445     hostname1 = utils.HostInfo(self.op.instance_name)
4446     self.op.instance_name = instance_name = hostname1.name
4447
4448     # this is just a preventive check, but someone might still add this
4449     # instance in the meantime, and creation will fail at lock-add time
4450     if instance_name in self.cfg.GetInstanceList():
4451       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4452                                  instance_name)
4453
4454     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4455
4456     # NIC buildup
4457     self.nics = []
4458     for nic in self.op.nics:
4459       # ip validity checks
4460       ip = nic.get("ip", None)
4461       if ip is None or ip.lower() == "none":
4462         nic_ip = None
4463       elif ip.lower() == constants.VALUE_AUTO:
4464         nic_ip = hostname1.ip
4465       else:
4466         if not utils.IsValidIP(ip):
4467           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4468                                      " like a valid IP" % ip)
4469         nic_ip = ip
4470
4471       # MAC address verification
4472       mac = nic.get("mac", constants.VALUE_AUTO)
4473       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4474         if not utils.IsValidMac(mac.lower()):
4475           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4476                                      mac)
4477       # bridge verification
4478       bridge = nic.get("bridge", None)
4479       if bridge is None:
4480         bridge = self.cfg.GetDefBridge()
4481       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4482
4483     # disk checks/pre-build
4484     self.disks = []
4485     for disk in self.op.disks:
4486       mode = disk.get("mode", constants.DISK_RDWR)
4487       if mode not in constants.DISK_ACCESS_SET:
4488         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4489                                    mode)
4490       size = disk.get("size", None)
4491       if size is None:
4492         raise errors.OpPrereqError("Missing disk size")
4493       try:
4494         size = int(size)
4495       except ValueError:
4496         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4497       self.disks.append({"size": size, "mode": mode})
4498
4499     # used in CheckPrereq for ip ping check
4500     self.check_ip = hostname1.ip
4501
4502     # file storage checks
4503     if (self.op.file_driver and
4504         not self.op.file_driver in constants.FILE_DRIVER):
4505       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4506                                  self.op.file_driver)
4507
4508     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4509       raise errors.OpPrereqError("File storage directory path not absolute")
4510
4511     ### Node/iallocator related checks
4512     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4513       raise errors.OpPrereqError("One and only one of iallocator and primary"
4514                                  " node must be given")
4515
4516     if self.op.iallocator:
4517       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4518     else:
4519       self.op.pnode = self._ExpandNode(self.op.pnode)
4520       nodelist = [self.op.pnode]
4521       if self.op.snode is not None:
4522         self.op.snode = self._ExpandNode(self.op.snode)
4523         nodelist.append(self.op.snode)
4524       self.needed_locks[locking.LEVEL_NODE] = nodelist
4525
4526     # in case of import lock the source node too
4527     if self.op.mode == constants.INSTANCE_IMPORT:
4528       src_node = getattr(self.op, "src_node", None)
4529       src_path = getattr(self.op, "src_path", None)
4530
4531       if src_path is None:
4532         self.op.src_path = src_path = self.op.instance_name
4533
4534       if src_node is None:
4535         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4536         self.op.src_node = None
4537         if os.path.isabs(src_path):
4538           raise errors.OpPrereqError("Importing an instance from an absolute"
4539                                      " path requires a source node option.")
4540       else:
4541         self.op.src_node = src_node = self._ExpandNode(src_node)
4542         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4543           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4544         if not os.path.isabs(src_path):
4545           self.op.src_path = src_path = \
4546             os.path.join(constants.EXPORT_DIR, src_path)
4547
4548     else: # INSTANCE_CREATE
4549       if getattr(self.op, "os_type", None) is None:
4550         raise errors.OpPrereqError("No guest OS specified")
4551
4552   def _RunAllocator(self):
4553     """Run the allocator based on input opcode.
4554
4555     """
4556     nics = [n.ToDict() for n in self.nics]
4557     ial = IAllocator(self,
4558                      mode=constants.IALLOCATOR_MODE_ALLOC,
4559                      name=self.op.instance_name,
4560                      disk_template=self.op.disk_template,
4561                      tags=[],
4562                      os=self.op.os_type,
4563                      vcpus=self.be_full[constants.BE_VCPUS],
4564                      mem_size=self.be_full[constants.BE_MEMORY],
4565                      disks=self.disks,
4566                      nics=nics,
4567                      hypervisor=self.op.hypervisor,
4568                      )
4569
4570     ial.Run(self.op.iallocator)
4571
4572     if not ial.success:
4573       raise errors.OpPrereqError("Can't compute nodes using"
4574                                  " iallocator '%s': %s" % (self.op.iallocator,
4575                                                            ial.info))
4576     if len(ial.nodes) != ial.required_nodes:
4577       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4578                                  " of nodes (%s), required %s" %
4579                                  (self.op.iallocator, len(ial.nodes),
4580                                   ial.required_nodes))
4581     self.op.pnode = ial.nodes[0]
4582     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4583                  self.op.instance_name, self.op.iallocator,
4584                  ", ".join(ial.nodes))
4585     if ial.required_nodes == 2:
4586       self.op.snode = ial.nodes[1]
4587
4588   def BuildHooksEnv(self):
4589     """Build hooks env.
4590
4591     This runs on master, primary and secondary nodes of the instance.
4592
4593     """
4594     env = {
4595       "ADD_MODE": self.op.mode,
4596       }
4597     if self.op.mode == constants.INSTANCE_IMPORT:
4598       env["SRC_NODE"] = self.op.src_node
4599       env["SRC_PATH"] = self.op.src_path
4600       env["SRC_IMAGES"] = self.src_images
4601
4602     env.update(_BuildInstanceHookEnv(
4603       name=self.op.instance_name,
4604       primary_node=self.op.pnode,
4605       secondary_nodes=self.secondaries,
4606       status=self.op.start,
4607       os_type=self.op.os_type,
4608       memory=self.be_full[constants.BE_MEMORY],
4609       vcpus=self.be_full[constants.BE_VCPUS],
4610       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4611       disk_template=self.op.disk_template,
4612       disks=[(d["size"], d["mode"]) for d in self.disks],
4613       bep=self.be_full,
4614       hvp=self.hv_full,
4615       hypervisor=self.op.hypervisor,
4616     ))
4617
4618     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4619           self.secondaries)
4620     return env, nl, nl
4621
4622
4623   def CheckPrereq(self):
4624     """Check prerequisites.
4625
4626     """
4627     if (not self.cfg.GetVGName() and
4628         self.op.disk_template not in constants.DTS_NOT_LVM):
4629       raise errors.OpPrereqError("Cluster does not support lvm-based"
4630                                  " instances")
4631
4632     if self.op.mode == constants.INSTANCE_IMPORT:
4633       src_node = self.op.src_node
4634       src_path = self.op.src_path
4635
4636       if src_node is None:
4637         exp_list = self.rpc.call_export_list(
4638           self.acquired_locks[locking.LEVEL_NODE])
4639         found = False
4640         for node in exp_list:
4641           if not exp_list[node].failed and src_path in exp_list[node].data:
4642             found = True
4643             self.op.src_node = src_node = node
4644             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4645                                                        src_path)
4646             break
4647         if not found:
4648           raise errors.OpPrereqError("No export found for relative path %s" %
4649                                       src_path)
4650
4651       _CheckNodeOnline(self, src_node)
4652       result = self.rpc.call_export_info(src_node, src_path)
4653       result.Raise()
4654       if not result.data:
4655         raise errors.OpPrereqError("No export found in dir %s" % src_path)
4656
4657       export_info = result.data
4658       if not export_info.has_section(constants.INISECT_EXP):
4659         raise errors.ProgrammerError("Corrupted export config")
4660
4661       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4662       if (int(ei_version) != constants.EXPORT_VERSION):
4663         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4664                                    (ei_version, constants.EXPORT_VERSION))
4665
4666       # Check that the new instance doesn't have less disks than the export
4667       instance_disks = len(self.disks)
4668       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4669       if instance_disks < export_disks:
4670         raise errors.OpPrereqError("Not enough disks to import."
4671                                    " (instance: %d, export: %d)" %
4672                                    (instance_disks, export_disks))
4673
4674       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4675       disk_images = []
4676       for idx in range(export_disks):
4677         option = 'disk%d_dump' % idx
4678         if export_info.has_option(constants.INISECT_INS, option):
4679           # FIXME: are the old os-es, disk sizes, etc. useful?
4680           export_name = export_info.get(constants.INISECT_INS, option)
4681           image = os.path.join(src_path, export_name)
4682           disk_images.append(image)
4683         else:
4684           disk_images.append(False)
4685
4686       self.src_images = disk_images
4687
4688       old_name = export_info.get(constants.INISECT_INS, 'name')
4689       # FIXME: int() here could throw a ValueError on broken exports
4690       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4691       if self.op.instance_name == old_name:
4692         for idx, nic in enumerate(self.nics):
4693           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4694             nic_mac_ini = 'nic%d_mac' % idx
4695             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4696
4697     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4698     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4699     if self.op.start and not self.op.ip_check:
4700       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4701                                  " adding an instance in start mode")
4702
4703     if self.op.ip_check:
4704       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4705         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4706                                    (self.check_ip, self.op.instance_name))
4707
4708     #### mac address generation
4709     # By generating here the mac address both the allocator and the hooks get
4710     # the real final mac address rather than the 'auto' or 'generate' value.
4711     # There is a race condition between the generation and the instance object
4712     # creation, which means that we know the mac is valid now, but we're not
4713     # sure it will be when we actually add the instance. If things go bad
4714     # adding the instance will abort because of a duplicate mac, and the
4715     # creation job will fail.
4716     for nic in self.nics:
4717       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4718         nic.mac = self.cfg.GenerateMAC()
4719
4720     #### allocator run
4721
4722     if self.op.iallocator is not None:
4723       self._RunAllocator()
4724
4725     #### node related checks
4726
4727     # check primary node
4728     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4729     assert self.pnode is not None, \
4730       "Cannot retrieve locked node %s" % self.op.pnode
4731     if pnode.offline:
4732       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4733                                  pnode.name)
4734     if pnode.drained:
4735       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4736                                  pnode.name)
4737
4738     self.secondaries = []
4739
4740     # mirror node verification
4741     if self.op.disk_template in constants.DTS_NET_MIRROR:
4742       if self.op.snode is None:
4743         raise errors.OpPrereqError("The networked disk templates need"
4744                                    " a mirror node")
4745       if self.op.snode == pnode.name:
4746         raise errors.OpPrereqError("The secondary node cannot be"
4747                                    " the primary node.")
4748       _CheckNodeOnline(self, self.op.snode)
4749       _CheckNodeNotDrained(self, self.op.snode)
4750       self.secondaries.append(self.op.snode)
4751
4752     nodenames = [pnode.name] + self.secondaries
4753
4754     req_size = _ComputeDiskSize(self.op.disk_template,
4755                                 self.disks)
4756
4757     # Check lv size requirements
4758     if req_size is not None:
4759       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4760                                          self.op.hypervisor)
4761       for node in nodenames:
4762         info = nodeinfo[node]
4763         info.Raise()
4764         info = info.data
4765         if not info:
4766           raise errors.OpPrereqError("Cannot get current information"
4767                                      " from node '%s'" % node)
4768         vg_free = info.get('vg_free', None)
4769         if not isinstance(vg_free, int):
4770           raise errors.OpPrereqError("Can't compute free disk space on"
4771                                      " node %s" % node)
4772         if req_size > info['vg_free']:
4773           raise errors.OpPrereqError("Not enough disk space on target node %s."
4774                                      " %d MB available, %d MB required" %
4775                                      (node, info['vg_free'], req_size))
4776
4777     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4778
4779     # os verification
4780     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4781     result.Raise()
4782     if not isinstance(result.data, objects.OS) or not result.data:
4783       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4784                                  " primary node"  % self.op.os_type)
4785
4786     # bridge check on primary node
4787     bridges = [n.bridge for n in self.nics]
4788     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4789     result.Raise()
4790     if not result.data:
4791       raise errors.OpPrereqError("One of the target bridges '%s' does not"
4792                                  " exist on destination node '%s'" %
4793                                  (",".join(bridges), pnode.name))
4794
4795     # memory check on primary node
4796     if self.op.start:
4797       _CheckNodeFreeMemory(self, self.pnode.name,
4798                            "creating instance %s" % self.op.instance_name,
4799                            self.be_full[constants.BE_MEMORY],
4800                            self.op.hypervisor)
4801
4802   def Exec(self, feedback_fn):
4803     """Create and add the instance to the cluster.
4804
4805     """
4806     instance = self.op.instance_name
4807     pnode_name = self.pnode.name
4808
4809     ht_kind = self.op.hypervisor
4810     if ht_kind in constants.HTS_REQ_PORT:
4811       network_port = self.cfg.AllocatePort()
4812     else:
4813       network_port = None
4814
4815     ##if self.op.vnc_bind_address is None:
4816     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4817
4818     # this is needed because os.path.join does not accept None arguments
4819     if self.op.file_storage_dir is None:
4820       string_file_storage_dir = ""
4821     else:
4822       string_file_storage_dir = self.op.file_storage_dir
4823
4824     # build the full file storage dir path
4825     file_storage_dir = os.path.normpath(os.path.join(
4826                                         self.cfg.GetFileStorageDir(),
4827                                         string_file_storage_dir, instance))
4828
4829
4830     disks = _GenerateDiskTemplate(self,
4831                                   self.op.disk_template,
4832                                   instance, pnode_name,
4833                                   self.secondaries,
4834                                   self.disks,
4835                                   file_storage_dir,
4836                                   self.op.file_driver,
4837                                   0)
4838
4839     iobj = objects.Instance(name=instance, os=self.op.os_type,
4840                             primary_node=pnode_name,
4841                             nics=self.nics, disks=disks,
4842                             disk_template=self.op.disk_template,
4843                             admin_up=False,
4844                             network_port=network_port,
4845                             beparams=self.op.beparams,
4846                             hvparams=self.op.hvparams,
4847                             hypervisor=self.op.hypervisor,
4848                             )
4849
4850     feedback_fn("* creating instance disks...")
4851     try:
4852       _CreateDisks(self, iobj)
4853     except errors.OpExecError:
4854       self.LogWarning("Device creation failed, reverting...")
4855       try:
4856         _RemoveDisks(self, iobj)
4857       finally:
4858         self.cfg.ReleaseDRBDMinors(instance)
4859         raise
4860
4861     feedback_fn("adding instance %s to cluster config" % instance)
4862
4863     self.cfg.AddInstance(iobj)
4864     # Declare that we don't want to remove the instance lock anymore, as we've
4865     # added the instance to the config
4866     del self.remove_locks[locking.LEVEL_INSTANCE]
4867     # Unlock all the nodes
4868     if self.op.mode == constants.INSTANCE_IMPORT:
4869       nodes_keep = [self.op.src_node]
4870       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4871                        if node != self.op.src_node]
4872       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4873       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4874     else:
4875       self.context.glm.release(locking.LEVEL_NODE)
4876       del self.acquired_locks[locking.LEVEL_NODE]
4877
4878     if self.op.wait_for_sync:
4879       disk_abort = not _WaitForSync(self, iobj)
4880     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4881       # make sure the disks are not degraded (still sync-ing is ok)
4882       time.sleep(15)
4883       feedback_fn("* checking mirrors status")
4884       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4885     else:
4886       disk_abort = False
4887
4888     if disk_abort:
4889       _RemoveDisks(self, iobj)
4890       self.cfg.RemoveInstance(iobj.name)
4891       # Make sure the instance lock gets removed
4892       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4893       raise errors.OpExecError("There are some degraded disks for"
4894                                " this instance")
4895
4896     feedback_fn("creating os for instance %s on node %s" %
4897                 (instance, pnode_name))
4898
4899     if iobj.disk_template != constants.DT_DISKLESS:
4900       if self.op.mode == constants.INSTANCE_CREATE:
4901         feedback_fn("* running the instance OS create scripts...")
4902         result = self.rpc.call_instance_os_add(pnode_name, iobj)
4903         msg = result.RemoteFailMsg()
4904         if msg:
4905           raise errors.OpExecError("Could not add os for instance %s"
4906                                    " on node %s: %s" %
4907                                    (instance, pnode_name, msg))
4908
4909       elif self.op.mode == constants.INSTANCE_IMPORT:
4910         feedback_fn("* running the instance OS import scripts...")
4911         src_node = self.op.src_node
4912         src_images = self.src_images
4913         cluster_name = self.cfg.GetClusterName()
4914         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4915                                                          src_node, src_images,
4916                                                          cluster_name)
4917         import_result.Raise()
4918         for idx, result in enumerate(import_result.data):
4919           if not result:
4920             self.LogWarning("Could not import the image %s for instance"
4921                             " %s, disk %d, on node %s" %
4922                             (src_images[idx], instance, idx, pnode_name))
4923       else:
4924         # also checked in the prereq part
4925         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4926                                      % self.op.mode)
4927
4928     if self.op.start:
4929       iobj.admin_up = True
4930       self.cfg.Update(iobj)
4931       logging.info("Starting instance %s on node %s", instance, pnode_name)
4932       feedback_fn("* starting instance...")
4933       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4934       msg = result.RemoteFailMsg()
4935       if msg:
4936         raise errors.OpExecError("Could not start instance: %s" % msg)
4937
4938
4939 class LUConnectConsole(NoHooksLU):
4940   """Connect to an instance's console.
4941
4942   This is somewhat special in that it returns the command line that
4943   you need to run on the master node in order to connect to the
4944   console.
4945
4946   """
4947   _OP_REQP = ["instance_name"]
4948   REQ_BGL = False
4949
4950   def ExpandNames(self):
4951     self._ExpandAndLockInstance()
4952
4953   def CheckPrereq(self):
4954     """Check prerequisites.
4955
4956     This checks that the instance is in the cluster.
4957
4958     """
4959     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4960     assert self.instance is not None, \
4961       "Cannot retrieve locked instance %s" % self.op.instance_name
4962     _CheckNodeOnline(self, self.instance.primary_node)
4963
4964   def Exec(self, feedback_fn):
4965     """Connect to the console of an instance
4966
4967     """
4968     instance = self.instance
4969     node = instance.primary_node
4970
4971     node_insts = self.rpc.call_instance_list([node],
4972                                              [instance.hypervisor])[node]
4973     node_insts.Raise()
4974
4975     if instance.name not in node_insts.data:
4976       raise errors.OpExecError("Instance %s is not running." % instance.name)
4977
4978     logging.debug("Connecting to console of %s on %s", instance.name, node)
4979
4980     hyper = hypervisor.GetHypervisor(instance.hypervisor)
4981     cluster = self.cfg.GetClusterInfo()
4982     # beparams and hvparams are passed separately, to avoid editing the
4983     # instance and then saving the defaults in the instance itself.
4984     hvparams = cluster.FillHV(instance)
4985     beparams = cluster.FillBE(instance)
4986     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
4987
4988     # build ssh cmdline
4989     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4990
4991
4992 class LUReplaceDisks(LogicalUnit):
4993   """Replace the disks of an instance.
4994
4995   """
4996   HPATH = "mirrors-replace"
4997   HTYPE = constants.HTYPE_INSTANCE
4998   _OP_REQP = ["instance_name", "mode", "disks"]
4999   REQ_BGL = False
5000
5001   def CheckArguments(self):
5002     if not hasattr(self.op, "remote_node"):
5003       self.op.remote_node = None
5004     if not hasattr(self.op, "iallocator"):
5005       self.op.iallocator = None
5006
5007     # check for valid parameter combination
5008     cnt = [self.op.remote_node, self.op.iallocator].count(None)
5009     if self.op.mode == constants.REPLACE_DISK_CHG:
5010       if cnt == 2:
5011         raise errors.OpPrereqError("When changing the secondary either an"
5012                                    " iallocator script must be used or the"
5013                                    " new node given")
5014       elif cnt == 0:
5015         raise errors.OpPrereqError("Give either the iallocator or the new"
5016                                    " secondary, not both")
5017     else: # not replacing the secondary
5018       if cnt != 2:
5019         raise errors.OpPrereqError("The iallocator and new node options can"
5020                                    " be used only when changing the"
5021                                    " secondary node")
5022
5023   def ExpandNames(self):
5024     self._ExpandAndLockInstance()
5025
5026     if self.op.iallocator is not None:
5027       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5028     elif self.op.remote_node is not None:
5029       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5030       if remote_node is None:
5031         raise errors.OpPrereqError("Node '%s' not known" %
5032                                    self.op.remote_node)
5033       self.op.remote_node = remote_node
5034       # Warning: do not remove the locking of the new secondary here
5035       # unless DRBD8.AddChildren is changed to work in parallel;
5036       # currently it doesn't since parallel invocations of
5037       # FindUnusedMinor will conflict
5038       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5039       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5040     else:
5041       self.needed_locks[locking.LEVEL_NODE] = []
5042       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5043
5044   def DeclareLocks(self, level):
5045     # If we're not already locking all nodes in the set we have to declare the
5046     # instance's primary/secondary nodes.
5047     if (level == locking.LEVEL_NODE and
5048         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5049       self._LockInstancesNodes()
5050
5051   def _RunAllocator(self):
5052     """Compute a new secondary node using an IAllocator.
5053
5054     """
5055     ial = IAllocator(self,
5056                      mode=constants.IALLOCATOR_MODE_RELOC,
5057                      name=self.op.instance_name,
5058                      relocate_from=[self.sec_node])
5059
5060     ial.Run(self.op.iallocator)
5061
5062     if not ial.success:
5063       raise errors.OpPrereqError("Can't compute nodes using"
5064                                  " iallocator '%s': %s" % (self.op.iallocator,
5065                                                            ial.info))
5066     if len(ial.nodes) != ial.required_nodes:
5067       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5068                                  " of nodes (%s), required %s" %
5069                                  (len(ial.nodes), ial.required_nodes))
5070     self.op.remote_node = ial.nodes[0]
5071     self.LogInfo("Selected new secondary for the instance: %s",
5072                  self.op.remote_node)
5073
5074   def BuildHooksEnv(self):
5075     """Build hooks env.
5076
5077     This runs on the master, the primary and all the secondaries.
5078
5079     """
5080     env = {
5081       "MODE": self.op.mode,
5082       "NEW_SECONDARY": self.op.remote_node,
5083       "OLD_SECONDARY": self.instance.secondary_nodes[0],
5084       }
5085     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5086     nl = [
5087       self.cfg.GetMasterNode(),
5088       self.instance.primary_node,
5089       ]
5090     if self.op.remote_node is not None:
5091       nl.append(self.op.remote_node)
5092     return env, nl, nl
5093
5094   def CheckPrereq(self):
5095     """Check prerequisites.
5096
5097     This checks that the instance is in the cluster.
5098
5099     """
5100     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5101     assert instance is not None, \
5102       "Cannot retrieve locked instance %s" % self.op.instance_name
5103     self.instance = instance
5104
5105     if instance.disk_template != constants.DT_DRBD8:
5106       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5107                                  " instances")
5108
5109     if len(instance.secondary_nodes) != 1:
5110       raise errors.OpPrereqError("The instance has a strange layout,"
5111                                  " expected one secondary but found %d" %
5112                                  len(instance.secondary_nodes))
5113
5114     self.sec_node = instance.secondary_nodes[0]
5115
5116     if self.op.iallocator is not None:
5117       self._RunAllocator()
5118
5119     remote_node = self.op.remote_node
5120     if remote_node is not None:
5121       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5122       assert self.remote_node_info is not None, \
5123         "Cannot retrieve locked node %s" % remote_node
5124     else:
5125       self.remote_node_info = None
5126     if remote_node == instance.primary_node:
5127       raise errors.OpPrereqError("The specified node is the primary node of"
5128                                  " the instance.")
5129     elif remote_node == self.sec_node:
5130       raise errors.OpPrereqError("The specified node is already the"
5131                                  " secondary node of the instance.")
5132
5133     if self.op.mode == constants.REPLACE_DISK_PRI:
5134       n1 = self.tgt_node = instance.primary_node
5135       n2 = self.oth_node = self.sec_node
5136     elif self.op.mode == constants.REPLACE_DISK_SEC:
5137       n1 = self.tgt_node = self.sec_node
5138       n2 = self.oth_node = instance.primary_node
5139     elif self.op.mode == constants.REPLACE_DISK_CHG:
5140       n1 = self.new_node = remote_node
5141       n2 = self.oth_node = instance.primary_node
5142       self.tgt_node = self.sec_node
5143       _CheckNodeNotDrained(self, remote_node)
5144     else:
5145       raise errors.ProgrammerError("Unhandled disk replace mode")
5146
5147     _CheckNodeOnline(self, n1)
5148     _CheckNodeOnline(self, n2)
5149
5150     if not self.op.disks:
5151       self.op.disks = range(len(instance.disks))
5152
5153     for disk_idx in self.op.disks:
5154       instance.FindDisk(disk_idx)
5155
5156   def _ExecD8DiskOnly(self, feedback_fn):
5157     """Replace a disk on the primary or secondary for dbrd8.
5158
5159     The algorithm for replace is quite complicated:
5160
5161       1. for each disk to be replaced:
5162
5163         1. create new LVs on the target node with unique names
5164         1. detach old LVs from the drbd device
5165         1. rename old LVs to name_replaced.<time_t>
5166         1. rename new LVs to old LVs
5167         1. attach the new LVs (with the old names now) to the drbd device
5168
5169       1. wait for sync across all devices
5170
5171       1. for each modified disk:
5172
5173         1. remove old LVs (which have the name name_replaces.<time_t>)
5174
5175     Failures are not very well handled.
5176
5177     """
5178     steps_total = 6
5179     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5180     instance = self.instance
5181     iv_names = {}
5182     vgname = self.cfg.GetVGName()
5183     # start of work
5184     cfg = self.cfg
5185     tgt_node = self.tgt_node
5186     oth_node = self.oth_node
5187
5188     # Step: check device activation
5189     self.proc.LogStep(1, steps_total, "check device existence")
5190     info("checking volume groups")
5191     my_vg = cfg.GetVGName()
5192     results = self.rpc.call_vg_list([oth_node, tgt_node])
5193     if not results:
5194       raise errors.OpExecError("Can't list volume groups on the nodes")
5195     for node in oth_node, tgt_node:
5196       res = results[node]
5197       if res.failed or not res.data or my_vg not in res.data:
5198         raise errors.OpExecError("Volume group '%s' not found on %s" %
5199                                  (my_vg, node))
5200     for idx, dev in enumerate(instance.disks):
5201       if idx not in self.op.disks:
5202         continue
5203       for node in tgt_node, oth_node:
5204         info("checking disk/%d on %s" % (idx, node))
5205         cfg.SetDiskID(dev, node)
5206         result = self.rpc.call_blockdev_find(node, dev)
5207         msg = result.RemoteFailMsg()
5208         if not msg and not result.payload:
5209           msg = "disk not found"
5210         if msg:
5211           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5212                                    (idx, node, msg))
5213
5214     # Step: check other node consistency
5215     self.proc.LogStep(2, steps_total, "check peer consistency")
5216     for idx, dev in enumerate(instance.disks):
5217       if idx not in self.op.disks:
5218         continue
5219       info("checking disk/%d consistency on %s" % (idx, oth_node))
5220       if not _CheckDiskConsistency(self, dev, oth_node,
5221                                    oth_node==instance.primary_node):
5222         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5223                                  " to replace disks on this node (%s)" %
5224                                  (oth_node, tgt_node))
5225
5226     # Step: create new storage
5227     self.proc.LogStep(3, steps_total, "allocate new storage")
5228     for idx, dev in enumerate(instance.disks):
5229       if idx not in self.op.disks:
5230         continue
5231       size = dev.size
5232       cfg.SetDiskID(dev, tgt_node)
5233       lv_names = [".disk%d_%s" % (idx, suf)
5234                   for suf in ["data", "meta"]]
5235       names = _GenerateUniqueNames(self, lv_names)
5236       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5237                              logical_id=(vgname, names[0]))
5238       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5239                              logical_id=(vgname, names[1]))
5240       new_lvs = [lv_data, lv_meta]
5241       old_lvs = dev.children
5242       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5243       info("creating new local storage on %s for %s" %
5244            (tgt_node, dev.iv_name))
5245       # we pass force_create=True to force the LVM creation
5246       for new_lv in new_lvs:
5247         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5248                         _GetInstanceInfoText(instance), False)
5249
5250     # Step: for each lv, detach+rename*2+attach
5251     self.proc.LogStep(4, steps_total, "change drbd configuration")
5252     for dev, old_lvs, new_lvs in iv_names.itervalues():
5253       info("detaching %s drbd from local storage" % dev.iv_name)
5254       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5255       result.Raise()
5256       if not result.data:
5257         raise errors.OpExecError("Can't detach drbd from local storage on node"
5258                                  " %s for device %s" % (tgt_node, dev.iv_name))
5259       #dev.children = []
5260       #cfg.Update(instance)
5261
5262       # ok, we created the new LVs, so now we know we have the needed
5263       # storage; as such, we proceed on the target node to rename
5264       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5265       # using the assumption that logical_id == physical_id (which in
5266       # turn is the unique_id on that node)
5267
5268       # FIXME(iustin): use a better name for the replaced LVs
5269       temp_suffix = int(time.time())
5270       ren_fn = lambda d, suff: (d.physical_id[0],
5271                                 d.physical_id[1] + "_replaced-%s" % suff)
5272       # build the rename list based on what LVs exist on the node
5273       rlist = []
5274       for to_ren in old_lvs:
5275         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5276         if not result.RemoteFailMsg() and result.payload:
5277           # device exists
5278           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5279
5280       info("renaming the old LVs on the target node")
5281       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5282       result.Raise()
5283       if not result.data:
5284         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
5285       # now we rename the new LVs to the old LVs
5286       info("renaming the new LVs on the target node")
5287       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5288       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5289       result.Raise()
5290       if not result.data:
5291         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
5292
5293       for old, new in zip(old_lvs, new_lvs):
5294         new.logical_id = old.logical_id
5295         cfg.SetDiskID(new, tgt_node)
5296
5297       for disk in old_lvs:
5298         disk.logical_id = ren_fn(disk, temp_suffix)
5299         cfg.SetDiskID(disk, tgt_node)
5300
5301       # now that the new lvs have the old name, we can add them to the device
5302       info("adding new mirror component on %s" % tgt_node)
5303       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5304       if result.failed or not result.data:
5305         for new_lv in new_lvs:
5306           msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5307           if msg:
5308             warning("Can't rollback device %s: %s", dev, msg,
5309                     hint="cleanup manually the unused logical volumes")
5310         raise errors.OpExecError("Can't add local storage to drbd")
5311
5312       dev.children = new_lvs
5313       cfg.Update(instance)
5314
5315     # Step: wait for sync
5316
5317     # this can fail as the old devices are degraded and _WaitForSync
5318     # does a combined result over all disks, so we don't check its
5319     # return value
5320     self.proc.LogStep(5, steps_total, "sync devices")
5321     _WaitForSync(self, instance, unlock=True)
5322
5323     # so check manually all the devices
5324     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5325       cfg.SetDiskID(dev, instance.primary_node)
5326       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5327       msg = result.RemoteFailMsg()
5328       if not msg and not result.payload:
5329         msg = "disk not found"
5330       if msg:
5331         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5332                                  (name, msg))
5333       if result.payload[5]:
5334         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5335
5336     # Step: remove old storage
5337     self.proc.LogStep(6, steps_total, "removing old storage")
5338     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5339       info("remove logical volumes for %s" % name)
5340       for lv in old_lvs:
5341         cfg.SetDiskID(lv, tgt_node)
5342         msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5343         if msg:
5344           warning("Can't remove old LV: %s" % msg,
5345                   hint="manually remove unused LVs")
5346           continue
5347
5348   def _ExecD8Secondary(self, feedback_fn):
5349     """Replace the secondary node for drbd8.
5350
5351     The algorithm for replace is quite complicated:
5352       - for all disks of the instance:
5353         - create new LVs on the new node with same names
5354         - shutdown the drbd device on the old secondary
5355         - disconnect the drbd network on the primary
5356         - create the drbd device on the new secondary
5357         - network attach the drbd on the primary, using an artifice:
5358           the drbd code for Attach() will connect to the network if it
5359           finds a device which is connected to the good local disks but
5360           not network enabled
5361       - wait for sync across all devices
5362       - remove all disks from the old secondary
5363
5364     Failures are not very well handled.
5365
5366     """
5367     steps_total = 6
5368     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5369     instance = self.instance
5370     iv_names = {}
5371     # start of work
5372     cfg = self.cfg
5373     old_node = self.tgt_node
5374     new_node = self.new_node
5375     pri_node = instance.primary_node
5376     nodes_ip = {
5377       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5378       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5379       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5380       }
5381
5382     # Step: check device activation
5383     self.proc.LogStep(1, steps_total, "check device existence")
5384     info("checking volume groups")
5385     my_vg = cfg.GetVGName()
5386     results = self.rpc.call_vg_list([pri_node, new_node])
5387     for node in pri_node, new_node:
5388       res = results[node]
5389       if res.failed or not res.data or my_vg not in res.data:
5390         raise errors.OpExecError("Volume group '%s' not found on %s" %
5391                                  (my_vg, node))
5392     for idx, dev in enumerate(instance.disks):
5393       if idx not in self.op.disks:
5394         continue
5395       info("checking disk/%d on %s" % (idx, pri_node))
5396       cfg.SetDiskID(dev, pri_node)
5397       result = self.rpc.call_blockdev_find(pri_node, dev)
5398       msg = result.RemoteFailMsg()
5399       if not msg and not result.payload:
5400         msg = "disk not found"
5401       if msg:
5402         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5403                                  (idx, pri_node, msg))
5404
5405     # Step: check other node consistency
5406     self.proc.LogStep(2, steps_total, "check peer consistency")
5407     for idx, dev in enumerate(instance.disks):
5408       if idx not in self.op.disks:
5409         continue
5410       info("checking disk/%d consistency on %s" % (idx, pri_node))
5411       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5412         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5413                                  " unsafe to replace the secondary" %
5414                                  pri_node)
5415
5416     # Step: create new storage
5417     self.proc.LogStep(3, steps_total, "allocate new storage")
5418     for idx, dev in enumerate(instance.disks):
5419       info("adding new local storage on %s for disk/%d" %
5420            (new_node, idx))
5421       # we pass force_create=True to force LVM creation
5422       for new_lv in dev.children:
5423         _CreateBlockDev(self, new_node, instance, new_lv, True,
5424                         _GetInstanceInfoText(instance), False)
5425
5426     # Step 4: dbrd minors and drbd setups changes
5427     # after this, we must manually remove the drbd minors on both the
5428     # error and the success paths
5429     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5430                                    instance.name)
5431     logging.debug("Allocated minors %s" % (minors,))
5432     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5433     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5434       size = dev.size
5435       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5436       # create new devices on new_node; note that we create two IDs:
5437       # one without port, so the drbd will be activated without
5438       # networking information on the new node at this stage, and one
5439       # with network, for the latter activation in step 4
5440       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5441       if pri_node == o_node1:
5442         p_minor = o_minor1
5443       else:
5444         p_minor = o_minor2
5445
5446       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5447       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5448
5449       iv_names[idx] = (dev, dev.children, new_net_id)
5450       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5451                     new_net_id)
5452       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5453                               logical_id=new_alone_id,
5454                               children=dev.children,
5455                               size=dev.size)
5456       try:
5457         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5458                               _GetInstanceInfoText(instance), False)
5459       except errors.GenericError:
5460         self.cfg.ReleaseDRBDMinors(instance.name)
5461         raise
5462
5463     for idx, dev in enumerate(instance.disks):
5464       # we have new devices, shutdown the drbd on the old secondary
5465       info("shutting down drbd for disk/%d on old node" % idx)
5466       cfg.SetDiskID(dev, old_node)
5467       msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5468       if msg:
5469         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5470                 (idx, msg),
5471                 hint="Please cleanup this device manually as soon as possible")
5472
5473     info("detaching primary drbds from the network (=> standalone)")
5474     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5475                                                instance.disks)[pri_node]
5476
5477     msg = result.RemoteFailMsg()
5478     if msg:
5479       # detaches didn't succeed (unlikely)
5480       self.cfg.ReleaseDRBDMinors(instance.name)
5481       raise errors.OpExecError("Can't detach the disks from the network on"
5482                                " old node: %s" % (msg,))
5483
5484     # if we managed to detach at least one, we update all the disks of
5485     # the instance to point to the new secondary
5486     info("updating instance configuration")
5487     for dev, _, new_logical_id in iv_names.itervalues():
5488       dev.logical_id = new_logical_id
5489       cfg.SetDiskID(dev, pri_node)
5490     cfg.Update(instance)
5491
5492     # and now perform the drbd attach
5493     info("attaching primary drbds to new secondary (standalone => connected)")
5494     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5495                                            instance.disks, instance.name,
5496                                            False)
5497     for to_node, to_result in result.items():
5498       msg = to_result.RemoteFailMsg()
5499       if msg:
5500         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5501                 hint="please do a gnt-instance info to see the"
5502                 " status of disks")
5503
5504     # this can fail as the old devices are degraded and _WaitForSync
5505     # does a combined result over all disks, so we don't check its
5506     # return value
5507     self.proc.LogStep(5, steps_total, "sync devices")
5508     _WaitForSync(self, instance, unlock=True)
5509
5510     # so check manually all the devices
5511     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5512       cfg.SetDiskID(dev, pri_node)
5513       result = self.rpc.call_blockdev_find(pri_node, dev)
5514       msg = result.RemoteFailMsg()
5515       if not msg and not result.payload:
5516         msg = "disk not found"
5517       if msg:
5518         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5519                                  (idx, msg))
5520       if result.payload[5]:
5521         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5522
5523     self.proc.LogStep(6, steps_total, "removing old storage")
5524     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5525       info("remove logical volumes for disk/%d" % idx)
5526       for lv in old_lvs:
5527         cfg.SetDiskID(lv, old_node)
5528         msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5529         if msg:
5530           warning("Can't remove LV on old secondary: %s", msg,
5531                   hint="Cleanup stale volumes by hand")
5532
5533   def Exec(self, feedback_fn):
5534     """Execute disk replacement.
5535
5536     This dispatches the disk replacement to the appropriate handler.
5537
5538     """
5539     instance = self.instance
5540
5541     # Activate the instance disks if we're replacing them on a down instance
5542     if not instance.admin_up:
5543       _StartInstanceDisks(self, instance, True)
5544
5545     if self.op.mode == constants.REPLACE_DISK_CHG:
5546       fn = self._ExecD8Secondary
5547     else:
5548       fn = self._ExecD8DiskOnly
5549
5550     ret = fn(feedback_fn)
5551
5552     # Deactivate the instance disks if we're replacing them on a down instance
5553     if not instance.admin_up:
5554       _SafeShutdownInstanceDisks(self, instance)
5555
5556     return ret
5557
5558
5559 class LUGrowDisk(LogicalUnit):
5560   """Grow a disk of an instance.
5561
5562   """
5563   HPATH = "disk-grow"
5564   HTYPE = constants.HTYPE_INSTANCE
5565   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5566   REQ_BGL = False
5567
5568   def ExpandNames(self):
5569     self._ExpandAndLockInstance()
5570     self.needed_locks[locking.LEVEL_NODE] = []
5571     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5572
5573   def DeclareLocks(self, level):
5574     if level == locking.LEVEL_NODE:
5575       self._LockInstancesNodes()
5576
5577   def BuildHooksEnv(self):
5578     """Build hooks env.
5579
5580     This runs on the master, the primary and all the secondaries.
5581
5582     """
5583     env = {
5584       "DISK": self.op.disk,
5585       "AMOUNT": self.op.amount,
5586       }
5587     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5588     nl = [
5589       self.cfg.GetMasterNode(),
5590       self.instance.primary_node,
5591       ]
5592     return env, nl, nl
5593
5594   def CheckPrereq(self):
5595     """Check prerequisites.
5596
5597     This checks that the instance is in the cluster.
5598
5599     """
5600     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5601     assert instance is not None, \
5602       "Cannot retrieve locked instance %s" % self.op.instance_name
5603     nodenames = list(instance.all_nodes)
5604     for node in nodenames:
5605       _CheckNodeOnline(self, node)
5606
5607
5608     self.instance = instance
5609
5610     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5611       raise errors.OpPrereqError("Instance's disk layout does not support"
5612                                  " growing.")
5613
5614     self.disk = instance.FindDisk(self.op.disk)
5615
5616     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5617                                        instance.hypervisor)
5618     for node in nodenames:
5619       info = nodeinfo[node]
5620       if info.failed or not info.data:
5621         raise errors.OpPrereqError("Cannot get current information"
5622                                    " from node '%s'" % node)
5623       vg_free = info.data.get('vg_free', None)
5624       if not isinstance(vg_free, int):
5625         raise errors.OpPrereqError("Can't compute free disk space on"
5626                                    " node %s" % node)
5627       if self.op.amount > vg_free:
5628         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5629                                    " %d MiB available, %d MiB required" %
5630                                    (node, vg_free, self.op.amount))
5631
5632   def Exec(self, feedback_fn):
5633     """Execute disk grow.
5634
5635     """
5636     instance = self.instance
5637     disk = self.disk
5638     for node in instance.all_nodes:
5639       self.cfg.SetDiskID(disk, node)
5640       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5641       msg = result.RemoteFailMsg()
5642       if msg:
5643         raise errors.OpExecError("Grow request failed to node %s: %s" %
5644                                  (node, msg))
5645     disk.RecordGrow(self.op.amount)
5646     self.cfg.Update(instance)
5647     if self.op.wait_for_sync:
5648       disk_abort = not _WaitForSync(self, instance)
5649       if disk_abort:
5650         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5651                              " status.\nPlease check the instance.")
5652
5653
5654 class LUQueryInstanceData(NoHooksLU):
5655   """Query runtime instance data.
5656
5657   """
5658   _OP_REQP = ["instances", "static"]
5659   REQ_BGL = False
5660
5661   def ExpandNames(self):
5662     self.needed_locks = {}
5663     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5664
5665     if not isinstance(self.op.instances, list):
5666       raise errors.OpPrereqError("Invalid argument type 'instances'")
5667
5668     if self.op.instances:
5669       self.wanted_names = []
5670       for name in self.op.instances:
5671         full_name = self.cfg.ExpandInstanceName(name)
5672         if full_name is None:
5673           raise errors.OpPrereqError("Instance '%s' not known" % name)
5674         self.wanted_names.append(full_name)
5675       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5676     else:
5677       self.wanted_names = None
5678       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5679
5680     self.needed_locks[locking.LEVEL_NODE] = []
5681     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5682
5683   def DeclareLocks(self, level):
5684     if level == locking.LEVEL_NODE:
5685       self._LockInstancesNodes()
5686
5687   def CheckPrereq(self):
5688     """Check prerequisites.
5689
5690     This only checks the optional instance list against the existing names.
5691
5692     """
5693     if self.wanted_names is None:
5694       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5695
5696     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5697                              in self.wanted_names]
5698     return
5699
5700   def _ComputeDiskStatus(self, instance, snode, dev):
5701     """Compute block device status.
5702
5703     """
5704     static = self.op.static
5705     if not static:
5706       self.cfg.SetDiskID(dev, instance.primary_node)
5707       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5708       if dev_pstatus.offline:
5709         dev_pstatus = None
5710       else:
5711         msg = dev_pstatus.RemoteFailMsg()
5712         if msg:
5713           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5714                                    (instance.name, msg))
5715         dev_pstatus = dev_pstatus.payload
5716     else:
5717       dev_pstatus = None
5718
5719     if dev.dev_type in constants.LDS_DRBD:
5720       # we change the snode then (otherwise we use the one passed in)
5721       if dev.logical_id[0] == instance.primary_node:
5722         snode = dev.logical_id[1]
5723       else:
5724         snode = dev.logical_id[0]
5725
5726     if snode and not static:
5727       self.cfg.SetDiskID(dev, snode)
5728       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5729       if dev_sstatus.offline:
5730         dev_sstatus = None
5731       else:
5732         msg = dev_sstatus.RemoteFailMsg()
5733         if msg:
5734           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5735                                    (instance.name, msg))
5736         dev_sstatus = dev_sstatus.payload
5737     else:
5738       dev_sstatus = None
5739
5740     if dev.children:
5741       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5742                       for child in dev.children]
5743     else:
5744       dev_children = []
5745
5746     data = {
5747       "iv_name": dev.iv_name,
5748       "dev_type": dev.dev_type,
5749       "logical_id": dev.logical_id,
5750       "physical_id": dev.physical_id,
5751       "pstatus": dev_pstatus,
5752       "sstatus": dev_sstatus,
5753       "children": dev_children,
5754       "mode": dev.mode,
5755       "size": dev.size,
5756       }
5757
5758     return data
5759
5760   def Exec(self, feedback_fn):
5761     """Gather and return data"""
5762     result = {}
5763
5764     cluster = self.cfg.GetClusterInfo()
5765
5766     for instance in self.wanted_instances:
5767       if not self.op.static:
5768         remote_info = self.rpc.call_instance_info(instance.primary_node,
5769                                                   instance.name,
5770                                                   instance.hypervisor)
5771         remote_info.Raise()
5772         remote_info = remote_info.data
5773         if remote_info and "state" in remote_info:
5774           remote_state = "up"
5775         else:
5776           remote_state = "down"
5777       else:
5778         remote_state = None
5779       if instance.admin_up:
5780         config_state = "up"
5781       else:
5782         config_state = "down"
5783
5784       disks = [self._ComputeDiskStatus(instance, None, device)
5785                for device in instance.disks]
5786
5787       idict = {
5788         "name": instance.name,
5789         "config_state": config_state,
5790         "run_state": remote_state,
5791         "pnode": instance.primary_node,
5792         "snodes": instance.secondary_nodes,
5793         "os": instance.os,
5794         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5795         "disks": disks,
5796         "hypervisor": instance.hypervisor,
5797         "network_port": instance.network_port,
5798         "hv_instance": instance.hvparams,
5799         "hv_actual": cluster.FillHV(instance),
5800         "be_instance": instance.beparams,
5801         "be_actual": cluster.FillBE(instance),
5802         }
5803
5804       result[instance.name] = idict
5805
5806     return result
5807
5808
5809 class LUSetInstanceParams(LogicalUnit):
5810   """Modifies an instances's parameters.
5811
5812   """
5813   HPATH = "instance-modify"
5814   HTYPE = constants.HTYPE_INSTANCE
5815   _OP_REQP = ["instance_name"]
5816   REQ_BGL = False
5817
5818   def CheckArguments(self):
5819     if not hasattr(self.op, 'nics'):
5820       self.op.nics = []
5821     if not hasattr(self.op, 'disks'):
5822       self.op.disks = []
5823     if not hasattr(self.op, 'beparams'):
5824       self.op.beparams = {}
5825     if not hasattr(self.op, 'hvparams'):
5826       self.op.hvparams = {}
5827     self.op.force = getattr(self.op, "force", False)
5828     if not (self.op.nics or self.op.disks or
5829             self.op.hvparams or self.op.beparams):
5830       raise errors.OpPrereqError("No changes submitted")
5831
5832     # Disk validation
5833     disk_addremove = 0
5834     for disk_op, disk_dict in self.op.disks:
5835       if disk_op == constants.DDM_REMOVE:
5836         disk_addremove += 1
5837         continue
5838       elif disk_op == constants.DDM_ADD:
5839         disk_addremove += 1
5840       else:
5841         if not isinstance(disk_op, int):
5842           raise errors.OpPrereqError("Invalid disk index")
5843       if disk_op == constants.DDM_ADD:
5844         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5845         if mode not in constants.DISK_ACCESS_SET:
5846           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5847         size = disk_dict.get('size', None)
5848         if size is None:
5849           raise errors.OpPrereqError("Required disk parameter size missing")
5850         try:
5851           size = int(size)
5852         except ValueError, err:
5853           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5854                                      str(err))
5855         disk_dict['size'] = size
5856       else:
5857         # modification of disk
5858         if 'size' in disk_dict:
5859           raise errors.OpPrereqError("Disk size change not possible, use"
5860                                      " grow-disk")
5861
5862     if disk_addremove > 1:
5863       raise errors.OpPrereqError("Only one disk add or remove operation"
5864                                  " supported at a time")
5865
5866     # NIC validation
5867     nic_addremove = 0
5868     for nic_op, nic_dict in self.op.nics:
5869       if nic_op == constants.DDM_REMOVE:
5870         nic_addremove += 1
5871         continue
5872       elif nic_op == constants.DDM_ADD:
5873         nic_addremove += 1
5874       else:
5875         if not isinstance(nic_op, int):
5876           raise errors.OpPrereqError("Invalid nic index")
5877
5878       # nic_dict should be a dict
5879       nic_ip = nic_dict.get('ip', None)
5880       if nic_ip is not None:
5881         if nic_ip.lower() == constants.VALUE_NONE:
5882           nic_dict['ip'] = None
5883         else:
5884           if not utils.IsValidIP(nic_ip):
5885             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5886
5887       if nic_op == constants.DDM_ADD:
5888         nic_bridge = nic_dict.get('bridge', None)
5889         if nic_bridge is None:
5890           nic_dict['bridge'] = self.cfg.GetDefBridge()
5891         nic_mac = nic_dict.get('mac', None)
5892         if nic_mac is None:
5893           nic_dict['mac'] = constants.VALUE_AUTO
5894
5895       if 'mac' in nic_dict:
5896         nic_mac = nic_dict['mac']
5897         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5898           if not utils.IsValidMac(nic_mac):
5899             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5900         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5901           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5902                                      " modifying an existing nic")
5903
5904     if nic_addremove > 1:
5905       raise errors.OpPrereqError("Only one NIC add or remove operation"
5906                                  " supported at a time")
5907
5908   def ExpandNames(self):
5909     self._ExpandAndLockInstance()
5910     self.needed_locks[locking.LEVEL_NODE] = []
5911     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5912
5913   def DeclareLocks(self, level):
5914     if level == locking.LEVEL_NODE:
5915       self._LockInstancesNodes()
5916
5917   def BuildHooksEnv(self):
5918     """Build hooks env.
5919
5920     This runs on the master, primary and secondaries.
5921
5922     """
5923     args = dict()
5924     if constants.BE_MEMORY in self.be_new:
5925       args['memory'] = self.be_new[constants.BE_MEMORY]
5926     if constants.BE_VCPUS in self.be_new:
5927       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5928     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5929     # information at all.
5930     if self.op.nics:
5931       args['nics'] = []
5932       nic_override = dict(self.op.nics)
5933       for idx, nic in enumerate(self.instance.nics):
5934         if idx in nic_override:
5935           this_nic_override = nic_override[idx]
5936         else:
5937           this_nic_override = {}
5938         if 'ip' in this_nic_override:
5939           ip = this_nic_override['ip']
5940         else:
5941           ip = nic.ip
5942         if 'bridge' in this_nic_override:
5943           bridge = this_nic_override['bridge']
5944         else:
5945           bridge = nic.bridge
5946         if 'mac' in this_nic_override:
5947           mac = this_nic_override['mac']
5948         else:
5949           mac = nic.mac
5950         args['nics'].append((ip, bridge, mac))
5951       if constants.DDM_ADD in nic_override:
5952         ip = nic_override[constants.DDM_ADD].get('ip', None)
5953         bridge = nic_override[constants.DDM_ADD]['bridge']
5954         mac = nic_override[constants.DDM_ADD]['mac']
5955         args['nics'].append((ip, bridge, mac))
5956       elif constants.DDM_REMOVE in nic_override:
5957         del args['nics'][-1]
5958
5959     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5960     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5961     return env, nl, nl
5962
5963   def CheckPrereq(self):
5964     """Check prerequisites.
5965
5966     This only checks the instance list against the existing names.
5967
5968     """
5969     force = self.force = self.op.force
5970
5971     # checking the new params on the primary/secondary nodes
5972
5973     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5974     assert self.instance is not None, \
5975       "Cannot retrieve locked instance %s" % self.op.instance_name
5976     pnode = instance.primary_node
5977     nodelist = list(instance.all_nodes)
5978
5979     # hvparams processing
5980     if self.op.hvparams:
5981       i_hvdict = copy.deepcopy(instance.hvparams)
5982       for key, val in self.op.hvparams.iteritems():
5983         if val == constants.VALUE_DEFAULT:
5984           try:
5985             del i_hvdict[key]
5986           except KeyError:
5987             pass
5988         else:
5989           i_hvdict[key] = val
5990       cluster = self.cfg.GetClusterInfo()
5991       utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
5992       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5993                                 i_hvdict)
5994       # local check
5995       hypervisor.GetHypervisor(
5996         instance.hypervisor).CheckParameterSyntax(hv_new)
5997       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
5998       self.hv_new = hv_new # the new actual values
5999       self.hv_inst = i_hvdict # the new dict (without defaults)
6000     else:
6001       self.hv_new = self.hv_inst = {}
6002
6003     # beparams processing
6004     if self.op.beparams:
6005       i_bedict = copy.deepcopy(instance.beparams)
6006       for key, val in self.op.beparams.iteritems():
6007         if val == constants.VALUE_DEFAULT:
6008           try:
6009             del i_bedict[key]
6010           except KeyError:
6011             pass
6012         else:
6013           i_bedict[key] = val
6014       cluster = self.cfg.GetClusterInfo()
6015       utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
6016       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
6017                                 i_bedict)
6018       self.be_new = be_new # the new actual values
6019       self.be_inst = i_bedict # the new dict (without defaults)
6020     else:
6021       self.be_new = self.be_inst = {}
6022
6023     self.warn = []
6024
6025     if constants.BE_MEMORY in self.op.beparams and not self.force:
6026       mem_check_list = [pnode]
6027       if be_new[constants.BE_AUTO_BALANCE]:
6028         # either we changed auto_balance to yes or it was from before
6029         mem_check_list.extend(instance.secondary_nodes)
6030       instance_info = self.rpc.call_instance_info(pnode, instance.name,
6031                                                   instance.hypervisor)
6032       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6033                                          instance.hypervisor)
6034       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
6035         # Assume the primary node is unreachable and go ahead
6036         self.warn.append("Can't get info from primary node %s" % pnode)
6037       else:
6038         if not instance_info.failed and instance_info.data:
6039           current_mem = int(instance_info.data['memory'])
6040         else:
6041           # Assume instance not running
6042           # (there is a slight race condition here, but it's not very probable,
6043           # and we have no other way to check)
6044           current_mem = 0
6045         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6046                     nodeinfo[pnode].data['memory_free'])
6047         if miss_mem > 0:
6048           raise errors.OpPrereqError("This change will prevent the instance"
6049                                      " from starting, due to %d MB of memory"
6050                                      " missing on its primary node" % miss_mem)
6051
6052       if be_new[constants.BE_AUTO_BALANCE]:
6053         for node, nres in nodeinfo.iteritems():
6054           if node not in instance.secondary_nodes:
6055             continue
6056           if nres.failed or not isinstance(nres.data, dict):
6057             self.warn.append("Can't get info from secondary node %s" % node)
6058           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
6059             self.warn.append("Not enough memory to failover instance to"
6060                              " secondary node %s" % node)
6061
6062     # NIC processing
6063     for nic_op, nic_dict in self.op.nics:
6064       if nic_op == constants.DDM_REMOVE:
6065         if not instance.nics:
6066           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6067         continue
6068       if nic_op != constants.DDM_ADD:
6069         # an existing nic
6070         if nic_op < 0 or nic_op >= len(instance.nics):
6071           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6072                                      " are 0 to %d" %
6073                                      (nic_op, len(instance.nics)))
6074       if 'bridge' in nic_dict:
6075         nic_bridge = nic_dict['bridge']
6076         if nic_bridge is None:
6077           raise errors.OpPrereqError('Cannot set the nic bridge to None')
6078         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
6079           msg = ("Bridge '%s' doesn't exist on one of"
6080                  " the instance nodes" % nic_bridge)
6081           if self.force:
6082             self.warn.append(msg)
6083           else:
6084             raise errors.OpPrereqError(msg)
6085       if 'mac' in nic_dict:
6086         nic_mac = nic_dict['mac']
6087         if nic_mac is None:
6088           raise errors.OpPrereqError('Cannot set the nic mac to None')
6089         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6090           # otherwise generate the mac
6091           nic_dict['mac'] = self.cfg.GenerateMAC()
6092         else:
6093           # or validate/reserve the current one
6094           if self.cfg.IsMacInUse(nic_mac):
6095             raise errors.OpPrereqError("MAC address %s already in use"
6096                                        " in cluster" % nic_mac)
6097
6098     # DISK processing
6099     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6100       raise errors.OpPrereqError("Disk operations not supported for"
6101                                  " diskless instances")
6102     for disk_op, disk_dict in self.op.disks:
6103       if disk_op == constants.DDM_REMOVE:
6104         if len(instance.disks) == 1:
6105           raise errors.OpPrereqError("Cannot remove the last disk of"
6106                                      " an instance")
6107         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6108         ins_l = ins_l[pnode]
6109         if ins_l.failed or not isinstance(ins_l.data, list):
6110           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
6111         if instance.name in ins_l.data:
6112           raise errors.OpPrereqError("Instance is running, can't remove"
6113                                      " disks.")
6114
6115       if (disk_op == constants.DDM_ADD and
6116           len(instance.nics) >= constants.MAX_DISKS):
6117         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6118                                    " add more" % constants.MAX_DISKS)
6119       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6120         # an existing disk
6121         if disk_op < 0 or disk_op >= len(instance.disks):
6122           raise errors.OpPrereqError("Invalid disk index %s, valid values"
6123                                      " are 0 to %d" %
6124                                      (disk_op, len(instance.disks)))
6125
6126     return
6127
6128   def Exec(self, feedback_fn):
6129     """Modifies an instance.
6130
6131     All parameters take effect only at the next restart of the instance.
6132
6133     """
6134     # Process here the warnings from CheckPrereq, as we don't have a
6135     # feedback_fn there.
6136     for warn in self.warn:
6137       feedback_fn("WARNING: %s" % warn)
6138
6139     result = []
6140     instance = self.instance
6141     # disk changes
6142     for disk_op, disk_dict in self.op.disks:
6143       if disk_op == constants.DDM_REMOVE:
6144         # remove the last disk
6145         device = instance.disks.pop()
6146         device_idx = len(instance.disks)
6147         for node, disk in device.ComputeNodeTree(instance.primary_node):
6148           self.cfg.SetDiskID(disk, node)
6149           msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
6150           if msg:
6151             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6152                             " continuing anyway", device_idx, node, msg)
6153         result.append(("disk/%d" % device_idx, "remove"))
6154       elif disk_op == constants.DDM_ADD:
6155         # add a new disk
6156         if instance.disk_template == constants.DT_FILE:
6157           file_driver, file_path = instance.disks[0].logical_id
6158           file_path = os.path.dirname(file_path)
6159         else:
6160           file_driver = file_path = None
6161         disk_idx_base = len(instance.disks)
6162         new_disk = _GenerateDiskTemplate(self,
6163                                          instance.disk_template,
6164                                          instance.name, instance.primary_node,
6165                                          instance.secondary_nodes,
6166                                          [disk_dict],
6167                                          file_path,
6168                                          file_driver,
6169                                          disk_idx_base)[0]
6170         instance.disks.append(new_disk)
6171         info = _GetInstanceInfoText(instance)
6172
6173         logging.info("Creating volume %s for instance %s",
6174                      new_disk.iv_name, instance.name)
6175         # Note: this needs to be kept in sync with _CreateDisks
6176         #HARDCODE
6177         for node in instance.all_nodes:
6178           f_create = node == instance.primary_node
6179           try:
6180             _CreateBlockDev(self, node, instance, new_disk,
6181                             f_create, info, f_create)
6182           except errors.OpExecError, err:
6183             self.LogWarning("Failed to create volume %s (%s) on"
6184                             " node %s: %s",
6185                             new_disk.iv_name, new_disk, node, err)
6186         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6187                        (new_disk.size, new_disk.mode)))
6188       else:
6189         # change a given disk
6190         instance.disks[disk_op].mode = disk_dict['mode']
6191         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6192     # NIC changes
6193     for nic_op, nic_dict in self.op.nics:
6194       if nic_op == constants.DDM_REMOVE:
6195         # remove the last nic
6196         del instance.nics[-1]
6197         result.append(("nic.%d" % len(instance.nics), "remove"))
6198       elif nic_op == constants.DDM_ADD:
6199         # mac and bridge should be set, by now
6200         mac = nic_dict['mac']
6201         bridge = nic_dict['bridge']
6202         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
6203                               bridge=bridge)
6204         instance.nics.append(new_nic)
6205         result.append(("nic.%d" % (len(instance.nics) - 1),
6206                        "add:mac=%s,ip=%s,bridge=%s" %
6207                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
6208       else:
6209         # change a given nic
6210         for key in 'mac', 'ip', 'bridge':
6211           if key in nic_dict:
6212             setattr(instance.nics[nic_op], key, nic_dict[key])
6213             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
6214
6215     # hvparams changes
6216     if self.op.hvparams:
6217       instance.hvparams = self.hv_inst
6218       for key, val in self.op.hvparams.iteritems():
6219         result.append(("hv/%s" % key, val))
6220
6221     # beparams changes
6222     if self.op.beparams:
6223       instance.beparams = self.be_inst
6224       for key, val in self.op.beparams.iteritems():
6225         result.append(("be/%s" % key, val))
6226
6227     self.cfg.Update(instance)
6228
6229     return result
6230
6231
6232 class LUQueryExports(NoHooksLU):
6233   """Query the exports list
6234
6235   """
6236   _OP_REQP = ['nodes']
6237   REQ_BGL = False
6238
6239   def ExpandNames(self):
6240     self.needed_locks = {}
6241     self.share_locks[locking.LEVEL_NODE] = 1
6242     if not self.op.nodes:
6243       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6244     else:
6245       self.needed_locks[locking.LEVEL_NODE] = \
6246         _GetWantedNodes(self, self.op.nodes)
6247
6248   def CheckPrereq(self):
6249     """Check prerequisites.
6250
6251     """
6252     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6253
6254   def Exec(self, feedback_fn):
6255     """Compute the list of all the exported system images.
6256
6257     @rtype: dict
6258     @return: a dictionary with the structure node->(export-list)
6259         where export-list is a list of the instances exported on
6260         that node.
6261
6262     """
6263     rpcresult = self.rpc.call_export_list(self.nodes)
6264     result = {}
6265     for node in rpcresult:
6266       if rpcresult[node].failed:
6267         result[node] = False
6268       else:
6269         result[node] = rpcresult[node].data
6270
6271     return result
6272
6273
6274 class LUExportInstance(LogicalUnit):
6275   """Export an instance to an image in the cluster.
6276
6277   """
6278   HPATH = "instance-export"
6279   HTYPE = constants.HTYPE_INSTANCE
6280   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6281   REQ_BGL = False
6282
6283   def ExpandNames(self):
6284     self._ExpandAndLockInstance()
6285     # FIXME: lock only instance primary and destination node
6286     #
6287     # Sad but true, for now we have do lock all nodes, as we don't know where
6288     # the previous export might be, and and in this LU we search for it and
6289     # remove it from its current node. In the future we could fix this by:
6290     #  - making a tasklet to search (share-lock all), then create the new one,
6291     #    then one to remove, after
6292     #  - removing the removal operation altoghether
6293     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6294
6295   def DeclareLocks(self, level):
6296     """Last minute lock declaration."""
6297     # All nodes are locked anyway, so nothing to do here.
6298
6299   def BuildHooksEnv(self):
6300     """Build hooks env.
6301
6302     This will run on the master, primary node and target node.
6303
6304     """
6305     env = {
6306       "EXPORT_NODE": self.op.target_node,
6307       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6308       }
6309     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6310     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6311           self.op.target_node]
6312     return env, nl, nl
6313
6314   def CheckPrereq(self):
6315     """Check prerequisites.
6316
6317     This checks that the instance and node names are valid.
6318
6319     """
6320     instance_name = self.op.instance_name
6321     self.instance = self.cfg.GetInstanceInfo(instance_name)
6322     assert self.instance is not None, \
6323           "Cannot retrieve locked instance %s" % self.op.instance_name
6324     _CheckNodeOnline(self, self.instance.primary_node)
6325
6326     self.dst_node = self.cfg.GetNodeInfo(
6327       self.cfg.ExpandNodeName(self.op.target_node))
6328
6329     if self.dst_node is None:
6330       # This is wrong node name, not a non-locked node
6331       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6332     _CheckNodeOnline(self, self.dst_node.name)
6333     _CheckNodeNotDrained(self, self.dst_node.name)
6334
6335     # instance disk type verification
6336     for disk in self.instance.disks:
6337       if disk.dev_type == constants.LD_FILE:
6338         raise errors.OpPrereqError("Export not supported for instances with"
6339                                    " file-based disks")
6340
6341   def Exec(self, feedback_fn):
6342     """Export an instance to an image in the cluster.
6343
6344     """
6345     instance = self.instance
6346     dst_node = self.dst_node
6347     src_node = instance.primary_node
6348     if self.op.shutdown:
6349       # shutdown the instance, but not the disks
6350       result = self.rpc.call_instance_shutdown(src_node, instance)
6351       msg = result.RemoteFailMsg()
6352       if msg:
6353         raise errors.OpExecError("Could not shutdown instance %s on"
6354                                  " node %s: %s" %
6355                                  (instance.name, src_node, msg))
6356
6357     vgname = self.cfg.GetVGName()
6358
6359     snap_disks = []
6360
6361     # set the disks ID correctly since call_instance_start needs the
6362     # correct drbd minor to create the symlinks
6363     for disk in instance.disks:
6364       self.cfg.SetDiskID(disk, src_node)
6365
6366     try:
6367       for idx, disk in enumerate(instance.disks):
6368         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6369         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6370         if new_dev_name.failed or not new_dev_name.data:
6371           self.LogWarning("Could not snapshot disk/%d on node %s",
6372                           idx, src_node)
6373           snap_disks.append(False)
6374         else:
6375           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6376                                  logical_id=(vgname, new_dev_name.data),
6377                                  physical_id=(vgname, new_dev_name.data),
6378                                  iv_name=disk.iv_name)
6379           snap_disks.append(new_dev)
6380
6381     finally:
6382       if self.op.shutdown and instance.admin_up:
6383         result = self.rpc.call_instance_start(src_node, instance, None, None)
6384         msg = result.RemoteFailMsg()
6385         if msg:
6386           _ShutdownInstanceDisks(self, instance)
6387           raise errors.OpExecError("Could not start instance: %s" % msg)
6388
6389     # TODO: check for size
6390
6391     cluster_name = self.cfg.GetClusterName()
6392     for idx, dev in enumerate(snap_disks):
6393       if dev:
6394         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6395                                                instance, cluster_name, idx)
6396         if result.failed or not result.data:
6397           self.LogWarning("Could not export disk/%d from node %s to"
6398                           " node %s", idx, src_node, dst_node.name)
6399         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6400         if msg:
6401           self.LogWarning("Could not remove snapshot for disk/%d from node"
6402                           " %s: %s", idx, src_node, msg)
6403
6404     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6405     if result.failed or not result.data:
6406       self.LogWarning("Could not finalize export for instance %s on node %s",
6407                       instance.name, dst_node.name)
6408
6409     nodelist = self.cfg.GetNodeList()
6410     nodelist.remove(dst_node.name)
6411
6412     # on one-node clusters nodelist will be empty after the removal
6413     # if we proceed the backup would be removed because OpQueryExports
6414     # substitutes an empty list with the full cluster node list.
6415     if nodelist:
6416       exportlist = self.rpc.call_export_list(nodelist)
6417       for node in exportlist:
6418         if exportlist[node].failed:
6419           continue
6420         if instance.name in exportlist[node].data:
6421           if not self.rpc.call_export_remove(node, instance.name):
6422             self.LogWarning("Could not remove older export for instance %s"
6423                             " on node %s", instance.name, node)
6424
6425
6426 class LURemoveExport(NoHooksLU):
6427   """Remove exports related to the named instance.
6428
6429   """
6430   _OP_REQP = ["instance_name"]
6431   REQ_BGL = False
6432
6433   def ExpandNames(self):
6434     self.needed_locks = {}
6435     # We need all nodes to be locked in order for RemoveExport to work, but we
6436     # don't need to lock the instance itself, as nothing will happen to it (and
6437     # we can remove exports also for a removed instance)
6438     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6439
6440   def CheckPrereq(self):
6441     """Check prerequisites.
6442     """
6443     pass
6444
6445   def Exec(self, feedback_fn):
6446     """Remove any export.
6447
6448     """
6449     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6450     # If the instance was not found we'll try with the name that was passed in.
6451     # This will only work if it was an FQDN, though.
6452     fqdn_warn = False
6453     if not instance_name:
6454       fqdn_warn = True
6455       instance_name = self.op.instance_name
6456
6457     exportlist = self.rpc.call_export_list(self.acquired_locks[
6458       locking.LEVEL_NODE])
6459     found = False
6460     for node in exportlist:
6461       if exportlist[node].failed:
6462         self.LogWarning("Failed to query node %s, continuing" % node)
6463         continue
6464       if instance_name in exportlist[node].data:
6465         found = True
6466         result = self.rpc.call_export_remove(node, instance_name)
6467         if result.failed or not result.data:
6468           logging.error("Could not remove export for instance %s"
6469                         " on node %s", instance_name, node)
6470
6471     if fqdn_warn and not found:
6472       feedback_fn("Export not found. If trying to remove an export belonging"
6473                   " to a deleted instance please use its Fully Qualified"
6474                   " Domain Name.")
6475
6476
6477 class TagsLU(NoHooksLU):
6478   """Generic tags LU.
6479
6480   This is an abstract class which is the parent of all the other tags LUs.
6481
6482   """
6483
6484   def ExpandNames(self):
6485     self.needed_locks = {}
6486     if self.op.kind == constants.TAG_NODE:
6487       name = self.cfg.ExpandNodeName(self.op.name)
6488       if name is None:
6489         raise errors.OpPrereqError("Invalid node name (%s)" %
6490                                    (self.op.name,))
6491       self.op.name = name
6492       self.needed_locks[locking.LEVEL_NODE] = name
6493     elif self.op.kind == constants.TAG_INSTANCE:
6494       name = self.cfg.ExpandInstanceName(self.op.name)
6495       if name is None:
6496         raise errors.OpPrereqError("Invalid instance name (%s)" %
6497                                    (self.op.name,))
6498       self.op.name = name
6499       self.needed_locks[locking.LEVEL_INSTANCE] = name
6500
6501   def CheckPrereq(self):
6502     """Check prerequisites.
6503
6504     """
6505     if self.op.kind == constants.TAG_CLUSTER:
6506       self.target = self.cfg.GetClusterInfo()
6507     elif self.op.kind == constants.TAG_NODE:
6508       self.target = self.cfg.GetNodeInfo(self.op.name)
6509     elif self.op.kind == constants.TAG_INSTANCE:
6510       self.target = self.cfg.GetInstanceInfo(self.op.name)
6511     else:
6512       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6513                                  str(self.op.kind))
6514
6515
6516 class LUGetTags(TagsLU):
6517   """Returns the tags of a given object.
6518
6519   """
6520   _OP_REQP = ["kind", "name"]
6521   REQ_BGL = False
6522
6523   def Exec(self, feedback_fn):
6524     """Returns the tag list.
6525
6526     """
6527     return list(self.target.GetTags())
6528
6529
6530 class LUSearchTags(NoHooksLU):
6531   """Searches the tags for a given pattern.
6532
6533   """
6534   _OP_REQP = ["pattern"]
6535   REQ_BGL = False
6536
6537   def ExpandNames(self):
6538     self.needed_locks = {}
6539
6540   def CheckPrereq(self):
6541     """Check prerequisites.
6542
6543     This checks the pattern passed for validity by compiling it.
6544
6545     """
6546     try:
6547       self.re = re.compile(self.op.pattern)
6548     except re.error, err:
6549       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6550                                  (self.op.pattern, err))
6551
6552   def Exec(self, feedback_fn):
6553     """Returns the tag list.
6554
6555     """
6556     cfg = self.cfg
6557     tgts = [("/cluster", cfg.GetClusterInfo())]
6558     ilist = cfg.GetAllInstancesInfo().values()
6559     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6560     nlist = cfg.GetAllNodesInfo().values()
6561     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6562     results = []
6563     for path, target in tgts:
6564       for tag in target.GetTags():
6565         if self.re.search(tag):
6566           results.append((path, tag))
6567     return results
6568
6569
6570 class LUAddTags(TagsLU):
6571   """Sets a tag on a given object.
6572
6573   """
6574   _OP_REQP = ["kind", "name", "tags"]
6575   REQ_BGL = False
6576
6577   def CheckPrereq(self):
6578     """Check prerequisites.
6579
6580     This checks the type and length of the tag name and value.
6581
6582     """
6583     TagsLU.CheckPrereq(self)
6584     for tag in self.op.tags:
6585       objects.TaggableObject.ValidateTag(tag)
6586
6587   def Exec(self, feedback_fn):
6588     """Sets the tag.
6589
6590     """
6591     try:
6592       for tag in self.op.tags:
6593         self.target.AddTag(tag)
6594     except errors.TagError, err:
6595       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6596     try:
6597       self.cfg.Update(self.target)
6598     except errors.ConfigurationError:
6599       raise errors.OpRetryError("There has been a modification to the"
6600                                 " config file and the operation has been"
6601                                 " aborted. Please retry.")
6602
6603
6604 class LUDelTags(TagsLU):
6605   """Delete a list of tags from a given object.
6606
6607   """
6608   _OP_REQP = ["kind", "name", "tags"]
6609   REQ_BGL = False
6610
6611   def CheckPrereq(self):
6612     """Check prerequisites.
6613
6614     This checks that we have the given tag.
6615
6616     """
6617     TagsLU.CheckPrereq(self)
6618     for tag in self.op.tags:
6619       objects.TaggableObject.ValidateTag(tag)
6620     del_tags = frozenset(self.op.tags)
6621     cur_tags = self.target.GetTags()
6622     if not del_tags <= cur_tags:
6623       diff_tags = del_tags - cur_tags
6624       diff_names = ["'%s'" % tag for tag in diff_tags]
6625       diff_names.sort()
6626       raise errors.OpPrereqError("Tag(s) %s not found" %
6627                                  (",".join(diff_names)))
6628
6629   def Exec(self, feedback_fn):
6630     """Remove the tag from the object.
6631
6632     """
6633     for tag in self.op.tags:
6634       self.target.RemoveTag(tag)
6635     try:
6636       self.cfg.Update(self.target)
6637     except errors.ConfigurationError:
6638       raise errors.OpRetryError("There has been a modification to the"
6639                                 " config file and the operation has been"
6640                                 " aborted. Please retry.")
6641
6642
6643 class LUTestDelay(NoHooksLU):
6644   """Sleep for a specified amount of time.
6645
6646   This LU sleeps on the master and/or nodes for a specified amount of
6647   time.
6648
6649   """
6650   _OP_REQP = ["duration", "on_master", "on_nodes"]
6651   REQ_BGL = False
6652
6653   def ExpandNames(self):
6654     """Expand names and set required locks.
6655
6656     This expands the node list, if any.
6657
6658     """
6659     self.needed_locks = {}
6660     if self.op.on_nodes:
6661       # _GetWantedNodes can be used here, but is not always appropriate to use
6662       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6663       # more information.
6664       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6665       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6666
6667   def CheckPrereq(self):
6668     """Check prerequisites.
6669
6670     """
6671
6672   def Exec(self, feedback_fn):
6673     """Do the actual sleep.
6674
6675     """
6676     if self.op.on_master:
6677       if not utils.TestDelay(self.op.duration):
6678         raise errors.OpExecError("Error during master delay test")
6679     if self.op.on_nodes:
6680       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6681       if not result:
6682         raise errors.OpExecError("Complete failure from rpc call")
6683       for node, node_result in result.items():
6684         node_result.Raise()
6685         if not node_result.data:
6686           raise errors.OpExecError("Failure during rpc call to node %s,"
6687                                    " result: %s" % (node, node_result.data))
6688
6689
6690 class IAllocator(object):
6691   """IAllocator framework.
6692
6693   An IAllocator instance has three sets of attributes:
6694     - cfg that is needed to query the cluster
6695     - input data (all members of the _KEYS class attribute are required)
6696     - four buffer attributes (in|out_data|text), that represent the
6697       input (to the external script) in text and data structure format,
6698       and the output from it, again in two formats
6699     - the result variables from the script (success, info, nodes) for
6700       easy usage
6701
6702   """
6703   _ALLO_KEYS = [
6704     "mem_size", "disks", "disk_template",
6705     "os", "tags", "nics", "vcpus", "hypervisor",
6706     ]
6707   _RELO_KEYS = [
6708     "relocate_from",
6709     ]
6710
6711   def __init__(self, lu, mode, name, **kwargs):
6712     self.lu = lu
6713     # init buffer variables
6714     self.in_text = self.out_text = self.in_data = self.out_data = None
6715     # init all input fields so that pylint is happy
6716     self.mode = mode
6717     self.name = name
6718     self.mem_size = self.disks = self.disk_template = None
6719     self.os = self.tags = self.nics = self.vcpus = None
6720     self.hypervisor = None
6721     self.relocate_from = None
6722     # computed fields
6723     self.required_nodes = None
6724     # init result fields
6725     self.success = self.info = self.nodes = None
6726     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6727       keyset = self._ALLO_KEYS
6728     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6729       keyset = self._RELO_KEYS
6730     else:
6731       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6732                                    " IAllocator" % self.mode)
6733     for key in kwargs:
6734       if key not in keyset:
6735         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6736                                      " IAllocator" % key)
6737       setattr(self, key, kwargs[key])
6738     for key in keyset:
6739       if key not in kwargs:
6740         raise errors.ProgrammerError("Missing input parameter '%s' to"
6741                                      " IAllocator" % key)
6742     self._BuildInputData()
6743
6744   def _ComputeClusterData(self):
6745     """Compute the generic allocator input data.
6746
6747     This is the data that is independent of the actual operation.
6748
6749     """
6750     cfg = self.lu.cfg
6751     cluster_info = cfg.GetClusterInfo()
6752     # cluster data
6753     data = {
6754       "version": constants.IALLOCATOR_VERSION,
6755       "cluster_name": cfg.GetClusterName(),
6756       "cluster_tags": list(cluster_info.GetTags()),
6757       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6758       # we don't have job IDs
6759       }
6760     iinfo = cfg.GetAllInstancesInfo().values()
6761     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6762
6763     # node data
6764     node_results = {}
6765     node_list = cfg.GetNodeList()
6766
6767     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6768       hypervisor_name = self.hypervisor
6769     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6770       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6771
6772     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6773                                            hypervisor_name)
6774     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6775                        cluster_info.enabled_hypervisors)
6776     for nname, nresult in node_data.items():
6777       # first fill in static (config-based) values
6778       ninfo = cfg.GetNodeInfo(nname)
6779       pnr = {
6780         "tags": list(ninfo.GetTags()),
6781         "primary_ip": ninfo.primary_ip,
6782         "secondary_ip": ninfo.secondary_ip,
6783         "offline": ninfo.offline,
6784         "drained": ninfo.drained,
6785         "master_candidate": ninfo.master_candidate,
6786         }
6787
6788       if not ninfo.offline:
6789         nresult.Raise()
6790         if not isinstance(nresult.data, dict):
6791           raise errors.OpExecError("Can't get data for node %s" % nname)
6792         remote_info = nresult.data
6793         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6794                      'vg_size', 'vg_free', 'cpu_total']:
6795           if attr not in remote_info:
6796             raise errors.OpExecError("Node '%s' didn't return attribute"
6797                                      " '%s'" % (nname, attr))
6798           try:
6799             remote_info[attr] = int(remote_info[attr])
6800           except ValueError, err:
6801             raise errors.OpExecError("Node '%s' returned invalid value"
6802                                      " for '%s': %s" % (nname, attr, err))
6803         # compute memory used by primary instances
6804         i_p_mem = i_p_up_mem = 0
6805         for iinfo, beinfo in i_list:
6806           if iinfo.primary_node == nname:
6807             i_p_mem += beinfo[constants.BE_MEMORY]
6808             if iinfo.name not in node_iinfo[nname].data:
6809               i_used_mem = 0
6810             else:
6811               i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6812             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6813             remote_info['memory_free'] -= max(0, i_mem_diff)
6814
6815             if iinfo.admin_up:
6816               i_p_up_mem += beinfo[constants.BE_MEMORY]
6817
6818         # compute memory used by instances
6819         pnr_dyn = {
6820           "total_memory": remote_info['memory_total'],
6821           "reserved_memory": remote_info['memory_dom0'],
6822           "free_memory": remote_info['memory_free'],
6823           "total_disk": remote_info['vg_size'],
6824           "free_disk": remote_info['vg_free'],
6825           "total_cpus": remote_info['cpu_total'],
6826           "i_pri_memory": i_p_mem,
6827           "i_pri_up_memory": i_p_up_mem,
6828           }
6829         pnr.update(pnr_dyn)
6830
6831       node_results[nname] = pnr
6832     data["nodes"] = node_results
6833
6834     # instance data
6835     instance_data = {}
6836     for iinfo, beinfo in i_list:
6837       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6838                   for n in iinfo.nics]
6839       pir = {
6840         "tags": list(iinfo.GetTags()),
6841         "admin_up": iinfo.admin_up,
6842         "vcpus": beinfo[constants.BE_VCPUS],
6843         "memory": beinfo[constants.BE_MEMORY],
6844         "os": iinfo.os,
6845         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6846         "nics": nic_data,
6847         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6848         "disk_template": iinfo.disk_template,
6849         "hypervisor": iinfo.hypervisor,
6850         }
6851       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
6852                                                  pir["disks"])
6853       instance_data[iinfo.name] = pir
6854
6855     data["instances"] = instance_data
6856
6857     self.in_data = data
6858
6859   def _AddNewInstance(self):
6860     """Add new instance data to allocator structure.
6861
6862     This in combination with _AllocatorGetClusterData will create the
6863     correct structure needed as input for the allocator.
6864
6865     The checks for the completeness of the opcode must have already been
6866     done.
6867
6868     """
6869     data = self.in_data
6870
6871     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6872
6873     if self.disk_template in constants.DTS_NET_MIRROR:
6874       self.required_nodes = 2
6875     else:
6876       self.required_nodes = 1
6877     request = {
6878       "type": "allocate",
6879       "name": self.name,
6880       "disk_template": self.disk_template,
6881       "tags": self.tags,
6882       "os": self.os,
6883       "vcpus": self.vcpus,
6884       "memory": self.mem_size,
6885       "disks": self.disks,
6886       "disk_space_total": disk_space,
6887       "nics": self.nics,
6888       "required_nodes": self.required_nodes,
6889       }
6890     data["request"] = request
6891
6892   def _AddRelocateInstance(self):
6893     """Add relocate instance data to allocator structure.
6894
6895     This in combination with _IAllocatorGetClusterData will create the
6896     correct structure needed as input for the allocator.
6897
6898     The checks for the completeness of the opcode must have already been
6899     done.
6900
6901     """
6902     instance = self.lu.cfg.GetInstanceInfo(self.name)
6903     if instance is None:
6904       raise errors.ProgrammerError("Unknown instance '%s' passed to"
6905                                    " IAllocator" % self.name)
6906
6907     if instance.disk_template not in constants.DTS_NET_MIRROR:
6908       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6909
6910     if len(instance.secondary_nodes) != 1:
6911       raise errors.OpPrereqError("Instance has not exactly one secondary node")
6912
6913     self.required_nodes = 1
6914     disk_sizes = [{'size': disk.size} for disk in instance.disks]
6915     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6916
6917     request = {
6918       "type": "relocate",
6919       "name": self.name,
6920       "disk_space_total": disk_space,
6921       "required_nodes": self.required_nodes,
6922       "relocate_from": self.relocate_from,
6923       }
6924     self.in_data["request"] = request
6925
6926   def _BuildInputData(self):
6927     """Build input data structures.
6928
6929     """
6930     self._ComputeClusterData()
6931
6932     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6933       self._AddNewInstance()
6934     else:
6935       self._AddRelocateInstance()
6936
6937     self.in_text = serializer.Dump(self.in_data)
6938
6939   def Run(self, name, validate=True, call_fn=None):
6940     """Run an instance allocator and return the results.
6941
6942     """
6943     if call_fn is None:
6944       call_fn = self.lu.rpc.call_iallocator_runner
6945     data = self.in_text
6946
6947     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6948     result.Raise()
6949
6950     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6951       raise errors.OpExecError("Invalid result from master iallocator runner")
6952
6953     rcode, stdout, stderr, fail = result.data
6954
6955     if rcode == constants.IARUN_NOTFOUND:
6956       raise errors.OpExecError("Can't find allocator '%s'" % name)
6957     elif rcode == constants.IARUN_FAILURE:
6958       raise errors.OpExecError("Instance allocator call failed: %s,"
6959                                " output: %s" % (fail, stdout+stderr))
6960     self.out_text = stdout
6961     if validate:
6962       self._ValidateResult()
6963
6964   def _ValidateResult(self):
6965     """Process the allocator results.
6966
6967     This will process and if successful save the result in
6968     self.out_data and the other parameters.
6969
6970     """
6971     try:
6972       rdict = serializer.Load(self.out_text)
6973     except Exception, err:
6974       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6975
6976     if not isinstance(rdict, dict):
6977       raise errors.OpExecError("Can't parse iallocator results: not a dict")
6978
6979     for key in "success", "info", "nodes":
6980       if key not in rdict:
6981         raise errors.OpExecError("Can't parse iallocator results:"
6982                                  " missing key '%s'" % key)
6983       setattr(self, key, rdict[key])
6984
6985     if not isinstance(rdict["nodes"], list):
6986       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6987                                " is not a list")
6988     self.out_data = rdict
6989
6990
6991 class LUTestAllocator(NoHooksLU):
6992   """Run allocator tests.
6993
6994   This LU runs the allocator tests
6995
6996   """
6997   _OP_REQP = ["direction", "mode", "name"]
6998
6999   def CheckPrereq(self):
7000     """Check prerequisites.
7001
7002     This checks the opcode parameters depending on the director and mode test.
7003
7004     """
7005     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7006       for attr in ["name", "mem_size", "disks", "disk_template",
7007                    "os", "tags", "nics", "vcpus"]:
7008         if not hasattr(self.op, attr):
7009           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7010                                      attr)
7011       iname = self.cfg.ExpandInstanceName(self.op.name)
7012       if iname is not None:
7013         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7014                                    iname)
7015       if not isinstance(self.op.nics, list):
7016         raise errors.OpPrereqError("Invalid parameter 'nics'")
7017       for row in self.op.nics:
7018         if (not isinstance(row, dict) or
7019             "mac" not in row or
7020             "ip" not in row or
7021             "bridge" not in row):
7022           raise errors.OpPrereqError("Invalid contents of the"
7023                                      " 'nics' parameter")
7024       if not isinstance(self.op.disks, list):
7025         raise errors.OpPrereqError("Invalid parameter 'disks'")
7026       for row in self.op.disks:
7027         if (not isinstance(row, dict) or
7028             "size" not in row or
7029             not isinstance(row["size"], int) or
7030             "mode" not in row or
7031             row["mode"] not in ['r', 'w']):
7032           raise errors.OpPrereqError("Invalid contents of the"
7033                                      " 'disks' parameter")
7034       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7035         self.op.hypervisor = self.cfg.GetHypervisorType()
7036     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7037       if not hasattr(self.op, "name"):
7038         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7039       fname = self.cfg.ExpandInstanceName(self.op.name)
7040       if fname is None:
7041         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7042                                    self.op.name)
7043       self.op.name = fname
7044       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7045     else:
7046       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7047                                  self.op.mode)
7048
7049     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7050       if not hasattr(self.op, "allocator") or self.op.allocator is None:
7051         raise errors.OpPrereqError("Missing allocator name")
7052     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7053       raise errors.OpPrereqError("Wrong allocator test '%s'" %
7054                                  self.op.direction)
7055
7056   def Exec(self, feedback_fn):
7057     """Run the allocator test.
7058
7059     """
7060     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7061       ial = IAllocator(self,
7062                        mode=self.op.mode,
7063                        name=self.op.name,
7064                        mem_size=self.op.mem_size,
7065                        disks=self.op.disks,
7066                        disk_template=self.op.disk_template,
7067                        os=self.op.os,
7068                        tags=self.op.tags,
7069                        nics=self.op.nics,
7070                        vcpus=self.op.vcpus,
7071                        hypervisor=self.op.hypervisor,
7072                        )
7073     else:
7074       ial = IAllocator(self,
7075                        mode=self.op.mode,
7076                        name=self.op.name,
7077                        relocate_from=list(self.relocate_from),
7078                        )
7079
7080     if self.op.direction == constants.IALLOCATOR_DIR_IN:
7081       result = ial.in_text
7082     else:
7083       ial.Run(self.op.allocator, validate=False)
7084       result = ial.out_text
7085     return result