c2965f6539c303700523e47b9b8d819a2d21fb6d
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import re
30 import platform
31 import logging
32 import copy
33
34 from ganeti import ssh
35 from ganeti import utils
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import serializer
42 from ganeti import ssconf
43
44
45 class LogicalUnit(object):
46   """Logical Unit base class.
47
48   Subclasses must follow these rules:
49     - implement ExpandNames
50     - implement CheckPrereq
51     - implement Exec
52     - implement BuildHooksEnv
53     - redefine HPATH and HTYPE
54     - optionally redefine their run requirements:
55         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
56
57   Note that all commands require root permissions.
58
59   """
60   HPATH = None
61   HTYPE = None
62   _OP_REQP = []
63   REQ_BGL = True
64
65   def __init__(self, processor, op, context, rpc):
66     """Constructor for LogicalUnit.
67
68     This needs to be overridden in derived classes in order to check op
69     validity.
70
71     """
72     self.proc = processor
73     self.op = op
74     self.cfg = context.cfg
75     self.context = context
76     self.rpc = rpc
77     # Dicts used to declare locking needs to mcpu
78     self.needed_locks = None
79     self.acquired_locks = {}
80     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
81     self.add_locks = {}
82     self.remove_locks = {}
83     # Used to force good behavior when calling helper functions
84     self.recalculate_locks = {}
85     self.__ssh = None
86     # logging
87     self.LogWarning = processor.LogWarning
88     self.LogInfo = processor.LogInfo
89
90     for attr_name in self._OP_REQP:
91       attr_val = getattr(op, attr_name, None)
92       if attr_val is None:
93         raise errors.OpPrereqError("Required parameter '%s' missing" %
94                                    attr_name)
95     self.CheckArguments()
96
97   def __GetSSH(self):
98     """Returns the SshRunner object
99
100     """
101     if not self.__ssh:
102       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
103     return self.__ssh
104
105   ssh = property(fget=__GetSSH)
106
107   def CheckArguments(self):
108     """Check syntactic validity for the opcode arguments.
109
110     This method is for doing a simple syntactic check and ensure
111     validity of opcode parameters, without any cluster-related
112     checks. While the same can be accomplished in ExpandNames and/or
113     CheckPrereq, doing these separate is better because:
114
115       - ExpandNames is left as as purely a lock-related function
116       - CheckPrereq is run after we have acquired locks (and possible
117         waited for them)
118
119     The function is allowed to change the self.op attribute so that
120     later methods can no longer worry about missing parameters.
121
122     """
123     pass
124
125   def ExpandNames(self):
126     """Expand names for this LU.
127
128     This method is called before starting to execute the opcode, and it should
129     update all the parameters of the opcode to their canonical form (e.g. a
130     short node name must be fully expanded after this method has successfully
131     completed). This way locking, hooks, logging, ecc. can work correctly.
132
133     LUs which implement this method must also populate the self.needed_locks
134     member, as a dict with lock levels as keys, and a list of needed lock names
135     as values. Rules:
136
137       - use an empty dict if you don't need any lock
138       - if you don't need any lock at a particular level omit that level
139       - don't put anything for the BGL level
140       - if you want all locks at a level use locking.ALL_SET as a value
141
142     If you need to share locks (rather than acquire them exclusively) at one
143     level you can modify self.share_locks, setting a true value (usually 1) for
144     that level. By default locks are not shared.
145
146     Examples::
147
148       # Acquire all nodes and one instance
149       self.needed_locks = {
150         locking.LEVEL_NODE: locking.ALL_SET,
151         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
152       }
153       # Acquire just two nodes
154       self.needed_locks = {
155         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
156       }
157       # Acquire no locks
158       self.needed_locks = {} # No, you can't leave it to the default value None
159
160     """
161     # The implementation of this method is mandatory only if the new LU is
162     # concurrent, so that old LUs don't need to be changed all at the same
163     # time.
164     if self.REQ_BGL:
165       self.needed_locks = {} # Exclusive LUs don't need locks.
166     else:
167       raise NotImplementedError
168
169   def DeclareLocks(self, level):
170     """Declare LU locking needs for a level
171
172     While most LUs can just declare their locking needs at ExpandNames time,
173     sometimes there's the need to calculate some locks after having acquired
174     the ones before. This function is called just before acquiring locks at a
175     particular level, but after acquiring the ones at lower levels, and permits
176     such calculations. It can be used to modify self.needed_locks, and by
177     default it does nothing.
178
179     This function is only called if you have something already set in
180     self.needed_locks for the level.
181
182     @param level: Locking level which is going to be locked
183     @type level: member of ganeti.locking.LEVELS
184
185     """
186
187   def CheckPrereq(self):
188     """Check prerequisites for this LU.
189
190     This method should check that the prerequisites for the execution
191     of this LU are fulfilled. It can do internode communication, but
192     it should be idempotent - no cluster or system changes are
193     allowed.
194
195     The method should raise errors.OpPrereqError in case something is
196     not fulfilled. Its return value is ignored.
197
198     This method should also update all the parameters of the opcode to
199     their canonical form if it hasn't been done by ExpandNames before.
200
201     """
202     raise NotImplementedError
203
204   def Exec(self, feedback_fn):
205     """Execute the LU.
206
207     This method should implement the actual work. It should raise
208     errors.OpExecError for failures that are somewhat dealt with in
209     code, or expected.
210
211     """
212     raise NotImplementedError
213
214   def BuildHooksEnv(self):
215     """Build hooks environment for this LU.
216
217     This method should return a three-node tuple consisting of: a dict
218     containing the environment that will be used for running the
219     specific hook for this LU, a list of node names on which the hook
220     should run before the execution, and a list of node names on which
221     the hook should run after the execution.
222
223     The keys of the dict must not have 'GANETI_' prefixed as this will
224     be handled in the hooks runner. Also note additional keys will be
225     added by the hooks runner. If the LU doesn't define any
226     environment, an empty dict (and not None) should be returned.
227
228     No nodes should be returned as an empty list (and not None).
229
230     Note that if the HPATH for a LU class is None, this function will
231     not be called.
232
233     """
234     raise NotImplementedError
235
236   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
237     """Notify the LU about the results of its hooks.
238
239     This method is called every time a hooks phase is executed, and notifies
240     the Logical Unit about the hooks' result. The LU can then use it to alter
241     its result based on the hooks.  By default the method does nothing and the
242     previous result is passed back unchanged but any LU can define it if it
243     wants to use the local cluster hook-scripts somehow.
244
245     @param phase: one of L{constants.HOOKS_PHASE_POST} or
246         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
247     @param hook_results: the results of the multi-node hooks rpc call
248     @param feedback_fn: function used send feedback back to the caller
249     @param lu_result: the previous Exec result this LU had, or None
250         in the PRE phase
251     @return: the new Exec result, based on the previous result
252         and hook results
253
254     """
255     return lu_result
256
257   def _ExpandAndLockInstance(self):
258     """Helper function to expand and lock an instance.
259
260     Many LUs that work on an instance take its name in self.op.instance_name
261     and need to expand it and then declare the expanded name for locking. This
262     function does it, and then updates self.op.instance_name to the expanded
263     name. It also initializes needed_locks as a dict, if this hasn't been done
264     before.
265
266     """
267     if self.needed_locks is None:
268       self.needed_locks = {}
269     else:
270       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
271         "_ExpandAndLockInstance called with instance-level locks set"
272     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
273     if expanded_name is None:
274       raise errors.OpPrereqError("Instance '%s' not known" %
275                                   self.op.instance_name)
276     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
277     self.op.instance_name = expanded_name
278
279   def _LockInstancesNodes(self, primary_only=False):
280     """Helper function to declare instances' nodes for locking.
281
282     This function should be called after locking one or more instances to lock
283     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
284     with all primary or secondary nodes for instances already locked and
285     present in self.needed_locks[locking.LEVEL_INSTANCE].
286
287     It should be called from DeclareLocks, and for safety only works if
288     self.recalculate_locks[locking.LEVEL_NODE] is set.
289
290     In the future it may grow parameters to just lock some instance's nodes, or
291     to just lock primaries or secondary nodes, if needed.
292
293     If should be called in DeclareLocks in a way similar to::
294
295       if level == locking.LEVEL_NODE:
296         self._LockInstancesNodes()
297
298     @type primary_only: boolean
299     @param primary_only: only lock primary nodes of locked instances
300
301     """
302     assert locking.LEVEL_NODE in self.recalculate_locks, \
303       "_LockInstancesNodes helper function called with no nodes to recalculate"
304
305     # TODO: check if we're really been called with the instance locks held
306
307     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
308     # future we might want to have different behaviors depending on the value
309     # of self.recalculate_locks[locking.LEVEL_NODE]
310     wanted_nodes = []
311     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
312       instance = self.context.cfg.GetInstanceInfo(instance_name)
313       wanted_nodes.append(instance.primary_node)
314       if not primary_only:
315         wanted_nodes.extend(instance.secondary_nodes)
316
317     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
318       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
319     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
320       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
321
322     del self.recalculate_locks[locking.LEVEL_NODE]
323
324
325 class NoHooksLU(LogicalUnit):
326   """Simple LU which runs no hooks.
327
328   This LU is intended as a parent for other LogicalUnits which will
329   run no hooks, in order to reduce duplicate code.
330
331   """
332   HPATH = None
333   HTYPE = None
334
335
336 def _GetWantedNodes(lu, nodes):
337   """Returns list of checked and expanded node names.
338
339   @type lu: L{LogicalUnit}
340   @param lu: the logical unit on whose behalf we execute
341   @type nodes: list
342   @param nodes: list of node names or None for all nodes
343   @rtype: list
344   @return: the list of nodes, sorted
345   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
346
347   """
348   if not isinstance(nodes, list):
349     raise errors.OpPrereqError("Invalid argument type 'nodes'")
350
351   if not nodes:
352     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
353       " non-empty list of nodes whose name is to be expanded.")
354
355   wanted = []
356   for name in nodes:
357     node = lu.cfg.ExpandNodeName(name)
358     if node is None:
359       raise errors.OpPrereqError("No such node name '%s'" % name)
360     wanted.append(node)
361
362   return utils.NiceSort(wanted)
363
364
365 def _GetWantedInstances(lu, instances):
366   """Returns list of checked and expanded instance names.
367
368   @type lu: L{LogicalUnit}
369   @param lu: the logical unit on whose behalf we execute
370   @type instances: list
371   @param instances: list of instance names or None for all instances
372   @rtype: list
373   @return: the list of instances, sorted
374   @raise errors.OpPrereqError: if the instances parameter is wrong type
375   @raise errors.OpPrereqError: if any of the passed instances is not found
376
377   """
378   if not isinstance(instances, list):
379     raise errors.OpPrereqError("Invalid argument type 'instances'")
380
381   if instances:
382     wanted = []
383
384     for name in instances:
385       instance = lu.cfg.ExpandInstanceName(name)
386       if instance is None:
387         raise errors.OpPrereqError("No such instance name '%s'" % name)
388       wanted.append(instance)
389
390   else:
391     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
392   return wanted
393
394
395 def _CheckOutputFields(static, dynamic, selected):
396   """Checks whether all selected fields are valid.
397
398   @type static: L{utils.FieldSet}
399   @param static: static fields set
400   @type dynamic: L{utils.FieldSet}
401   @param dynamic: dynamic fields set
402
403   """
404   f = utils.FieldSet()
405   f.Extend(static)
406   f.Extend(dynamic)
407
408   delta = f.NonMatching(selected)
409   if delta:
410     raise errors.OpPrereqError("Unknown output fields selected: %s"
411                                % ",".join(delta))
412
413
414 def _CheckBooleanOpField(op, name):
415   """Validates boolean opcode parameters.
416
417   This will ensure that an opcode parameter is either a boolean value,
418   or None (but that it always exists).
419
420   """
421   val = getattr(op, name, None)
422   if not (val is None or isinstance(val, bool)):
423     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
424                                (name, str(val)))
425   setattr(op, name, val)
426
427
428 def _CheckNodeOnline(lu, node):
429   """Ensure that a given node is online.
430
431   @param lu: the LU on behalf of which we make the check
432   @param node: the node to check
433   @raise errors.OpPrereqError: if the node is offline
434
435   """
436   if lu.cfg.GetNodeInfo(node).offline:
437     raise errors.OpPrereqError("Can't use offline node %s" % node)
438
439
440 def _CheckNodeNotDrained(lu, node):
441   """Ensure that a given node is not drained.
442
443   @param lu: the LU on behalf of which we make the check
444   @param node: the node to check
445   @raise errors.OpPrereqError: if the node is drained
446
447   """
448   if lu.cfg.GetNodeInfo(node).drained:
449     raise errors.OpPrereqError("Can't use drained node %s" % node)
450
451
452 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
453                           memory, vcpus, nics, disk_template, disks,
454                           bep, hvp, hypervisor_name):
455   """Builds instance related env variables for hooks
456
457   This builds the hook environment from individual variables.
458
459   @type name: string
460   @param name: the name of the instance
461   @type primary_node: string
462   @param primary_node: the name of the instance's primary node
463   @type secondary_nodes: list
464   @param secondary_nodes: list of secondary nodes as strings
465   @type os_type: string
466   @param os_type: the name of the instance's OS
467   @type status: boolean
468   @param status: the should_run status of the instance
469   @type memory: string
470   @param memory: the memory size of the instance
471   @type vcpus: string
472   @param vcpus: the count of VCPUs the instance has
473   @type nics: list
474   @param nics: list of tuples (ip, bridge, mac) representing
475       the NICs the instance  has
476   @type disk_template: string
477   @param disk_template: the disk template of the instance
478   @type disks: list
479   @param disks: the list of (size, mode) pairs
480   @type bep: dict
481   @param bep: the backend parameters for the instance
482   @type hvp: dict
483   @param hvp: the hypervisor parameters for the instance
484   @type hypervisor_name: string
485   @param hypervisor_name: the hypervisor for the instance
486   @rtype: dict
487   @return: the hook environment for this instance
488
489   """
490   if status:
491     str_status = "up"
492   else:
493     str_status = "down"
494   env = {
495     "OP_TARGET": name,
496     "INSTANCE_NAME": name,
497     "INSTANCE_PRIMARY": primary_node,
498     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
499     "INSTANCE_OS_TYPE": os_type,
500     "INSTANCE_STATUS": str_status,
501     "INSTANCE_MEMORY": memory,
502     "INSTANCE_VCPUS": vcpus,
503     "INSTANCE_DISK_TEMPLATE": disk_template,
504     "INSTANCE_HYPERVISOR": hypervisor_name,
505   }
506
507   if nics:
508     nic_count = len(nics)
509     for idx, (ip, bridge, mac) in enumerate(nics):
510       if ip is None:
511         ip = ""
512       env["INSTANCE_NIC%d_IP" % idx] = ip
513       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
514       env["INSTANCE_NIC%d_MAC" % idx] = mac
515   else:
516     nic_count = 0
517
518   env["INSTANCE_NIC_COUNT"] = nic_count
519
520   if disks:
521     disk_count = len(disks)
522     for idx, (size, mode) in enumerate(disks):
523       env["INSTANCE_DISK%d_SIZE" % idx] = size
524       env["INSTANCE_DISK%d_MODE" % idx] = mode
525   else:
526     disk_count = 0
527
528   env["INSTANCE_DISK_COUNT"] = disk_count
529
530   for source, kind in [(bep, "BE"), (hvp, "HV")]:
531     for key, value in source.items():
532       env["INSTANCE_%s_%s" % (kind, key)] = value
533
534   return env
535
536
537 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
538   """Builds instance related env variables for hooks from an object.
539
540   @type lu: L{LogicalUnit}
541   @param lu: the logical unit on whose behalf we execute
542   @type instance: L{objects.Instance}
543   @param instance: the instance for which we should build the
544       environment
545   @type override: dict
546   @param override: dictionary with key/values that will override
547       our values
548   @rtype: dict
549   @return: the hook environment dictionary
550
551   """
552   cluster = lu.cfg.GetClusterInfo()
553   bep = cluster.FillBE(instance)
554   hvp = cluster.FillHV(instance)
555   args = {
556     'name': instance.name,
557     'primary_node': instance.primary_node,
558     'secondary_nodes': instance.secondary_nodes,
559     'os_type': instance.os,
560     'status': instance.admin_up,
561     'memory': bep[constants.BE_MEMORY],
562     'vcpus': bep[constants.BE_VCPUS],
563     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
564     'disk_template': instance.disk_template,
565     'disks': [(disk.size, disk.mode) for disk in instance.disks],
566     'bep': bep,
567     'hvp': hvp,
568     'hypervisor': instance.hypervisor,
569   }
570   if override:
571     args.update(override)
572   return _BuildInstanceHookEnv(**args)
573
574
575 def _AdjustCandidatePool(lu):
576   """Adjust the candidate pool after node operations.
577
578   """
579   mod_list = lu.cfg.MaintainCandidatePool()
580   if mod_list:
581     lu.LogInfo("Promoted nodes to master candidate role: %s",
582                ", ".join(node.name for node in mod_list))
583     for name in mod_list:
584       lu.context.ReaddNode(name)
585   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
586   if mc_now > mc_max:
587     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
588                (mc_now, mc_max))
589
590
591 def _CheckInstanceBridgesExist(lu, instance):
592   """Check that the bridges needed by an instance exist.
593
594   """
595   # check bridges existence
596   brlist = [nic.bridge for nic in instance.nics]
597   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
598   result.Raise()
599   if not result.data:
600     raise errors.OpPrereqError("One or more target bridges %s does not"
601                                " exist on destination node '%s'" %
602                                (brlist, instance.primary_node))
603
604
605 class LUDestroyCluster(NoHooksLU):
606   """Logical unit for destroying the cluster.
607
608   """
609   _OP_REQP = []
610
611   def CheckPrereq(self):
612     """Check prerequisites.
613
614     This checks whether the cluster is empty.
615
616     Any errors are signaled by raising errors.OpPrereqError.
617
618     """
619     master = self.cfg.GetMasterNode()
620
621     nodelist = self.cfg.GetNodeList()
622     if len(nodelist) != 1 or nodelist[0] != master:
623       raise errors.OpPrereqError("There are still %d node(s) in"
624                                  " this cluster." % (len(nodelist) - 1))
625     instancelist = self.cfg.GetInstanceList()
626     if instancelist:
627       raise errors.OpPrereqError("There are still %d instance(s) in"
628                                  " this cluster." % len(instancelist))
629
630   def Exec(self, feedback_fn):
631     """Destroys the cluster.
632
633     """
634     master = self.cfg.GetMasterNode()
635     result = self.rpc.call_node_stop_master(master, False)
636     result.Raise()
637     if not result.data:
638       raise errors.OpExecError("Could not disable the master role")
639     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
640     utils.CreateBackup(priv_key)
641     utils.CreateBackup(pub_key)
642     return master
643
644
645 class LUVerifyCluster(LogicalUnit):
646   """Verifies the cluster status.
647
648   """
649   HPATH = "cluster-verify"
650   HTYPE = constants.HTYPE_CLUSTER
651   _OP_REQP = ["skip_checks"]
652   REQ_BGL = False
653
654   def ExpandNames(self):
655     self.needed_locks = {
656       locking.LEVEL_NODE: locking.ALL_SET,
657       locking.LEVEL_INSTANCE: locking.ALL_SET,
658     }
659     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
660
661   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
662                   node_result, feedback_fn, master_files,
663                   drbd_map, vg_name):
664     """Run multiple tests against a node.
665
666     Test list:
667
668       - compares ganeti version
669       - checks vg existence and size > 20G
670       - checks config file checksum
671       - checks ssh to other nodes
672
673     @type nodeinfo: L{objects.Node}
674     @param nodeinfo: the node to check
675     @param file_list: required list of files
676     @param local_cksum: dictionary of local files and their checksums
677     @param node_result: the results from the node
678     @param feedback_fn: function used to accumulate results
679     @param master_files: list of files that only masters should have
680     @param drbd_map: the useddrbd minors for this node, in
681         form of minor: (instance, must_exist) which correspond to instances
682         and their running status
683     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
684
685     """
686     node = nodeinfo.name
687
688     # main result, node_result should be a non-empty dict
689     if not node_result or not isinstance(node_result, dict):
690       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
691       return True
692
693     # compares ganeti version
694     local_version = constants.PROTOCOL_VERSION
695     remote_version = node_result.get('version', None)
696     if not (remote_version and isinstance(remote_version, (list, tuple)) and
697             len(remote_version) == 2):
698       feedback_fn("  - ERROR: connection to %s failed" % (node))
699       return True
700
701     if local_version != remote_version[0]:
702       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
703                   " node %s %s" % (local_version, node, remote_version[0]))
704       return True
705
706     # node seems compatible, we can actually try to look into its results
707
708     bad = False
709
710     # full package version
711     if constants.RELEASE_VERSION != remote_version[1]:
712       feedback_fn("  - WARNING: software version mismatch: master %s,"
713                   " node %s %s" %
714                   (constants.RELEASE_VERSION, node, remote_version[1]))
715
716     # checks vg existence and size > 20G
717     if vg_name is not None:
718       vglist = node_result.get(constants.NV_VGLIST, None)
719       if not vglist:
720         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
721                         (node,))
722         bad = True
723       else:
724         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
725                                               constants.MIN_VG_SIZE)
726         if vgstatus:
727           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
728           bad = True
729
730     # checks config file checksum
731
732     remote_cksum = node_result.get(constants.NV_FILELIST, None)
733     if not isinstance(remote_cksum, dict):
734       bad = True
735       feedback_fn("  - ERROR: node hasn't returned file checksum data")
736     else:
737       for file_name in file_list:
738         node_is_mc = nodeinfo.master_candidate
739         must_have_file = file_name not in master_files
740         if file_name not in remote_cksum:
741           if node_is_mc or must_have_file:
742             bad = True
743             feedback_fn("  - ERROR: file '%s' missing" % file_name)
744         elif remote_cksum[file_name] != local_cksum[file_name]:
745           if node_is_mc or must_have_file:
746             bad = True
747             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
748           else:
749             # not candidate and this is not a must-have file
750             bad = True
751             feedback_fn("  - ERROR: file '%s' should not exist on non master"
752                         " candidates (and the file is outdated)" % file_name)
753         else:
754           # all good, except non-master/non-must have combination
755           if not node_is_mc and not must_have_file:
756             feedback_fn("  - ERROR: file '%s' should not exist on non master"
757                         " candidates" % file_name)
758
759     # checks ssh to any
760
761     if constants.NV_NODELIST not in node_result:
762       bad = True
763       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
764     else:
765       if node_result[constants.NV_NODELIST]:
766         bad = True
767         for node in node_result[constants.NV_NODELIST]:
768           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
769                           (node, node_result[constants.NV_NODELIST][node]))
770
771     if constants.NV_NODENETTEST not in node_result:
772       bad = True
773       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
774     else:
775       if node_result[constants.NV_NODENETTEST]:
776         bad = True
777         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
778         for node in nlist:
779           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
780                           (node, node_result[constants.NV_NODENETTEST][node]))
781
782     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
783     if isinstance(hyp_result, dict):
784       for hv_name, hv_result in hyp_result.iteritems():
785         if hv_result is not None:
786           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
787                       (hv_name, hv_result))
788
789     # check used drbd list
790     if vg_name is not None:
791       used_minors = node_result.get(constants.NV_DRBDLIST, [])
792       if not isinstance(used_minors, (tuple, list)):
793         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
794                     str(used_minors))
795       else:
796         for minor, (iname, must_exist) in drbd_map.items():
797           if minor not in used_minors and must_exist:
798             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
799                         " not active" % (minor, iname))
800             bad = True
801         for minor in used_minors:
802           if minor not in drbd_map:
803             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
804                         minor)
805             bad = True
806
807     return bad
808
809   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
810                       node_instance, feedback_fn, n_offline):
811     """Verify an instance.
812
813     This function checks to see if the required block devices are
814     available on the instance's node.
815
816     """
817     bad = False
818
819     node_current = instanceconfig.primary_node
820
821     node_vol_should = {}
822     instanceconfig.MapLVsByNode(node_vol_should)
823
824     for node in node_vol_should:
825       if node in n_offline:
826         # ignore missing volumes on offline nodes
827         continue
828       for volume in node_vol_should[node]:
829         if node not in node_vol_is or volume not in node_vol_is[node]:
830           feedback_fn("  - ERROR: volume %s missing on node %s" %
831                           (volume, node))
832           bad = True
833
834     if instanceconfig.admin_up:
835       if ((node_current not in node_instance or
836           not instance in node_instance[node_current]) and
837           node_current not in n_offline):
838         feedback_fn("  - ERROR: instance %s not running on node %s" %
839                         (instance, node_current))
840         bad = True
841
842     for node in node_instance:
843       if (not node == node_current):
844         if instance in node_instance[node]:
845           feedback_fn("  - ERROR: instance %s should not run on node %s" %
846                           (instance, node))
847           bad = True
848
849     return bad
850
851   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
852     """Verify if there are any unknown volumes in the cluster.
853
854     The .os, .swap and backup volumes are ignored. All other volumes are
855     reported as unknown.
856
857     """
858     bad = False
859
860     for node in node_vol_is:
861       for volume in node_vol_is[node]:
862         if node not in node_vol_should or volume not in node_vol_should[node]:
863           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
864                       (volume, node))
865           bad = True
866     return bad
867
868   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
869     """Verify the list of running instances.
870
871     This checks what instances are running but unknown to the cluster.
872
873     """
874     bad = False
875     for node in node_instance:
876       for runninginstance in node_instance[node]:
877         if runninginstance not in instancelist:
878           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
879                           (runninginstance, node))
880           bad = True
881     return bad
882
883   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
884     """Verify N+1 Memory Resilience.
885
886     Check that if one single node dies we can still start all the instances it
887     was primary for.
888
889     """
890     bad = False
891
892     for node, nodeinfo in node_info.iteritems():
893       # This code checks that every node which is now listed as secondary has
894       # enough memory to host all instances it is supposed to should a single
895       # other node in the cluster fail.
896       # FIXME: not ready for failover to an arbitrary node
897       # FIXME: does not support file-backed instances
898       # WARNING: we currently take into account down instances as well as up
899       # ones, considering that even if they're down someone might want to start
900       # them even in the event of a node failure.
901       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
902         needed_mem = 0
903         for instance in instances:
904           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
905           if bep[constants.BE_AUTO_BALANCE]:
906             needed_mem += bep[constants.BE_MEMORY]
907         if nodeinfo['mfree'] < needed_mem:
908           feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
909                       " failovers should node %s fail" % (node, prinode))
910           bad = True
911     return bad
912
913   def CheckPrereq(self):
914     """Check prerequisites.
915
916     Transform the list of checks we're going to skip into a set and check that
917     all its members are valid.
918
919     """
920     self.skip_set = frozenset(self.op.skip_checks)
921     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
922       raise errors.OpPrereqError("Invalid checks to be skipped specified")
923
924   def BuildHooksEnv(self):
925     """Build hooks env.
926
927     Cluster-Verify hooks just ran in the post phase and their failure makes
928     the output be logged in the verify output and the verification to fail.
929
930     """
931     all_nodes = self.cfg.GetNodeList()
932     env = {
933       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
934       }
935     for node in self.cfg.GetAllNodesInfo().values():
936       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
937
938     return env, [], all_nodes
939
940   def Exec(self, feedback_fn):
941     """Verify integrity of cluster, performing various test on nodes.
942
943     """
944     bad = False
945     feedback_fn("* Verifying global settings")
946     for msg in self.cfg.VerifyConfig():
947       feedback_fn("  - ERROR: %s" % msg)
948
949     vg_name = self.cfg.GetVGName()
950     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
951     nodelist = utils.NiceSort(self.cfg.GetNodeList())
952     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
953     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
954     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
955                         for iname in instancelist)
956     i_non_redundant = [] # Non redundant instances
957     i_non_a_balanced = [] # Non auto-balanced instances
958     n_offline = [] # List of offline nodes
959     n_drained = [] # List of nodes being drained
960     node_volume = {}
961     node_instance = {}
962     node_info = {}
963     instance_cfg = {}
964
965     # FIXME: verify OS list
966     # do local checksums
967     master_files = [constants.CLUSTER_CONF_FILE]
968
969     file_names = ssconf.SimpleStore().GetFileList()
970     file_names.append(constants.SSL_CERT_FILE)
971     file_names.append(constants.RAPI_CERT_FILE)
972     file_names.extend(master_files)
973
974     local_checksums = utils.FingerprintFiles(file_names)
975
976     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
977     node_verify_param = {
978       constants.NV_FILELIST: file_names,
979       constants.NV_NODELIST: [node.name for node in nodeinfo
980                               if not node.offline],
981       constants.NV_HYPERVISOR: hypervisors,
982       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
983                                   node.secondary_ip) for node in nodeinfo
984                                  if not node.offline],
985       constants.NV_INSTANCELIST: hypervisors,
986       constants.NV_VERSION: None,
987       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
988       }
989     if vg_name is not None:
990       node_verify_param[constants.NV_VGLIST] = None
991       node_verify_param[constants.NV_LVLIST] = vg_name
992       node_verify_param[constants.NV_DRBDLIST] = None
993     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
994                                            self.cfg.GetClusterName())
995
996     cluster = self.cfg.GetClusterInfo()
997     master_node = self.cfg.GetMasterNode()
998     all_drbd_map = self.cfg.ComputeDRBDMap()
999
1000     for node_i in nodeinfo:
1001       node = node_i.name
1002       nresult = all_nvinfo[node].data
1003
1004       if node_i.offline:
1005         feedback_fn("* Skipping offline node %s" % (node,))
1006         n_offline.append(node)
1007         continue
1008
1009       if node == master_node:
1010         ntype = "master"
1011       elif node_i.master_candidate:
1012         ntype = "master candidate"
1013       elif node_i.drained:
1014         ntype = "drained"
1015         n_drained.append(node)
1016       else:
1017         ntype = "regular"
1018       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1019
1020       if all_nvinfo[node].failed or not isinstance(nresult, dict):
1021         feedback_fn("  - ERROR: connection to %s failed" % (node,))
1022         bad = True
1023         continue
1024
1025       node_drbd = {}
1026       for minor, instance in all_drbd_map[node].items():
1027         if instance not in instanceinfo:
1028           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1029                       instance)
1030           # ghost instance should not be running, but otherwise we
1031           # don't give double warnings (both ghost instance and
1032           # unallocated minor in use)
1033           node_drbd[minor] = (instance, False)
1034         else:
1035           instance = instanceinfo[instance]
1036           node_drbd[minor] = (instance.name, instance.admin_up)
1037       result = self._VerifyNode(node_i, file_names, local_checksums,
1038                                 nresult, feedback_fn, master_files,
1039                                 node_drbd, vg_name)
1040       bad = bad or result
1041
1042       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1043       if vg_name is None:
1044         node_volume[node] = {}
1045       elif isinstance(lvdata, basestring):
1046         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1047                     (node, utils.SafeEncode(lvdata)))
1048         bad = True
1049         node_volume[node] = {}
1050       elif not isinstance(lvdata, dict):
1051         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1052         bad = True
1053         continue
1054       else:
1055         node_volume[node] = lvdata
1056
1057       # node_instance
1058       idata = nresult.get(constants.NV_INSTANCELIST, None)
1059       if not isinstance(idata, list):
1060         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1061                     (node,))
1062         bad = True
1063         continue
1064
1065       node_instance[node] = idata
1066
1067       # node_info
1068       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1069       if not isinstance(nodeinfo, dict):
1070         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1071         bad = True
1072         continue
1073
1074       try:
1075         node_info[node] = {
1076           "mfree": int(nodeinfo['memory_free']),
1077           "pinst": [],
1078           "sinst": [],
1079           # dictionary holding all instances this node is secondary for,
1080           # grouped by their primary node. Each key is a cluster node, and each
1081           # value is a list of instances which have the key as primary and the
1082           # current node as secondary.  this is handy to calculate N+1 memory
1083           # availability if you can only failover from a primary to its
1084           # secondary.
1085           "sinst-by-pnode": {},
1086         }
1087         # FIXME: devise a free space model for file based instances as well
1088         if vg_name is not None:
1089           if (constants.NV_VGLIST not in nresult or
1090               vg_name not in nresult[constants.NV_VGLIST]):
1091             feedback_fn("  - ERROR: node %s didn't return data for the"
1092                         " volume group '%s' - it is either missing or broken" %
1093                         (node, vg_name))
1094             bad = True
1095             continue
1096           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1097       except (ValueError, KeyError):
1098         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1099                     " from node %s" % (node,))
1100         bad = True
1101         continue
1102
1103     node_vol_should = {}
1104
1105     for instance in instancelist:
1106       feedback_fn("* Verifying instance %s" % instance)
1107       inst_config = instanceinfo[instance]
1108       result =  self._VerifyInstance(instance, inst_config, node_volume,
1109                                      node_instance, feedback_fn, n_offline)
1110       bad = bad or result
1111       inst_nodes_offline = []
1112
1113       inst_config.MapLVsByNode(node_vol_should)
1114
1115       instance_cfg[instance] = inst_config
1116
1117       pnode = inst_config.primary_node
1118       if pnode in node_info:
1119         node_info[pnode]['pinst'].append(instance)
1120       elif pnode not in n_offline:
1121         feedback_fn("  - ERROR: instance %s, connection to primary node"
1122                     " %s failed" % (instance, pnode))
1123         bad = True
1124
1125       if pnode in n_offline:
1126         inst_nodes_offline.append(pnode)
1127
1128       # If the instance is non-redundant we cannot survive losing its primary
1129       # node, so we are not N+1 compliant. On the other hand we have no disk
1130       # templates with more than one secondary so that situation is not well
1131       # supported either.
1132       # FIXME: does not support file-backed instances
1133       if len(inst_config.secondary_nodes) == 0:
1134         i_non_redundant.append(instance)
1135       elif len(inst_config.secondary_nodes) > 1:
1136         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1137                     % instance)
1138
1139       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1140         i_non_a_balanced.append(instance)
1141
1142       for snode in inst_config.secondary_nodes:
1143         if snode in node_info:
1144           node_info[snode]['sinst'].append(instance)
1145           if pnode not in node_info[snode]['sinst-by-pnode']:
1146             node_info[snode]['sinst-by-pnode'][pnode] = []
1147           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1148         elif snode not in n_offline:
1149           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1150                       " %s failed" % (instance, snode))
1151           bad = True
1152         if snode in n_offline:
1153           inst_nodes_offline.append(snode)
1154
1155       if inst_nodes_offline:
1156         # warn that the instance lives on offline nodes, and set bad=True
1157         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1158                     ", ".join(inst_nodes_offline))
1159         bad = True
1160
1161     feedback_fn("* Verifying orphan volumes")
1162     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1163                                        feedback_fn)
1164     bad = bad or result
1165
1166     feedback_fn("* Verifying remaining instances")
1167     result = self._VerifyOrphanInstances(instancelist, node_instance,
1168                                          feedback_fn)
1169     bad = bad or result
1170
1171     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1172       feedback_fn("* Verifying N+1 Memory redundancy")
1173       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1174       bad = bad or result
1175
1176     feedback_fn("* Other Notes")
1177     if i_non_redundant:
1178       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1179                   % len(i_non_redundant))
1180
1181     if i_non_a_balanced:
1182       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1183                   % len(i_non_a_balanced))
1184
1185     if n_offline:
1186       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1187
1188     if n_drained:
1189       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1190
1191     return not bad
1192
1193   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1194     """Analyze the post-hooks' result
1195
1196     This method analyses the hook result, handles it, and sends some
1197     nicely-formatted feedback back to the user.
1198
1199     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1200         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1201     @param hooks_results: the results of the multi-node hooks rpc call
1202     @param feedback_fn: function used send feedback back to the caller
1203     @param lu_result: previous Exec result
1204     @return: the new Exec result, based on the previous result
1205         and hook results
1206
1207     """
1208     # We only really run POST phase hooks, and are only interested in
1209     # their results
1210     if phase == constants.HOOKS_PHASE_POST:
1211       # Used to change hooks' output to proper indentation
1212       indent_re = re.compile('^', re.M)
1213       feedback_fn("* Hooks Results")
1214       if not hooks_results:
1215         feedback_fn("  - ERROR: general communication failure")
1216         lu_result = 1
1217       else:
1218         for node_name in hooks_results:
1219           show_node_header = True
1220           res = hooks_results[node_name]
1221           if res.failed or res.data is False or not isinstance(res.data, list):
1222             if res.offline:
1223               # no need to warn or set fail return value
1224               continue
1225             feedback_fn("    Communication failure in hooks execution")
1226             lu_result = 1
1227             continue
1228           for script, hkr, output in res.data:
1229             if hkr == constants.HKR_FAIL:
1230               # The node header is only shown once, if there are
1231               # failing hooks on that node
1232               if show_node_header:
1233                 feedback_fn("  Node %s:" % node_name)
1234                 show_node_header = False
1235               feedback_fn("    ERROR: Script %s failed, output:" % script)
1236               output = indent_re.sub('      ', output)
1237               feedback_fn("%s" % output)
1238               lu_result = 1
1239
1240       return lu_result
1241
1242
1243 class LUVerifyDisks(NoHooksLU):
1244   """Verifies the cluster disks status.
1245
1246   """
1247   _OP_REQP = []
1248   REQ_BGL = False
1249
1250   def ExpandNames(self):
1251     self.needed_locks = {
1252       locking.LEVEL_NODE: locking.ALL_SET,
1253       locking.LEVEL_INSTANCE: locking.ALL_SET,
1254     }
1255     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1256
1257   def CheckPrereq(self):
1258     """Check prerequisites.
1259
1260     This has no prerequisites.
1261
1262     """
1263     pass
1264
1265   def Exec(self, feedback_fn):
1266     """Verify integrity of cluster disks.
1267
1268     """
1269     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1270
1271     vg_name = self.cfg.GetVGName()
1272     nodes = utils.NiceSort(self.cfg.GetNodeList())
1273     instances = [self.cfg.GetInstanceInfo(name)
1274                  for name in self.cfg.GetInstanceList()]
1275
1276     nv_dict = {}
1277     for inst in instances:
1278       inst_lvs = {}
1279       if (not inst.admin_up or
1280           inst.disk_template not in constants.DTS_NET_MIRROR):
1281         continue
1282       inst.MapLVsByNode(inst_lvs)
1283       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1284       for node, vol_list in inst_lvs.iteritems():
1285         for vol in vol_list:
1286           nv_dict[(node, vol)] = inst
1287
1288     if not nv_dict:
1289       return result
1290
1291     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1292
1293     for node in nodes:
1294       # node_volume
1295       lvs = node_lvs[node]
1296       if lvs.failed:
1297         if not lvs.offline:
1298           self.LogWarning("Connection to node %s failed: %s" %
1299                           (node, lvs.data))
1300         continue
1301       lvs = lvs.data
1302       if isinstance(lvs, basestring):
1303         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1304         res_nlvm[node] = lvs
1305         continue
1306       elif not isinstance(lvs, dict):
1307         logging.warning("Connection to node %s failed or invalid data"
1308                         " returned", node)
1309         res_nodes.append(node)
1310         continue
1311
1312       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1313         inst = nv_dict.pop((node, lv_name), None)
1314         if (not lv_online and inst is not None
1315             and inst.name not in res_instances):
1316           res_instances.append(inst.name)
1317
1318     # any leftover items in nv_dict are missing LVs, let's arrange the
1319     # data better
1320     for key, inst in nv_dict.iteritems():
1321       if inst.name not in res_missing:
1322         res_missing[inst.name] = []
1323       res_missing[inst.name].append(key)
1324
1325     return result
1326
1327
1328 class LURenameCluster(LogicalUnit):
1329   """Rename the cluster.
1330
1331   """
1332   HPATH = "cluster-rename"
1333   HTYPE = constants.HTYPE_CLUSTER
1334   _OP_REQP = ["name"]
1335
1336   def BuildHooksEnv(self):
1337     """Build hooks env.
1338
1339     """
1340     env = {
1341       "OP_TARGET": self.cfg.GetClusterName(),
1342       "NEW_NAME": self.op.name,
1343       }
1344     mn = self.cfg.GetMasterNode()
1345     return env, [mn], [mn]
1346
1347   def CheckPrereq(self):
1348     """Verify that the passed name is a valid one.
1349
1350     """
1351     hostname = utils.HostInfo(self.op.name)
1352
1353     new_name = hostname.name
1354     self.ip = new_ip = hostname.ip
1355     old_name = self.cfg.GetClusterName()
1356     old_ip = self.cfg.GetMasterIP()
1357     if new_name == old_name and new_ip == old_ip:
1358       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1359                                  " cluster has changed")
1360     if new_ip != old_ip:
1361       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1362         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1363                                    " reachable on the network. Aborting." %
1364                                    new_ip)
1365
1366     self.op.name = new_name
1367
1368   def Exec(self, feedback_fn):
1369     """Rename the cluster.
1370
1371     """
1372     clustername = self.op.name
1373     ip = self.ip
1374
1375     # shutdown the master IP
1376     master = self.cfg.GetMasterNode()
1377     result = self.rpc.call_node_stop_master(master, False)
1378     if result.failed or not result.data:
1379       raise errors.OpExecError("Could not disable the master role")
1380
1381     try:
1382       cluster = self.cfg.GetClusterInfo()
1383       cluster.cluster_name = clustername
1384       cluster.master_ip = ip
1385       self.cfg.Update(cluster)
1386
1387       # update the known hosts file
1388       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1389       node_list = self.cfg.GetNodeList()
1390       try:
1391         node_list.remove(master)
1392       except ValueError:
1393         pass
1394       result = self.rpc.call_upload_file(node_list,
1395                                          constants.SSH_KNOWN_HOSTS_FILE)
1396       for to_node, to_result in result.iteritems():
1397         if to_result.failed or not to_result.data:
1398           logging.error("Copy of file %s to node %s failed",
1399                         constants.SSH_KNOWN_HOSTS_FILE, to_node)
1400
1401     finally:
1402       result = self.rpc.call_node_start_master(master, False)
1403       if result.failed or not result.data:
1404         self.LogWarning("Could not re-enable the master role on"
1405                         " the master, please restart manually.")
1406
1407
1408 def _RecursiveCheckIfLVMBased(disk):
1409   """Check if the given disk or its children are lvm-based.
1410
1411   @type disk: L{objects.Disk}
1412   @param disk: the disk to check
1413   @rtype: boolean
1414   @return: boolean indicating whether a LD_LV dev_type was found or not
1415
1416   """
1417   if disk.children:
1418     for chdisk in disk.children:
1419       if _RecursiveCheckIfLVMBased(chdisk):
1420         return True
1421   return disk.dev_type == constants.LD_LV
1422
1423
1424 class LUSetClusterParams(LogicalUnit):
1425   """Change the parameters of the cluster.
1426
1427   """
1428   HPATH = "cluster-modify"
1429   HTYPE = constants.HTYPE_CLUSTER
1430   _OP_REQP = []
1431   REQ_BGL = False
1432
1433   def CheckArguments(self):
1434     """Check parameters
1435
1436     """
1437     if not hasattr(self.op, "candidate_pool_size"):
1438       self.op.candidate_pool_size = None
1439     if self.op.candidate_pool_size is not None:
1440       try:
1441         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1442       except (ValueError, TypeError), err:
1443         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1444                                    str(err))
1445       if self.op.candidate_pool_size < 1:
1446         raise errors.OpPrereqError("At least one master candidate needed")
1447
1448   def ExpandNames(self):
1449     # FIXME: in the future maybe other cluster params won't require checking on
1450     # all nodes to be modified.
1451     self.needed_locks = {
1452       locking.LEVEL_NODE: locking.ALL_SET,
1453     }
1454     self.share_locks[locking.LEVEL_NODE] = 1
1455
1456   def BuildHooksEnv(self):
1457     """Build hooks env.
1458
1459     """
1460     env = {
1461       "OP_TARGET": self.cfg.GetClusterName(),
1462       "NEW_VG_NAME": self.op.vg_name,
1463       }
1464     mn = self.cfg.GetMasterNode()
1465     return env, [mn], [mn]
1466
1467   def CheckPrereq(self):
1468     """Check prerequisites.
1469
1470     This checks whether the given params don't conflict and
1471     if the given volume group is valid.
1472
1473     """
1474     if self.op.vg_name is not None and not self.op.vg_name:
1475       instances = self.cfg.GetAllInstancesInfo().values()
1476       for inst in instances:
1477         for disk in inst.disks:
1478           if _RecursiveCheckIfLVMBased(disk):
1479             raise errors.OpPrereqError("Cannot disable lvm storage while"
1480                                        " lvm-based instances exist")
1481
1482     node_list = self.acquired_locks[locking.LEVEL_NODE]
1483
1484     # if vg_name not None, checks given volume group on all nodes
1485     if self.op.vg_name:
1486       vglist = self.rpc.call_vg_list(node_list)
1487       for node in node_list:
1488         if vglist[node].failed:
1489           # ignoring down node
1490           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1491           continue
1492         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1493                                               self.op.vg_name,
1494                                               constants.MIN_VG_SIZE)
1495         if vgstatus:
1496           raise errors.OpPrereqError("Error on node '%s': %s" %
1497                                      (node, vgstatus))
1498
1499     self.cluster = cluster = self.cfg.GetClusterInfo()
1500     # validate beparams changes
1501     if self.op.beparams:
1502       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1503       self.new_beparams = cluster.FillDict(
1504         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1505
1506     # hypervisor list/parameters
1507     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1508     if self.op.hvparams:
1509       if not isinstance(self.op.hvparams, dict):
1510         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1511       for hv_name, hv_dict in self.op.hvparams.items():
1512         if hv_name not in self.new_hvparams:
1513           self.new_hvparams[hv_name] = hv_dict
1514         else:
1515           self.new_hvparams[hv_name].update(hv_dict)
1516
1517     if self.op.enabled_hypervisors is not None:
1518       self.hv_list = self.op.enabled_hypervisors
1519       if not self.hv_list:
1520         raise errors.OpPrereqError("Enabled hypervisors list must contain at"
1521                                    " least one member")
1522       invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
1523       if invalid_hvs:
1524         raise errors.OpPrereqError("Enabled hypervisors contains invalid"
1525                                    " entries: %s" % invalid_hvs)
1526     else:
1527       self.hv_list = cluster.enabled_hypervisors
1528
1529     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1530       # either the enabled list has changed, or the parameters have, validate
1531       for hv_name, hv_params in self.new_hvparams.items():
1532         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1533             (self.op.enabled_hypervisors and
1534              hv_name in self.op.enabled_hypervisors)):
1535           # either this is a new hypervisor, or its parameters have changed
1536           hv_class = hypervisor.GetHypervisor(hv_name)
1537           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1538           hv_class.CheckParameterSyntax(hv_params)
1539           _CheckHVParams(self, node_list, hv_name, hv_params)
1540
1541   def Exec(self, feedback_fn):
1542     """Change the parameters of the cluster.
1543
1544     """
1545     if self.op.vg_name is not None:
1546       new_volume = self.op.vg_name
1547       if not new_volume:
1548         new_volume = None
1549       if new_volume != self.cfg.GetVGName():
1550         self.cfg.SetVGName(new_volume)
1551       else:
1552         feedback_fn("Cluster LVM configuration already in desired"
1553                     " state, not changing")
1554     if self.op.hvparams:
1555       self.cluster.hvparams = self.new_hvparams
1556     if self.op.enabled_hypervisors is not None:
1557       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1558     if self.op.beparams:
1559       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1560     if self.op.candidate_pool_size is not None:
1561       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1562       # we need to update the pool size here, otherwise the save will fail
1563       _AdjustCandidatePool(self)
1564
1565     self.cfg.Update(self.cluster)
1566
1567
1568 class LURedistributeConfig(NoHooksLU):
1569   """Force the redistribution of cluster configuration.
1570
1571   This is a very simple LU.
1572
1573   """
1574   _OP_REQP = []
1575   REQ_BGL = False
1576
1577   def ExpandNames(self):
1578     self.needed_locks = {
1579       locking.LEVEL_NODE: locking.ALL_SET,
1580     }
1581     self.share_locks[locking.LEVEL_NODE] = 1
1582
1583   def CheckPrereq(self):
1584     """Check prerequisites.
1585
1586     """
1587
1588   def Exec(self, feedback_fn):
1589     """Redistribute the configuration.
1590
1591     """
1592     self.cfg.Update(self.cfg.GetClusterInfo())
1593
1594
1595 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1596   """Sleep and poll for an instance's disk to sync.
1597
1598   """
1599   if not instance.disks:
1600     return True
1601
1602   if not oneshot:
1603     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1604
1605   node = instance.primary_node
1606
1607   for dev in instance.disks:
1608     lu.cfg.SetDiskID(dev, node)
1609
1610   retries = 0
1611   degr_retries = 10 # in seconds, as we sleep 1 second each time
1612   while True:
1613     max_time = 0
1614     done = True
1615     cumul_degraded = False
1616     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1617     if rstats.failed or not rstats.data:
1618       lu.LogWarning("Can't get any data from node %s", node)
1619       retries += 1
1620       if retries >= 10:
1621         raise errors.RemoteError("Can't contact node %s for mirror data,"
1622                                  " aborting." % node)
1623       time.sleep(6)
1624       continue
1625     rstats = rstats.data
1626     retries = 0
1627     for i, mstat in enumerate(rstats):
1628       if mstat is None:
1629         lu.LogWarning("Can't compute data for node %s/%s",
1630                            node, instance.disks[i].iv_name)
1631         continue
1632       # we ignore the ldisk parameter
1633       perc_done, est_time, is_degraded, _ = mstat
1634       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1635       if perc_done is not None:
1636         done = False
1637         if est_time is not None:
1638           rem_time = "%d estimated seconds remaining" % est_time
1639           max_time = est_time
1640         else:
1641           rem_time = "no time estimate"
1642         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1643                         (instance.disks[i].iv_name, perc_done, rem_time))
1644
1645     # if we're done but degraded, let's do a few small retries, to
1646     # make sure we see a stable and not transient situation; therefore
1647     # we force restart of the loop
1648     if (done or oneshot) and cumul_degraded and degr_retries > 0:
1649       logging.info("Degraded disks found, %d retries left", degr_retries)
1650       degr_retries -= 1
1651       time.sleep(1)
1652       continue
1653
1654     if done or oneshot:
1655       break
1656
1657     time.sleep(min(60, max_time))
1658
1659   if done:
1660     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1661   return not cumul_degraded
1662
1663
1664 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1665   """Check that mirrors are not degraded.
1666
1667   The ldisk parameter, if True, will change the test from the
1668   is_degraded attribute (which represents overall non-ok status for
1669   the device(s)) to the ldisk (representing the local storage status).
1670
1671   """
1672   lu.cfg.SetDiskID(dev, node)
1673   if ldisk:
1674     idx = 6
1675   else:
1676     idx = 5
1677
1678   result = True
1679   if on_primary or dev.AssembleOnSecondary():
1680     rstats = lu.rpc.call_blockdev_find(node, dev)
1681     msg = rstats.RemoteFailMsg()
1682     if msg:
1683       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1684       result = False
1685     elif not rstats.payload:
1686       lu.LogWarning("Can't find disk on node %s", node)
1687       result = False
1688     else:
1689       result = result and (not rstats.payload[idx])
1690   if dev.children:
1691     for child in dev.children:
1692       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1693
1694   return result
1695
1696
1697 class LUDiagnoseOS(NoHooksLU):
1698   """Logical unit for OS diagnose/query.
1699
1700   """
1701   _OP_REQP = ["output_fields", "names"]
1702   REQ_BGL = False
1703   _FIELDS_STATIC = utils.FieldSet()
1704   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1705
1706   def ExpandNames(self):
1707     if self.op.names:
1708       raise errors.OpPrereqError("Selective OS query not supported")
1709
1710     _CheckOutputFields(static=self._FIELDS_STATIC,
1711                        dynamic=self._FIELDS_DYNAMIC,
1712                        selected=self.op.output_fields)
1713
1714     # Lock all nodes, in shared mode
1715     # Temporary removal of locks, should be reverted later
1716     # TODO: reintroduce locks when they are lighter-weight
1717     self.needed_locks = {}
1718     #self.share_locks[locking.LEVEL_NODE] = 1
1719     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1720
1721   def CheckPrereq(self):
1722     """Check prerequisites.
1723
1724     """
1725
1726   @staticmethod
1727   def _DiagnoseByOS(node_list, rlist):
1728     """Remaps a per-node return list into an a per-os per-node dictionary
1729
1730     @param node_list: a list with the names of all nodes
1731     @param rlist: a map with node names as keys and OS objects as values
1732
1733     @rtype: dict
1734     @return: a dictionary with osnames as keys and as value another map, with
1735         nodes as keys and list of OS objects as values, eg::
1736
1737           {"debian-etch": {"node1": [<object>,...],
1738                            "node2": [<object>,]}
1739           }
1740
1741     """
1742     all_os = {}
1743     # we build here the list of nodes that didn't fail the RPC (at RPC
1744     # level), so that nodes with a non-responding node daemon don't
1745     # make all OSes invalid
1746     good_nodes = [node_name for node_name in rlist
1747                   if not rlist[node_name].failed]
1748     for node_name, nr in rlist.iteritems():
1749       if nr.failed or not nr.data:
1750         continue
1751       for os_obj in nr.data:
1752         if os_obj.name not in all_os:
1753           # build a list of nodes for this os containing empty lists
1754           # for each node in node_list
1755           all_os[os_obj.name] = {}
1756           for nname in good_nodes:
1757             all_os[os_obj.name][nname] = []
1758         all_os[os_obj.name][node_name].append(os_obj)
1759     return all_os
1760
1761   def Exec(self, feedback_fn):
1762     """Compute the list of OSes.
1763
1764     """
1765     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1766     node_data = self.rpc.call_os_diagnose(valid_nodes)
1767     if node_data == False:
1768       raise errors.OpExecError("Can't gather the list of OSes")
1769     pol = self._DiagnoseByOS(valid_nodes, node_data)
1770     output = []
1771     for os_name, os_data in pol.iteritems():
1772       row = []
1773       for field in self.op.output_fields:
1774         if field == "name":
1775           val = os_name
1776         elif field == "valid":
1777           val = utils.all([osl and osl[0] for osl in os_data.values()])
1778         elif field == "node_status":
1779           val = {}
1780           for node_name, nos_list in os_data.iteritems():
1781             val[node_name] = [(v.status, v.path) for v in nos_list]
1782         else:
1783           raise errors.ParameterError(field)
1784         row.append(val)
1785       output.append(row)
1786
1787     return output
1788
1789
1790 class LURemoveNode(LogicalUnit):
1791   """Logical unit for removing a node.
1792
1793   """
1794   HPATH = "node-remove"
1795   HTYPE = constants.HTYPE_NODE
1796   _OP_REQP = ["node_name"]
1797
1798   def BuildHooksEnv(self):
1799     """Build hooks env.
1800
1801     This doesn't run on the target node in the pre phase as a failed
1802     node would then be impossible to remove.
1803
1804     """
1805     env = {
1806       "OP_TARGET": self.op.node_name,
1807       "NODE_NAME": self.op.node_name,
1808       }
1809     all_nodes = self.cfg.GetNodeList()
1810     all_nodes.remove(self.op.node_name)
1811     return env, all_nodes, all_nodes
1812
1813   def CheckPrereq(self):
1814     """Check prerequisites.
1815
1816     This checks:
1817      - the node exists in the configuration
1818      - it does not have primary or secondary instances
1819      - it's not the master
1820
1821     Any errors are signaled by raising errors.OpPrereqError.
1822
1823     """
1824     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1825     if node is None:
1826       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1827
1828     instance_list = self.cfg.GetInstanceList()
1829
1830     masternode = self.cfg.GetMasterNode()
1831     if node.name == masternode:
1832       raise errors.OpPrereqError("Node is the master node,"
1833                                  " you need to failover first.")
1834
1835     for instance_name in instance_list:
1836       instance = self.cfg.GetInstanceInfo(instance_name)
1837       if node.name in instance.all_nodes:
1838         raise errors.OpPrereqError("Instance %s is still running on the node,"
1839                                    " please remove first." % instance_name)
1840     self.op.node_name = node.name
1841     self.node = node
1842
1843   def Exec(self, feedback_fn):
1844     """Removes the node from the cluster.
1845
1846     """
1847     node = self.node
1848     logging.info("Stopping the node daemon and removing configs from node %s",
1849                  node.name)
1850
1851     self.context.RemoveNode(node.name)
1852
1853     self.rpc.call_node_leave_cluster(node.name)
1854
1855     # Promote nodes to master candidate as needed
1856     _AdjustCandidatePool(self)
1857
1858
1859 class LUQueryNodes(NoHooksLU):
1860   """Logical unit for querying nodes.
1861
1862   """
1863   _OP_REQP = ["output_fields", "names", "use_locking"]
1864   REQ_BGL = False
1865   _FIELDS_DYNAMIC = utils.FieldSet(
1866     "dtotal", "dfree",
1867     "mtotal", "mnode", "mfree",
1868     "bootid",
1869     "ctotal", "cnodes", "csockets",
1870     )
1871
1872   _FIELDS_STATIC = utils.FieldSet(
1873     "name", "pinst_cnt", "sinst_cnt",
1874     "pinst_list", "sinst_list",
1875     "pip", "sip", "tags",
1876     "serial_no",
1877     "master_candidate",
1878     "master",
1879     "offline",
1880     "drained",
1881     "role",
1882     )
1883
1884   def ExpandNames(self):
1885     _CheckOutputFields(static=self._FIELDS_STATIC,
1886                        dynamic=self._FIELDS_DYNAMIC,
1887                        selected=self.op.output_fields)
1888
1889     self.needed_locks = {}
1890     self.share_locks[locking.LEVEL_NODE] = 1
1891
1892     if self.op.names:
1893       self.wanted = _GetWantedNodes(self, self.op.names)
1894     else:
1895       self.wanted = locking.ALL_SET
1896
1897     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1898     self.do_locking = self.do_node_query and self.op.use_locking
1899     if self.do_locking:
1900       # if we don't request only static fields, we need to lock the nodes
1901       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1902
1903
1904   def CheckPrereq(self):
1905     """Check prerequisites.
1906
1907     """
1908     # The validation of the node list is done in the _GetWantedNodes,
1909     # if non empty, and if empty, there's no validation to do
1910     pass
1911
1912   def Exec(self, feedback_fn):
1913     """Computes the list of nodes and their attributes.
1914
1915     """
1916     all_info = self.cfg.GetAllNodesInfo()
1917     if self.do_locking:
1918       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1919     elif self.wanted != locking.ALL_SET:
1920       nodenames = self.wanted
1921       missing = set(nodenames).difference(all_info.keys())
1922       if missing:
1923         raise errors.OpExecError(
1924           "Some nodes were removed before retrieving their data: %s" % missing)
1925     else:
1926       nodenames = all_info.keys()
1927
1928     nodenames = utils.NiceSort(nodenames)
1929     nodelist = [all_info[name] for name in nodenames]
1930
1931     # begin data gathering
1932
1933     if self.do_node_query:
1934       live_data = {}
1935       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1936                                           self.cfg.GetHypervisorType())
1937       for name in nodenames:
1938         nodeinfo = node_data[name]
1939         if not nodeinfo.failed and nodeinfo.data:
1940           nodeinfo = nodeinfo.data
1941           fn = utils.TryConvert
1942           live_data[name] = {
1943             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
1944             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
1945             "mfree": fn(int, nodeinfo.get('memory_free', None)),
1946             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
1947             "dfree": fn(int, nodeinfo.get('vg_free', None)),
1948             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
1949             "bootid": nodeinfo.get('bootid', None),
1950             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
1951             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
1952             }
1953         else:
1954           live_data[name] = {}
1955     else:
1956       live_data = dict.fromkeys(nodenames, {})
1957
1958     node_to_primary = dict([(name, set()) for name in nodenames])
1959     node_to_secondary = dict([(name, set()) for name in nodenames])
1960
1961     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1962                              "sinst_cnt", "sinst_list"))
1963     if inst_fields & frozenset(self.op.output_fields):
1964       instancelist = self.cfg.GetInstanceList()
1965
1966       for instance_name in instancelist:
1967         inst = self.cfg.GetInstanceInfo(instance_name)
1968         if inst.primary_node in node_to_primary:
1969           node_to_primary[inst.primary_node].add(inst.name)
1970         for secnode in inst.secondary_nodes:
1971           if secnode in node_to_secondary:
1972             node_to_secondary[secnode].add(inst.name)
1973
1974     master_node = self.cfg.GetMasterNode()
1975
1976     # end data gathering
1977
1978     output = []
1979     for node in nodelist:
1980       node_output = []
1981       for field in self.op.output_fields:
1982         if field == "name":
1983           val = node.name
1984         elif field == "pinst_list":
1985           val = list(node_to_primary[node.name])
1986         elif field == "sinst_list":
1987           val = list(node_to_secondary[node.name])
1988         elif field == "pinst_cnt":
1989           val = len(node_to_primary[node.name])
1990         elif field == "sinst_cnt":
1991           val = len(node_to_secondary[node.name])
1992         elif field == "pip":
1993           val = node.primary_ip
1994         elif field == "sip":
1995           val = node.secondary_ip
1996         elif field == "tags":
1997           val = list(node.GetTags())
1998         elif field == "serial_no":
1999           val = node.serial_no
2000         elif field == "master_candidate":
2001           val = node.master_candidate
2002         elif field == "master":
2003           val = node.name == master_node
2004         elif field == "offline":
2005           val = node.offline
2006         elif field == "drained":
2007           val = node.drained
2008         elif self._FIELDS_DYNAMIC.Matches(field):
2009           val = live_data[node.name].get(field, None)
2010         elif field == "role":
2011           if node.name == master_node:
2012             val = "M"
2013           elif node.master_candidate:
2014             val = "C"
2015           elif node.drained:
2016             val = "D"
2017           elif node.offline:
2018             val = "O"
2019           else:
2020             val = "R"
2021         else:
2022           raise errors.ParameterError(field)
2023         node_output.append(val)
2024       output.append(node_output)
2025
2026     return output
2027
2028
2029 class LUQueryNodeVolumes(NoHooksLU):
2030   """Logical unit for getting volumes on node(s).
2031
2032   """
2033   _OP_REQP = ["nodes", "output_fields"]
2034   REQ_BGL = False
2035   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2036   _FIELDS_STATIC = utils.FieldSet("node")
2037
2038   def ExpandNames(self):
2039     _CheckOutputFields(static=self._FIELDS_STATIC,
2040                        dynamic=self._FIELDS_DYNAMIC,
2041                        selected=self.op.output_fields)
2042
2043     self.needed_locks = {}
2044     self.share_locks[locking.LEVEL_NODE] = 1
2045     if not self.op.nodes:
2046       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2047     else:
2048       self.needed_locks[locking.LEVEL_NODE] = \
2049         _GetWantedNodes(self, self.op.nodes)
2050
2051   def CheckPrereq(self):
2052     """Check prerequisites.
2053
2054     This checks that the fields required are valid output fields.
2055
2056     """
2057     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2058
2059   def Exec(self, feedback_fn):
2060     """Computes the list of nodes and their attributes.
2061
2062     """
2063     nodenames = self.nodes
2064     volumes = self.rpc.call_node_volumes(nodenames)
2065
2066     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2067              in self.cfg.GetInstanceList()]
2068
2069     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2070
2071     output = []
2072     for node in nodenames:
2073       if node not in volumes or volumes[node].failed or not volumes[node].data:
2074         continue
2075
2076       node_vols = volumes[node].data[:]
2077       node_vols.sort(key=lambda vol: vol['dev'])
2078
2079       for vol in node_vols:
2080         node_output = []
2081         for field in self.op.output_fields:
2082           if field == "node":
2083             val = node
2084           elif field == "phys":
2085             val = vol['dev']
2086           elif field == "vg":
2087             val = vol['vg']
2088           elif field == "name":
2089             val = vol['name']
2090           elif field == "size":
2091             val = int(float(vol['size']))
2092           elif field == "instance":
2093             for inst in ilist:
2094               if node not in lv_by_node[inst]:
2095                 continue
2096               if vol['name'] in lv_by_node[inst][node]:
2097                 val = inst.name
2098                 break
2099             else:
2100               val = '-'
2101           else:
2102             raise errors.ParameterError(field)
2103           node_output.append(str(val))
2104
2105         output.append(node_output)
2106
2107     return output
2108
2109
2110 class LUAddNode(LogicalUnit):
2111   """Logical unit for adding node to the cluster.
2112
2113   """
2114   HPATH = "node-add"
2115   HTYPE = constants.HTYPE_NODE
2116   _OP_REQP = ["node_name"]
2117
2118   def BuildHooksEnv(self):
2119     """Build hooks env.
2120
2121     This will run on all nodes before, and on all nodes + the new node after.
2122
2123     """
2124     env = {
2125       "OP_TARGET": self.op.node_name,
2126       "NODE_NAME": self.op.node_name,
2127       "NODE_PIP": self.op.primary_ip,
2128       "NODE_SIP": self.op.secondary_ip,
2129       }
2130     nodes_0 = self.cfg.GetNodeList()
2131     nodes_1 = nodes_0 + [self.op.node_name, ]
2132     return env, nodes_0, nodes_1
2133
2134   def CheckPrereq(self):
2135     """Check prerequisites.
2136
2137     This checks:
2138      - the new node is not already in the config
2139      - it is resolvable
2140      - its parameters (single/dual homed) matches the cluster
2141
2142     Any errors are signaled by raising errors.OpPrereqError.
2143
2144     """
2145     node_name = self.op.node_name
2146     cfg = self.cfg
2147
2148     dns_data = utils.HostInfo(node_name)
2149
2150     node = dns_data.name
2151     primary_ip = self.op.primary_ip = dns_data.ip
2152     secondary_ip = getattr(self.op, "secondary_ip", None)
2153     if secondary_ip is None:
2154       secondary_ip = primary_ip
2155     if not utils.IsValidIP(secondary_ip):
2156       raise errors.OpPrereqError("Invalid secondary IP given")
2157     self.op.secondary_ip = secondary_ip
2158
2159     node_list = cfg.GetNodeList()
2160     if not self.op.readd and node in node_list:
2161       raise errors.OpPrereqError("Node %s is already in the configuration" %
2162                                  node)
2163     elif self.op.readd and node not in node_list:
2164       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2165
2166     for existing_node_name in node_list:
2167       existing_node = cfg.GetNodeInfo(existing_node_name)
2168
2169       if self.op.readd and node == existing_node_name:
2170         if (existing_node.primary_ip != primary_ip or
2171             existing_node.secondary_ip != secondary_ip):
2172           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2173                                      " address configuration as before")
2174         continue
2175
2176       if (existing_node.primary_ip == primary_ip or
2177           existing_node.secondary_ip == primary_ip or
2178           existing_node.primary_ip == secondary_ip or
2179           existing_node.secondary_ip == secondary_ip):
2180         raise errors.OpPrereqError("New node ip address(es) conflict with"
2181                                    " existing node %s" % existing_node.name)
2182
2183     # check that the type of the node (single versus dual homed) is the
2184     # same as for the master
2185     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2186     master_singlehomed = myself.secondary_ip == myself.primary_ip
2187     newbie_singlehomed = secondary_ip == primary_ip
2188     if master_singlehomed != newbie_singlehomed:
2189       if master_singlehomed:
2190         raise errors.OpPrereqError("The master has no private ip but the"
2191                                    " new node has one")
2192       else:
2193         raise errors.OpPrereqError("The master has a private ip but the"
2194                                    " new node doesn't have one")
2195
2196     # checks reachability
2197     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2198       raise errors.OpPrereqError("Node not reachable by ping")
2199
2200     if not newbie_singlehomed:
2201       # check reachability from my secondary ip to newbie's secondary ip
2202       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2203                            source=myself.secondary_ip):
2204         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2205                                    " based ping to noded port")
2206
2207     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2208     if self.op.readd:
2209       exceptions = [node]
2210     else:
2211       exceptions = []
2212     mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
2213     # the new node will increase mc_max with one, so:
2214     mc_max = min(mc_max + 1, cp_size)
2215     self.master_candidate = mc_now < mc_max
2216
2217     if self.op.readd:
2218       self.new_node = self.cfg.GetNodeInfo(node)
2219       assert self.new_node is not None, "Can't retrieve locked node %s" % node
2220     else:
2221       self.new_node = objects.Node(name=node,
2222                                    primary_ip=primary_ip,
2223                                    secondary_ip=secondary_ip,
2224                                    master_candidate=self.master_candidate,
2225                                    offline=False, drained=False)
2226
2227   def Exec(self, feedback_fn):
2228     """Adds the new node to the cluster.
2229
2230     """
2231     new_node = self.new_node
2232     node = new_node.name
2233
2234     # for re-adds, reset the offline/drained/master-candidate flags;
2235     # we need to reset here, otherwise offline would prevent RPC calls
2236     # later in the procedure; this also means that if the re-add
2237     # fails, we are left with a non-offlined, broken node
2238     if self.op.readd:
2239       new_node.drained = new_node.offline = False
2240       self.LogInfo("Readding a node, the offline/drained flags were reset")
2241       # if we demote the node, we do cleanup later in the procedure
2242       new_node.master_candidate = self.master_candidate
2243
2244     # notify the user about any possible mc promotion
2245     if new_node.master_candidate:
2246       self.LogInfo("Node will be a master candidate")
2247
2248     # check connectivity
2249     result = self.rpc.call_version([node])[node]
2250     result.Raise()
2251     if result.data:
2252       if constants.PROTOCOL_VERSION == result.data:
2253         logging.info("Communication to node %s fine, sw version %s match",
2254                      node, result.data)
2255       else:
2256         raise errors.OpExecError("Version mismatch master version %s,"
2257                                  " node version %s" %
2258                                  (constants.PROTOCOL_VERSION, result.data))
2259     else:
2260       raise errors.OpExecError("Cannot get version from the new node")
2261
2262     # setup ssh on node
2263     logging.info("Copy ssh key to node %s", node)
2264     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2265     keyarray = []
2266     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2267                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2268                 priv_key, pub_key]
2269
2270     for i in keyfiles:
2271       f = open(i, 'r')
2272       try:
2273         keyarray.append(f.read())
2274       finally:
2275         f.close()
2276
2277     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2278                                     keyarray[2],
2279                                     keyarray[3], keyarray[4], keyarray[5])
2280
2281     msg = result.RemoteFailMsg()
2282     if msg:
2283       raise errors.OpExecError("Cannot transfer ssh keys to the"
2284                                " new node: %s" % msg)
2285
2286     # Add node to our /etc/hosts, and add key to known_hosts
2287     utils.AddHostToEtcHosts(new_node.name)
2288
2289     if new_node.secondary_ip != new_node.primary_ip:
2290       result = self.rpc.call_node_has_ip_address(new_node.name,
2291                                                  new_node.secondary_ip)
2292       if result.failed or not result.data:
2293         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2294                                  " you gave (%s). Please fix and re-run this"
2295                                  " command." % new_node.secondary_ip)
2296
2297     node_verify_list = [self.cfg.GetMasterNode()]
2298     node_verify_param = {
2299       'nodelist': [node],
2300       # TODO: do a node-net-test as well?
2301     }
2302
2303     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2304                                        self.cfg.GetClusterName())
2305     for verifier in node_verify_list:
2306       if result[verifier].failed or not result[verifier].data:
2307         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2308                                  " for remote verification" % verifier)
2309       if result[verifier].data['nodelist']:
2310         for failed in result[verifier].data['nodelist']:
2311           feedback_fn("ssh/hostname verification failed %s -> %s" %
2312                       (verifier, result[verifier].data['nodelist'][failed]))
2313         raise errors.OpExecError("ssh/hostname verification failed.")
2314
2315     # Distribute updated /etc/hosts and known_hosts to all nodes,
2316     # including the node just added
2317     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2318     dist_nodes = self.cfg.GetNodeList()
2319     if not self.op.readd:
2320       dist_nodes.append(node)
2321     if myself.name in dist_nodes:
2322       dist_nodes.remove(myself.name)
2323
2324     logging.debug("Copying hosts and known_hosts to all nodes")
2325     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2326       result = self.rpc.call_upload_file(dist_nodes, fname)
2327       for to_node, to_result in result.iteritems():
2328         if to_result.failed or not to_result.data:
2329           logging.error("Copy of file %s to node %s failed", fname, to_node)
2330
2331     to_copy = []
2332     enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2333     if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
2334       to_copy.append(constants.VNC_PASSWORD_FILE)
2335
2336     for fname in to_copy:
2337       result = self.rpc.call_upload_file([node], fname)
2338       if result[node].failed or not result[node]:
2339         logging.error("Could not copy file %s to node %s", fname, node)
2340
2341     if self.op.readd:
2342       self.context.ReaddNode(new_node)
2343       # make sure we redistribute the config
2344       self.cfg.Update(new_node)
2345       # and make sure the new node will not have old files around
2346       if not new_node.master_candidate:
2347         result = self.rpc.call_node_demote_from_mc(new_node.name)
2348         msg = result.RemoteFailMsg()
2349         if msg:
2350           self.LogWarning("Node failed to demote itself from master"
2351                           " candidate status: %s" % msg)
2352     else:
2353       self.context.AddNode(new_node)
2354
2355
2356 class LUSetNodeParams(LogicalUnit):
2357   """Modifies the parameters of a node.
2358
2359   """
2360   HPATH = "node-modify"
2361   HTYPE = constants.HTYPE_NODE
2362   _OP_REQP = ["node_name"]
2363   REQ_BGL = False
2364
2365   def CheckArguments(self):
2366     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2367     if node_name is None:
2368       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2369     self.op.node_name = node_name
2370     _CheckBooleanOpField(self.op, 'master_candidate')
2371     _CheckBooleanOpField(self.op, 'offline')
2372     _CheckBooleanOpField(self.op, 'drained')
2373     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2374     if all_mods.count(None) == 3:
2375       raise errors.OpPrereqError("Please pass at least one modification")
2376     if all_mods.count(True) > 1:
2377       raise errors.OpPrereqError("Can't set the node into more than one"
2378                                  " state at the same time")
2379
2380   def ExpandNames(self):
2381     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2382
2383   def BuildHooksEnv(self):
2384     """Build hooks env.
2385
2386     This runs on the master node.
2387
2388     """
2389     env = {
2390       "OP_TARGET": self.op.node_name,
2391       "MASTER_CANDIDATE": str(self.op.master_candidate),
2392       "OFFLINE": str(self.op.offline),
2393       "DRAINED": str(self.op.drained),
2394       }
2395     nl = [self.cfg.GetMasterNode(),
2396           self.op.node_name]
2397     return env, nl, nl
2398
2399   def CheckPrereq(self):
2400     """Check prerequisites.
2401
2402     This only checks the instance list against the existing names.
2403
2404     """
2405     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2406
2407     if ((self.op.master_candidate == False or self.op.offline == True or
2408          self.op.drained == True) and node.master_candidate):
2409       # we will demote the node from master_candidate
2410       if self.op.node_name == self.cfg.GetMasterNode():
2411         raise errors.OpPrereqError("The master node has to be a"
2412                                    " master candidate, online and not drained")
2413       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2414       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2415       if num_candidates <= cp_size:
2416         msg = ("Not enough master candidates (desired"
2417                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2418         if self.op.force:
2419           self.LogWarning(msg)
2420         else:
2421           raise errors.OpPrereqError(msg)
2422
2423     if (self.op.master_candidate == True and
2424         ((node.offline and not self.op.offline == False) or
2425          (node.drained and not self.op.drained == False))):
2426       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2427                                  " to master_candidate" % node.name)
2428
2429     return
2430
2431   def Exec(self, feedback_fn):
2432     """Modifies a node.
2433
2434     """
2435     node = self.node
2436
2437     result = []
2438     changed_mc = False
2439
2440     if self.op.offline is not None:
2441       node.offline = self.op.offline
2442       result.append(("offline", str(self.op.offline)))
2443       if self.op.offline == True:
2444         if node.master_candidate:
2445           node.master_candidate = False
2446           changed_mc = True
2447           result.append(("master_candidate", "auto-demotion due to offline"))
2448         if node.drained:
2449           node.drained = False
2450           result.append(("drained", "clear drained status due to offline"))
2451
2452     if self.op.master_candidate is not None:
2453       node.master_candidate = self.op.master_candidate
2454       changed_mc = True
2455       result.append(("master_candidate", str(self.op.master_candidate)))
2456       if self.op.master_candidate == False:
2457         rrc = self.rpc.call_node_demote_from_mc(node.name)
2458         msg = rrc.RemoteFailMsg()
2459         if msg:
2460           self.LogWarning("Node failed to demote itself: %s" % msg)
2461
2462     if self.op.drained is not None:
2463       node.drained = self.op.drained
2464       result.append(("drained", str(self.op.drained)))
2465       if self.op.drained == True:
2466         if node.master_candidate:
2467           node.master_candidate = False
2468           changed_mc = True
2469           result.append(("master_candidate", "auto-demotion due to drain"))
2470           rrc = self.rpc.call_node_demote_from_mc(node.name)
2471           msg = rrc.RemoteFailMsg()
2472           if msg:
2473             self.LogWarning("Node failed to demote itself: %s" % msg)
2474         if node.offline:
2475           node.offline = False
2476           result.append(("offline", "clear offline status due to drain"))
2477
2478     # this will trigger configuration file update, if needed
2479     self.cfg.Update(node)
2480     # this will trigger job queue propagation or cleanup
2481     if changed_mc:
2482       self.context.ReaddNode(node)
2483
2484     return result
2485
2486
2487 class LUQueryClusterInfo(NoHooksLU):
2488   """Query cluster configuration.
2489
2490   """
2491   _OP_REQP = []
2492   REQ_BGL = False
2493
2494   def ExpandNames(self):
2495     self.needed_locks = {}
2496
2497   def CheckPrereq(self):
2498     """No prerequsites needed for this LU.
2499
2500     """
2501     pass
2502
2503   def Exec(self, feedback_fn):
2504     """Return cluster config.
2505
2506     """
2507     cluster = self.cfg.GetClusterInfo()
2508     result = {
2509       "software_version": constants.RELEASE_VERSION,
2510       "protocol_version": constants.PROTOCOL_VERSION,
2511       "config_version": constants.CONFIG_VERSION,
2512       "os_api_version": constants.OS_API_VERSION,
2513       "export_version": constants.EXPORT_VERSION,
2514       "architecture": (platform.architecture()[0], platform.machine()),
2515       "name": cluster.cluster_name,
2516       "master": cluster.master_node,
2517       "default_hypervisor": cluster.default_hypervisor,
2518       "enabled_hypervisors": cluster.enabled_hypervisors,
2519       "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
2520                         for hypervisor_name in cluster.enabled_hypervisors]),
2521       "beparams": cluster.beparams,
2522       "candidate_pool_size": cluster.candidate_pool_size,
2523       "default_bridge": cluster.default_bridge,
2524       "master_netdev": cluster.master_netdev,
2525       "volume_group_name": cluster.volume_group_name,
2526       "file_storage_dir": cluster.file_storage_dir,
2527       }
2528
2529     return result
2530
2531
2532 class LUQueryConfigValues(NoHooksLU):
2533   """Return configuration values.
2534
2535   """
2536   _OP_REQP = []
2537   REQ_BGL = False
2538   _FIELDS_DYNAMIC = utils.FieldSet()
2539   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2540
2541   def ExpandNames(self):
2542     self.needed_locks = {}
2543
2544     _CheckOutputFields(static=self._FIELDS_STATIC,
2545                        dynamic=self._FIELDS_DYNAMIC,
2546                        selected=self.op.output_fields)
2547
2548   def CheckPrereq(self):
2549     """No prerequisites.
2550
2551     """
2552     pass
2553
2554   def Exec(self, feedback_fn):
2555     """Dump a representation of the cluster config to the standard output.
2556
2557     """
2558     values = []
2559     for field in self.op.output_fields:
2560       if field == "cluster_name":
2561         entry = self.cfg.GetClusterName()
2562       elif field == "master_node":
2563         entry = self.cfg.GetMasterNode()
2564       elif field == "drain_flag":
2565         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2566       else:
2567         raise errors.ParameterError(field)
2568       values.append(entry)
2569     return values
2570
2571
2572 class LUActivateInstanceDisks(NoHooksLU):
2573   """Bring up an instance's disks.
2574
2575   """
2576   _OP_REQP = ["instance_name"]
2577   REQ_BGL = False
2578
2579   def ExpandNames(self):
2580     self._ExpandAndLockInstance()
2581     self.needed_locks[locking.LEVEL_NODE] = []
2582     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2583
2584   def DeclareLocks(self, level):
2585     if level == locking.LEVEL_NODE:
2586       self._LockInstancesNodes()
2587
2588   def CheckPrereq(self):
2589     """Check prerequisites.
2590
2591     This checks that the instance is in the cluster.
2592
2593     """
2594     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2595     assert self.instance is not None, \
2596       "Cannot retrieve locked instance %s" % self.op.instance_name
2597     _CheckNodeOnline(self, self.instance.primary_node)
2598
2599   def Exec(self, feedback_fn):
2600     """Activate the disks.
2601
2602     """
2603     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2604     if not disks_ok:
2605       raise errors.OpExecError("Cannot activate block devices")
2606
2607     return disks_info
2608
2609
2610 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2611   """Prepare the block devices for an instance.
2612
2613   This sets up the block devices on all nodes.
2614
2615   @type lu: L{LogicalUnit}
2616   @param lu: the logical unit on whose behalf we execute
2617   @type instance: L{objects.Instance}
2618   @param instance: the instance for whose disks we assemble
2619   @type ignore_secondaries: boolean
2620   @param ignore_secondaries: if true, errors on secondary nodes
2621       won't result in an error return from the function
2622   @return: False if the operation failed, otherwise a list of
2623       (host, instance_visible_name, node_visible_name)
2624       with the mapping from node devices to instance devices
2625
2626   """
2627   device_info = []
2628   disks_ok = True
2629   iname = instance.name
2630   # With the two passes mechanism we try to reduce the window of
2631   # opportunity for the race condition of switching DRBD to primary
2632   # before handshaking occured, but we do not eliminate it
2633
2634   # The proper fix would be to wait (with some limits) until the
2635   # connection has been made and drbd transitions from WFConnection
2636   # into any other network-connected state (Connected, SyncTarget,
2637   # SyncSource, etc.)
2638
2639   # 1st pass, assemble on all nodes in secondary mode
2640   for inst_disk in instance.disks:
2641     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2642       lu.cfg.SetDiskID(node_disk, node)
2643       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2644       msg = result.RemoteFailMsg()
2645       if msg:
2646         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2647                            " (is_primary=False, pass=1): %s",
2648                            inst_disk.iv_name, node, msg)
2649         if not ignore_secondaries:
2650           disks_ok = False
2651
2652   # FIXME: race condition on drbd migration to primary
2653
2654   # 2nd pass, do only the primary node
2655   for inst_disk in instance.disks:
2656     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2657       if node != instance.primary_node:
2658         continue
2659       lu.cfg.SetDiskID(node_disk, node)
2660       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2661       msg = result.RemoteFailMsg()
2662       if msg:
2663         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2664                            " (is_primary=True, pass=2): %s",
2665                            inst_disk.iv_name, node, msg)
2666         disks_ok = False
2667     device_info.append((instance.primary_node, inst_disk.iv_name,
2668                         result.payload))
2669
2670   # leave the disks configured for the primary node
2671   # this is a workaround that would be fixed better by
2672   # improving the logical/physical id handling
2673   for disk in instance.disks:
2674     lu.cfg.SetDiskID(disk, instance.primary_node)
2675
2676   return disks_ok, device_info
2677
2678
2679 def _StartInstanceDisks(lu, instance, force):
2680   """Start the disks of an instance.
2681
2682   """
2683   disks_ok, _ = _AssembleInstanceDisks(lu, instance,
2684                                            ignore_secondaries=force)
2685   if not disks_ok:
2686     _ShutdownInstanceDisks(lu, instance)
2687     if force is not None and not force:
2688       lu.proc.LogWarning("", hint="If the message above refers to a"
2689                          " secondary node,"
2690                          " you can retry the operation using '--force'.")
2691     raise errors.OpExecError("Disk consistency error")
2692
2693
2694 class LUDeactivateInstanceDisks(NoHooksLU):
2695   """Shutdown an instance's disks.
2696
2697   """
2698   _OP_REQP = ["instance_name"]
2699   REQ_BGL = False
2700
2701   def ExpandNames(self):
2702     self._ExpandAndLockInstance()
2703     self.needed_locks[locking.LEVEL_NODE] = []
2704     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2705
2706   def DeclareLocks(self, level):
2707     if level == locking.LEVEL_NODE:
2708       self._LockInstancesNodes()
2709
2710   def CheckPrereq(self):
2711     """Check prerequisites.
2712
2713     This checks that the instance is in the cluster.
2714
2715     """
2716     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2717     assert self.instance is not None, \
2718       "Cannot retrieve locked instance %s" % self.op.instance_name
2719
2720   def Exec(self, feedback_fn):
2721     """Deactivate the disks
2722
2723     """
2724     instance = self.instance
2725     _SafeShutdownInstanceDisks(self, instance)
2726
2727
2728 def _SafeShutdownInstanceDisks(lu, instance):
2729   """Shutdown block devices of an instance.
2730
2731   This function checks if an instance is running, before calling
2732   _ShutdownInstanceDisks.
2733
2734   """
2735   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2736                                       [instance.hypervisor])
2737   ins_l = ins_l[instance.primary_node]
2738   if ins_l.failed or not isinstance(ins_l.data, list):
2739     raise errors.OpExecError("Can't contact node '%s'" %
2740                              instance.primary_node)
2741
2742   if instance.name in ins_l.data:
2743     raise errors.OpExecError("Instance is running, can't shutdown"
2744                              " block devices.")
2745
2746   _ShutdownInstanceDisks(lu, instance)
2747
2748
2749 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2750   """Shutdown block devices of an instance.
2751
2752   This does the shutdown on all nodes of the instance.
2753
2754   If the ignore_primary is false, errors on the primary node are
2755   ignored.
2756
2757   """
2758   all_result = True
2759   for disk in instance.disks:
2760     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2761       lu.cfg.SetDiskID(top_disk, node)
2762       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2763       msg = result.RemoteFailMsg()
2764       if msg:
2765         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2766                       disk.iv_name, node, msg)
2767         if not ignore_primary or node != instance.primary_node:
2768           all_result = False
2769   return all_result
2770
2771
2772 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2773   """Checks if a node has enough free memory.
2774
2775   This function check if a given node has the needed amount of free
2776   memory. In case the node has less memory or we cannot get the
2777   information from the node, this function raise an OpPrereqError
2778   exception.
2779
2780   @type lu: C{LogicalUnit}
2781   @param lu: a logical unit from which we get configuration data
2782   @type node: C{str}
2783   @param node: the node to check
2784   @type reason: C{str}
2785   @param reason: string to use in the error message
2786   @type requested: C{int}
2787   @param requested: the amount of memory in MiB to check for
2788   @type hypervisor_name: C{str}
2789   @param hypervisor_name: the hypervisor to ask for memory stats
2790   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2791       we cannot check the node
2792
2793   """
2794   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2795   nodeinfo[node].Raise()
2796   free_mem = nodeinfo[node].data.get('memory_free')
2797   if not isinstance(free_mem, int):
2798     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2799                              " was '%s'" % (node, free_mem))
2800   if requested > free_mem:
2801     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2802                              " needed %s MiB, available %s MiB" %
2803                              (node, reason, requested, free_mem))
2804
2805
2806 class LUStartupInstance(LogicalUnit):
2807   """Starts an instance.
2808
2809   """
2810   HPATH = "instance-start"
2811   HTYPE = constants.HTYPE_INSTANCE
2812   _OP_REQP = ["instance_name", "force"]
2813   REQ_BGL = False
2814
2815   def ExpandNames(self):
2816     self._ExpandAndLockInstance()
2817
2818   def BuildHooksEnv(self):
2819     """Build hooks env.
2820
2821     This runs on master, primary and secondary nodes of the instance.
2822
2823     """
2824     env = {
2825       "FORCE": self.op.force,
2826       }
2827     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2828     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2829     return env, nl, nl
2830
2831   def CheckPrereq(self):
2832     """Check prerequisites.
2833
2834     This checks that the instance is in the cluster.
2835
2836     """
2837     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2838     assert self.instance is not None, \
2839       "Cannot retrieve locked instance %s" % self.op.instance_name
2840
2841     # extra beparams
2842     self.beparams = getattr(self.op, "beparams", {})
2843     if self.beparams:
2844       if not isinstance(self.beparams, dict):
2845         raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2846                                    " dict" % (type(self.beparams), ))
2847       # fill the beparams dict
2848       utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2849       self.op.beparams = self.beparams
2850
2851     # extra hvparams
2852     self.hvparams = getattr(self.op, "hvparams", {})
2853     if self.hvparams:
2854       if not isinstance(self.hvparams, dict):
2855         raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2856                                    " dict" % (type(self.hvparams), ))
2857
2858       # check hypervisor parameter syntax (locally)
2859       cluster = self.cfg.GetClusterInfo()
2860       utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2861       filled_hvp = cluster.FillDict(cluster.hvparams[instance.hypervisor],
2862                                     instance.hvparams)
2863       filled_hvp.update(self.hvparams)
2864       hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2865       hv_type.CheckParameterSyntax(filled_hvp)
2866       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2867       self.op.hvparams = self.hvparams
2868
2869     _CheckNodeOnline(self, instance.primary_node)
2870
2871     bep = self.cfg.GetClusterInfo().FillBE(instance)
2872     # check bridges existence
2873     _CheckInstanceBridgesExist(self, instance)
2874
2875     remote_info = self.rpc.call_instance_info(instance.primary_node,
2876                                               instance.name,
2877                                               instance.hypervisor)
2878     remote_info.Raise()
2879     if not remote_info.data:
2880       _CheckNodeFreeMemory(self, instance.primary_node,
2881                            "starting instance %s" % instance.name,
2882                            bep[constants.BE_MEMORY], instance.hypervisor)
2883
2884   def Exec(self, feedback_fn):
2885     """Start the instance.
2886
2887     """
2888     instance = self.instance
2889     force = self.op.force
2890
2891     self.cfg.MarkInstanceUp(instance.name)
2892
2893     node_current = instance.primary_node
2894
2895     _StartInstanceDisks(self, instance, force)
2896
2897     result = self.rpc.call_instance_start(node_current, instance,
2898                                           self.hvparams, self.beparams)
2899     msg = result.RemoteFailMsg()
2900     if msg:
2901       _ShutdownInstanceDisks(self, instance)
2902       raise errors.OpExecError("Could not start instance: %s" % msg)
2903
2904
2905 class LURebootInstance(LogicalUnit):
2906   """Reboot an instance.
2907
2908   """
2909   HPATH = "instance-reboot"
2910   HTYPE = constants.HTYPE_INSTANCE
2911   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2912   REQ_BGL = False
2913
2914   def ExpandNames(self):
2915     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2916                                    constants.INSTANCE_REBOOT_HARD,
2917                                    constants.INSTANCE_REBOOT_FULL]:
2918       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2919                                   (constants.INSTANCE_REBOOT_SOFT,
2920                                    constants.INSTANCE_REBOOT_HARD,
2921                                    constants.INSTANCE_REBOOT_FULL))
2922     self._ExpandAndLockInstance()
2923
2924   def BuildHooksEnv(self):
2925     """Build hooks env.
2926
2927     This runs on master, primary and secondary nodes of the instance.
2928
2929     """
2930     env = {
2931       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2932       "REBOOT_TYPE": self.op.reboot_type,
2933       }
2934     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2935     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2936     return env, nl, nl
2937
2938   def CheckPrereq(self):
2939     """Check prerequisites.
2940
2941     This checks that the instance is in the cluster.
2942
2943     """
2944     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2945     assert self.instance is not None, \
2946       "Cannot retrieve locked instance %s" % self.op.instance_name
2947
2948     _CheckNodeOnline(self, instance.primary_node)
2949
2950     # check bridges existence
2951     _CheckInstanceBridgesExist(self, instance)
2952
2953   def Exec(self, feedback_fn):
2954     """Reboot the instance.
2955
2956     """
2957     instance = self.instance
2958     ignore_secondaries = self.op.ignore_secondaries
2959     reboot_type = self.op.reboot_type
2960
2961     node_current = instance.primary_node
2962
2963     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2964                        constants.INSTANCE_REBOOT_HARD]:
2965       for disk in instance.disks:
2966         self.cfg.SetDiskID(disk, node_current)
2967       result = self.rpc.call_instance_reboot(node_current, instance,
2968                                              reboot_type)
2969       msg = result.RemoteFailMsg()
2970       if msg:
2971         raise errors.OpExecError("Could not reboot instance: %s" % msg)
2972     else:
2973       result = self.rpc.call_instance_shutdown(node_current, instance)
2974       msg = result.RemoteFailMsg()
2975       if msg:
2976         raise errors.OpExecError("Could not shutdown instance for"
2977                                  " full reboot: %s" % msg)
2978       _ShutdownInstanceDisks(self, instance)
2979       _StartInstanceDisks(self, instance, ignore_secondaries)
2980       result = self.rpc.call_instance_start(node_current, instance, None, None)
2981       msg = result.RemoteFailMsg()
2982       if msg:
2983         _ShutdownInstanceDisks(self, instance)
2984         raise errors.OpExecError("Could not start instance for"
2985                                  " full reboot: %s" % msg)
2986
2987     self.cfg.MarkInstanceUp(instance.name)
2988
2989
2990 class LUShutdownInstance(LogicalUnit):
2991   """Shutdown an instance.
2992
2993   """
2994   HPATH = "instance-stop"
2995   HTYPE = constants.HTYPE_INSTANCE
2996   _OP_REQP = ["instance_name"]
2997   REQ_BGL = False
2998
2999   def ExpandNames(self):
3000     self._ExpandAndLockInstance()
3001
3002   def BuildHooksEnv(self):
3003     """Build hooks env.
3004
3005     This runs on master, primary and secondary nodes of the instance.
3006
3007     """
3008     env = _BuildInstanceHookEnvByObject(self, self.instance)
3009     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3010     return env, nl, nl
3011
3012   def CheckPrereq(self):
3013     """Check prerequisites.
3014
3015     This checks that the instance is in the cluster.
3016
3017     """
3018     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3019     assert self.instance is not None, \
3020       "Cannot retrieve locked instance %s" % self.op.instance_name
3021     _CheckNodeOnline(self, self.instance.primary_node)
3022
3023   def Exec(self, feedback_fn):
3024     """Shutdown the instance.
3025
3026     """
3027     instance = self.instance
3028     node_current = instance.primary_node
3029     self.cfg.MarkInstanceDown(instance.name)
3030     result = self.rpc.call_instance_shutdown(node_current, instance)
3031     msg = result.RemoteFailMsg()
3032     if msg:
3033       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3034
3035     _ShutdownInstanceDisks(self, instance)
3036
3037
3038 class LUReinstallInstance(LogicalUnit):
3039   """Reinstall an instance.
3040
3041   """
3042   HPATH = "instance-reinstall"
3043   HTYPE = constants.HTYPE_INSTANCE
3044   _OP_REQP = ["instance_name"]
3045   REQ_BGL = False
3046
3047   def ExpandNames(self):
3048     self._ExpandAndLockInstance()
3049
3050   def BuildHooksEnv(self):
3051     """Build hooks env.
3052
3053     This runs on master, primary and secondary nodes of the instance.
3054
3055     """
3056     env = _BuildInstanceHookEnvByObject(self, self.instance)
3057     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3058     return env, nl, nl
3059
3060   def CheckPrereq(self):
3061     """Check prerequisites.
3062
3063     This checks that the instance is in the cluster and is not running.
3064
3065     """
3066     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3067     assert instance is not None, \
3068       "Cannot retrieve locked instance %s" % self.op.instance_name
3069     _CheckNodeOnline(self, instance.primary_node)
3070
3071     if instance.disk_template == constants.DT_DISKLESS:
3072       raise errors.OpPrereqError("Instance '%s' has no disks" %
3073                                  self.op.instance_name)
3074     if instance.admin_up:
3075       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3076                                  self.op.instance_name)
3077     remote_info = self.rpc.call_instance_info(instance.primary_node,
3078                                               instance.name,
3079                                               instance.hypervisor)
3080     remote_info.Raise()
3081     if remote_info.data:
3082       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3083                                  (self.op.instance_name,
3084                                   instance.primary_node))
3085
3086     self.op.os_type = getattr(self.op, "os_type", None)
3087     if self.op.os_type is not None:
3088       # OS verification
3089       pnode = self.cfg.GetNodeInfo(
3090         self.cfg.ExpandNodeName(instance.primary_node))
3091       if pnode is None:
3092         raise errors.OpPrereqError("Primary node '%s' is unknown" %
3093                                    self.op.pnode)
3094       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3095       result.Raise()
3096       if not isinstance(result.data, objects.OS):
3097         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3098                                    " primary node"  % self.op.os_type)
3099
3100     self.instance = instance
3101
3102   def Exec(self, feedback_fn):
3103     """Reinstall the instance.
3104
3105     """
3106     inst = self.instance
3107
3108     if self.op.os_type is not None:
3109       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3110       inst.os = self.op.os_type
3111       self.cfg.Update(inst)
3112
3113     _StartInstanceDisks(self, inst, None)
3114     try:
3115       feedback_fn("Running the instance OS create scripts...")
3116       result = self.rpc.call_instance_os_add(inst.primary_node, inst)
3117       msg = result.RemoteFailMsg()
3118       if msg:
3119         raise errors.OpExecError("Could not install OS for instance %s"
3120                                  " on node %s: %s" %
3121                                  (inst.name, inst.primary_node, msg))
3122     finally:
3123       _ShutdownInstanceDisks(self, inst)
3124
3125
3126 class LURenameInstance(LogicalUnit):
3127   """Rename an instance.
3128
3129   """
3130   HPATH = "instance-rename"
3131   HTYPE = constants.HTYPE_INSTANCE
3132   _OP_REQP = ["instance_name", "new_name"]
3133
3134   def BuildHooksEnv(self):
3135     """Build hooks env.
3136
3137     This runs on master, primary and secondary nodes of the instance.
3138
3139     """
3140     env = _BuildInstanceHookEnvByObject(self, self.instance)
3141     env["INSTANCE_NEW_NAME"] = self.op.new_name
3142     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3143     return env, nl, nl
3144
3145   def CheckPrereq(self):
3146     """Check prerequisites.
3147
3148     This checks that the instance is in the cluster and is not running.
3149
3150     """
3151     instance = self.cfg.GetInstanceInfo(
3152       self.cfg.ExpandInstanceName(self.op.instance_name))
3153     if instance is None:
3154       raise errors.OpPrereqError("Instance '%s' not known" %
3155                                  self.op.instance_name)
3156     _CheckNodeOnline(self, instance.primary_node)
3157
3158     if instance.admin_up:
3159       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3160                                  self.op.instance_name)
3161     remote_info = self.rpc.call_instance_info(instance.primary_node,
3162                                               instance.name,
3163                                               instance.hypervisor)
3164     remote_info.Raise()
3165     if remote_info.data:
3166       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3167                                  (self.op.instance_name,
3168                                   instance.primary_node))
3169     self.instance = instance
3170
3171     # new name verification
3172     name_info = utils.HostInfo(self.op.new_name)
3173
3174     self.op.new_name = new_name = name_info.name
3175     instance_list = self.cfg.GetInstanceList()
3176     if new_name in instance_list:
3177       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3178                                  new_name)
3179
3180     if not getattr(self.op, "ignore_ip", False):
3181       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3182         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3183                                    (name_info.ip, new_name))
3184
3185
3186   def Exec(self, feedback_fn):
3187     """Reinstall the instance.
3188
3189     """
3190     inst = self.instance
3191     old_name = inst.name
3192
3193     if inst.disk_template == constants.DT_FILE:
3194       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3195
3196     self.cfg.RenameInstance(inst.name, self.op.new_name)
3197     # Change the instance lock. This is definitely safe while we hold the BGL
3198     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3199     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3200
3201     # re-read the instance from the configuration after rename
3202     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3203
3204     if inst.disk_template == constants.DT_FILE:
3205       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3206       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3207                                                      old_file_storage_dir,
3208                                                      new_file_storage_dir)
3209       result.Raise()
3210       if not result.data:
3211         raise errors.OpExecError("Could not connect to node '%s' to rename"
3212                                  " directory '%s' to '%s' (but the instance"
3213                                  " has been renamed in Ganeti)" % (
3214                                  inst.primary_node, old_file_storage_dir,
3215                                  new_file_storage_dir))
3216
3217       if not result.data[0]:
3218         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3219                                  " (but the instance has been renamed in"
3220                                  " Ganeti)" % (old_file_storage_dir,
3221                                                new_file_storage_dir))
3222
3223     _StartInstanceDisks(self, inst, None)
3224     try:
3225       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3226                                                  old_name)
3227       msg = result.RemoteFailMsg()
3228       if msg:
3229         msg = ("Could not run OS rename script for instance %s on node %s"
3230                " (but the instance has been renamed in Ganeti): %s" %
3231                (inst.name, inst.primary_node, msg))
3232         self.proc.LogWarning(msg)
3233     finally:
3234       _ShutdownInstanceDisks(self, inst)
3235
3236
3237 class LURemoveInstance(LogicalUnit):
3238   """Remove an instance.
3239
3240   """
3241   HPATH = "instance-remove"
3242   HTYPE = constants.HTYPE_INSTANCE
3243   _OP_REQP = ["instance_name", "ignore_failures"]
3244   REQ_BGL = False
3245
3246   def ExpandNames(self):
3247     self._ExpandAndLockInstance()
3248     self.needed_locks[locking.LEVEL_NODE] = []
3249     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3250
3251   def DeclareLocks(self, level):
3252     if level == locking.LEVEL_NODE:
3253       self._LockInstancesNodes()
3254
3255   def BuildHooksEnv(self):
3256     """Build hooks env.
3257
3258     This runs on master, primary and secondary nodes of the instance.
3259
3260     """
3261     env = _BuildInstanceHookEnvByObject(self, self.instance)
3262     nl = [self.cfg.GetMasterNode()]
3263     return env, nl, nl
3264
3265   def CheckPrereq(self):
3266     """Check prerequisites.
3267
3268     This checks that the instance is in the cluster.
3269
3270     """
3271     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3272     assert self.instance is not None, \
3273       "Cannot retrieve locked instance %s" % self.op.instance_name
3274
3275   def Exec(self, feedback_fn):
3276     """Remove the instance.
3277
3278     """
3279     instance = self.instance
3280     logging.info("Shutting down instance %s on node %s",
3281                  instance.name, instance.primary_node)
3282
3283     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3284     msg = result.RemoteFailMsg()
3285     if msg:
3286       if self.op.ignore_failures:
3287         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3288       else:
3289         raise errors.OpExecError("Could not shutdown instance %s on"
3290                                  " node %s: %s" %
3291                                  (instance.name, instance.primary_node, msg))
3292
3293     logging.info("Removing block devices for instance %s", instance.name)
3294
3295     if not _RemoveDisks(self, instance):
3296       if self.op.ignore_failures:
3297         feedback_fn("Warning: can't remove instance's disks")
3298       else:
3299         raise errors.OpExecError("Can't remove instance's disks")
3300
3301     logging.info("Removing instance %s out of cluster config", instance.name)
3302
3303     self.cfg.RemoveInstance(instance.name)
3304     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3305
3306
3307 class LUQueryInstances(NoHooksLU):
3308   """Logical unit for querying instances.
3309
3310   """
3311   _OP_REQP = ["output_fields", "names", "use_locking"]
3312   REQ_BGL = False
3313   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3314                                     "admin_state",
3315                                     "disk_template", "ip", "mac", "bridge",
3316                                     "sda_size", "sdb_size", "vcpus", "tags",
3317                                     "network_port", "beparams",
3318                                     r"(disk)\.(size)/([0-9]+)",
3319                                     r"(disk)\.(sizes)", "disk_usage",
3320                                     r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3321                                     r"(nic)\.(macs|ips|bridges)",
3322                                     r"(disk|nic)\.(count)",
3323                                     "serial_no", "hypervisor", "hvparams",] +
3324                                   ["hv/%s" % name
3325                                    for name in constants.HVS_PARAMETERS] +
3326                                   ["be/%s" % name
3327                                    for name in constants.BES_PARAMETERS])
3328   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3329
3330
3331   def ExpandNames(self):
3332     _CheckOutputFields(static=self._FIELDS_STATIC,
3333                        dynamic=self._FIELDS_DYNAMIC,
3334                        selected=self.op.output_fields)
3335
3336     self.needed_locks = {}
3337     self.share_locks[locking.LEVEL_INSTANCE] = 1
3338     self.share_locks[locking.LEVEL_NODE] = 1
3339
3340     if self.op.names:
3341       self.wanted = _GetWantedInstances(self, self.op.names)
3342     else:
3343       self.wanted = locking.ALL_SET
3344
3345     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3346     self.do_locking = self.do_node_query and self.op.use_locking
3347     if self.do_locking:
3348       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3349       self.needed_locks[locking.LEVEL_NODE] = []
3350       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3351
3352   def DeclareLocks(self, level):
3353     if level == locking.LEVEL_NODE and self.do_locking:
3354       self._LockInstancesNodes()
3355
3356   def CheckPrereq(self):
3357     """Check prerequisites.
3358
3359     """
3360     pass
3361
3362   def Exec(self, feedback_fn):
3363     """Computes the list of nodes and their attributes.
3364
3365     """
3366     all_info = self.cfg.GetAllInstancesInfo()
3367     if self.wanted == locking.ALL_SET:
3368       # caller didn't specify instance names, so ordering is not important
3369       if self.do_locking:
3370         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3371       else:
3372         instance_names = all_info.keys()
3373       instance_names = utils.NiceSort(instance_names)
3374     else:
3375       # caller did specify names, so we must keep the ordering
3376       if self.do_locking:
3377         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3378       else:
3379         tgt_set = all_info.keys()
3380       missing = set(self.wanted).difference(tgt_set)
3381       if missing:
3382         raise errors.OpExecError("Some instances were removed before"
3383                                  " retrieving their data: %s" % missing)
3384       instance_names = self.wanted
3385
3386     instance_list = [all_info[iname] for iname in instance_names]
3387
3388     # begin data gathering
3389
3390     nodes = frozenset([inst.primary_node for inst in instance_list])
3391     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3392
3393     bad_nodes = []
3394     off_nodes = []
3395     if self.do_node_query:
3396       live_data = {}
3397       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3398       for name in nodes:
3399         result = node_data[name]
3400         if result.offline:
3401           # offline nodes will be in both lists
3402           off_nodes.append(name)
3403         if result.failed:
3404           bad_nodes.append(name)
3405         else:
3406           if result.data:
3407             live_data.update(result.data)
3408             # else no instance is alive
3409     else:
3410       live_data = dict([(name, {}) for name in instance_names])
3411
3412     # end data gathering
3413
3414     HVPREFIX = "hv/"
3415     BEPREFIX = "be/"
3416     output = []
3417     for instance in instance_list:
3418       iout = []
3419       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3420       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3421       for field in self.op.output_fields:
3422         st_match = self._FIELDS_STATIC.Matches(field)
3423         if field == "name":
3424           val = instance.name
3425         elif field == "os":
3426           val = instance.os
3427         elif field == "pnode":
3428           val = instance.primary_node
3429         elif field == "snodes":
3430           val = list(instance.secondary_nodes)
3431         elif field == "admin_state":
3432           val = instance.admin_up
3433         elif field == "oper_state":
3434           if instance.primary_node in bad_nodes:
3435             val = None
3436           else:
3437             val = bool(live_data.get(instance.name))
3438         elif field == "status":
3439           if instance.primary_node in off_nodes:
3440             val = "ERROR_nodeoffline"
3441           elif instance.primary_node in bad_nodes:
3442             val = "ERROR_nodedown"
3443           else:
3444             running = bool(live_data.get(instance.name))
3445             if running:
3446               if instance.admin_up:
3447                 val = "running"
3448               else:
3449                 val = "ERROR_up"
3450             else:
3451               if instance.admin_up:
3452                 val = "ERROR_down"
3453               else:
3454                 val = "ADMIN_down"
3455         elif field == "oper_ram":
3456           if instance.primary_node in bad_nodes:
3457             val = None
3458           elif instance.name in live_data:
3459             val = live_data[instance.name].get("memory", "?")
3460           else:
3461             val = "-"
3462         elif field == "vcpus":
3463           val = i_be[constants.BE_VCPUS]
3464         elif field == "disk_template":
3465           val = instance.disk_template
3466         elif field == "ip":
3467           if instance.nics:
3468             val = instance.nics[0].ip
3469           else:
3470             val = None
3471         elif field == "bridge":
3472           if instance.nics:
3473             val = instance.nics[0].bridge
3474           else:
3475             val = None
3476         elif field == "mac":
3477           if instance.nics:
3478             val = instance.nics[0].mac
3479           else:
3480             val = None
3481         elif field == "sda_size" or field == "sdb_size":
3482           idx = ord(field[2]) - ord('a')
3483           try:
3484             val = instance.FindDisk(idx).size
3485           except errors.OpPrereqError:
3486             val = None
3487         elif field == "disk_usage": # total disk usage per node
3488           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3489           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3490         elif field == "tags":
3491           val = list(instance.GetTags())
3492         elif field == "serial_no":
3493           val = instance.serial_no
3494         elif field == "network_port":
3495           val = instance.network_port
3496         elif field == "hypervisor":
3497           val = instance.hypervisor
3498         elif field == "hvparams":
3499           val = i_hv
3500         elif (field.startswith(HVPREFIX) and
3501               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3502           val = i_hv.get(field[len(HVPREFIX):], None)
3503         elif field == "beparams":
3504           val = i_be
3505         elif (field.startswith(BEPREFIX) and
3506               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3507           val = i_be.get(field[len(BEPREFIX):], None)
3508         elif st_match and st_match.groups():
3509           # matches a variable list
3510           st_groups = st_match.groups()
3511           if st_groups and st_groups[0] == "disk":
3512             if st_groups[1] == "count":
3513               val = len(instance.disks)
3514             elif st_groups[1] == "sizes":
3515               val = [disk.size for disk in instance.disks]
3516             elif st_groups[1] == "size":
3517               try:
3518                 val = instance.FindDisk(st_groups[2]).size
3519               except errors.OpPrereqError:
3520                 val = None
3521             else:
3522               assert False, "Unhandled disk parameter"
3523           elif st_groups[0] == "nic":
3524             if st_groups[1] == "count":
3525               val = len(instance.nics)
3526             elif st_groups[1] == "macs":
3527               val = [nic.mac for nic in instance.nics]
3528             elif st_groups[1] == "ips":
3529               val = [nic.ip for nic in instance.nics]
3530             elif st_groups[1] == "bridges":
3531               val = [nic.bridge for nic in instance.nics]
3532             else:
3533               # index-based item
3534               nic_idx = int(st_groups[2])
3535               if nic_idx >= len(instance.nics):
3536                 val = None
3537               else:
3538                 if st_groups[1] == "mac":
3539                   val = instance.nics[nic_idx].mac
3540                 elif st_groups[1] == "ip":
3541                   val = instance.nics[nic_idx].ip
3542                 elif st_groups[1] == "bridge":
3543                   val = instance.nics[nic_idx].bridge
3544                 else:
3545                   assert False, "Unhandled NIC parameter"
3546           else:
3547             assert False, ("Declared but unhandled variable parameter '%s'" %
3548                            field)
3549         else:
3550           assert False, "Declared but unhandled parameter '%s'" % field
3551         iout.append(val)
3552       output.append(iout)
3553
3554     return output
3555
3556
3557 class LUFailoverInstance(LogicalUnit):
3558   """Failover an instance.
3559
3560   """
3561   HPATH = "instance-failover"
3562   HTYPE = constants.HTYPE_INSTANCE
3563   _OP_REQP = ["instance_name", "ignore_consistency"]
3564   REQ_BGL = False
3565
3566   def ExpandNames(self):
3567     self._ExpandAndLockInstance()
3568     self.needed_locks[locking.LEVEL_NODE] = []
3569     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3570
3571   def DeclareLocks(self, level):
3572     if level == locking.LEVEL_NODE:
3573       self._LockInstancesNodes()
3574
3575   def BuildHooksEnv(self):
3576     """Build hooks env.
3577
3578     This runs on master, primary and secondary nodes of the instance.
3579
3580     """
3581     env = {
3582       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3583       }
3584     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3585     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3586     return env, nl, nl
3587
3588   def CheckPrereq(self):
3589     """Check prerequisites.
3590
3591     This checks that the instance is in the cluster.
3592
3593     """
3594     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3595     assert self.instance is not None, \
3596       "Cannot retrieve locked instance %s" % self.op.instance_name
3597
3598     bep = self.cfg.GetClusterInfo().FillBE(instance)
3599     if instance.disk_template not in constants.DTS_NET_MIRROR:
3600       raise errors.OpPrereqError("Instance's disk layout is not"
3601                                  " network mirrored, cannot failover.")
3602
3603     secondary_nodes = instance.secondary_nodes
3604     if not secondary_nodes:
3605       raise errors.ProgrammerError("no secondary node but using "
3606                                    "a mirrored disk template")
3607
3608     target_node = secondary_nodes[0]
3609     _CheckNodeOnline(self, target_node)
3610     _CheckNodeNotDrained(self, target_node)
3611
3612     if instance.admin_up:
3613       # check memory requirements on the secondary node
3614       _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3615                            instance.name, bep[constants.BE_MEMORY],
3616                            instance.hypervisor)
3617     else:
3618       self.LogInfo("Not checking memory on the secondary node as"
3619                    " instance will not be started")
3620
3621     # check bridge existence
3622     brlist = [nic.bridge for nic in instance.nics]
3623     result = self.rpc.call_bridges_exist(target_node, brlist)
3624     result.Raise()
3625     if not result.data:
3626       raise errors.OpPrereqError("One or more target bridges %s does not"
3627                                  " exist on destination node '%s'" %
3628                                  (brlist, target_node))
3629
3630   def Exec(self, feedback_fn):
3631     """Failover an instance.
3632
3633     The failover is done by shutting it down on its present node and
3634     starting it on the secondary.
3635
3636     """
3637     instance = self.instance
3638
3639     source_node = instance.primary_node
3640     target_node = instance.secondary_nodes[0]
3641
3642     feedback_fn("* checking disk consistency between source and target")
3643     for dev in instance.disks:
3644       # for drbd, these are drbd over lvm
3645       if not _CheckDiskConsistency(self, dev, target_node, False):
3646         if instance.admin_up and not self.op.ignore_consistency:
3647           raise errors.OpExecError("Disk %s is degraded on target node,"
3648                                    " aborting failover." % dev.iv_name)
3649
3650     feedback_fn("* shutting down instance on source node")
3651     logging.info("Shutting down instance %s on node %s",
3652                  instance.name, source_node)
3653
3654     result = self.rpc.call_instance_shutdown(source_node, instance)
3655     msg = result.RemoteFailMsg()
3656     if msg:
3657       if self.op.ignore_consistency:
3658         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3659                              " Proceeding anyway. Please make sure node"
3660                              " %s is down. Error details: %s",
3661                              instance.name, source_node, source_node, msg)
3662       else:
3663         raise errors.OpExecError("Could not shutdown instance %s on"
3664                                  " node %s: %s" %
3665                                  (instance.name, source_node, msg))
3666
3667     feedback_fn("* deactivating the instance's disks on source node")
3668     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3669       raise errors.OpExecError("Can't shut down the instance's disks.")
3670
3671     instance.primary_node = target_node
3672     # distribute new instance config to the other nodes
3673     self.cfg.Update(instance)
3674
3675     # Only start the instance if it's marked as up
3676     if instance.admin_up:
3677       feedback_fn("* activating the instance's disks on target node")
3678       logging.info("Starting instance %s on node %s",
3679                    instance.name, target_node)
3680
3681       disks_ok, _ = _AssembleInstanceDisks(self, instance,
3682                                                ignore_secondaries=True)
3683       if not disks_ok:
3684         _ShutdownInstanceDisks(self, instance)
3685         raise errors.OpExecError("Can't activate the instance's disks")
3686
3687       feedback_fn("* starting the instance on the target node")
3688       result = self.rpc.call_instance_start(target_node, instance, None, None)
3689       msg = result.RemoteFailMsg()
3690       if msg:
3691         _ShutdownInstanceDisks(self, instance)
3692         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3693                                  (instance.name, target_node, msg))
3694
3695
3696 class LUMigrateInstance(LogicalUnit):
3697   """Migrate an instance.
3698
3699   This is migration without shutting down, compared to the failover,
3700   which is done with shutdown.
3701
3702   """
3703   HPATH = "instance-migrate"
3704   HTYPE = constants.HTYPE_INSTANCE
3705   _OP_REQP = ["instance_name", "live", "cleanup"]
3706
3707   REQ_BGL = False
3708
3709   def ExpandNames(self):
3710     self._ExpandAndLockInstance()
3711     self.needed_locks[locking.LEVEL_NODE] = []
3712     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3713
3714   def DeclareLocks(self, level):
3715     if level == locking.LEVEL_NODE:
3716       self._LockInstancesNodes()
3717
3718   def BuildHooksEnv(self):
3719     """Build hooks env.
3720
3721     This runs on master, primary and secondary nodes of the instance.
3722
3723     """
3724     env = _BuildInstanceHookEnvByObject(self, self.instance)
3725     env["MIGRATE_LIVE"] = self.op.live
3726     env["MIGRATE_CLEANUP"] = self.op.cleanup
3727     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3728     return env, nl, nl
3729
3730   def CheckPrereq(self):
3731     """Check prerequisites.
3732
3733     This checks that the instance is in the cluster.
3734
3735     """
3736     instance = self.cfg.GetInstanceInfo(
3737       self.cfg.ExpandInstanceName(self.op.instance_name))
3738     if instance is None:
3739       raise errors.OpPrereqError("Instance '%s' not known" %
3740                                  self.op.instance_name)
3741
3742     if instance.disk_template != constants.DT_DRBD8:
3743       raise errors.OpPrereqError("Instance's disk layout is not"
3744                                  " drbd8, cannot migrate.")
3745
3746     secondary_nodes = instance.secondary_nodes
3747     if not secondary_nodes:
3748       raise errors.ConfigurationError("No secondary node but using"
3749                                       " drbd8 disk template")
3750
3751     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3752
3753     target_node = secondary_nodes[0]
3754     # check memory requirements on the secondary node
3755     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3756                          instance.name, i_be[constants.BE_MEMORY],
3757                          instance.hypervisor)
3758
3759     # check bridge existence
3760     brlist = [nic.bridge for nic in instance.nics]
3761     result = self.rpc.call_bridges_exist(target_node, brlist)
3762     if result.failed or not result.data:
3763       raise errors.OpPrereqError("One or more target bridges %s does not"
3764                                  " exist on destination node '%s'" %
3765                                  (brlist, target_node))
3766
3767     if not self.op.cleanup:
3768       _CheckNodeNotDrained(self, target_node)
3769       result = self.rpc.call_instance_migratable(instance.primary_node,
3770                                                  instance)
3771       msg = result.RemoteFailMsg()
3772       if msg:
3773         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3774                                    msg)
3775
3776     self.instance = instance
3777
3778   def _WaitUntilSync(self):
3779     """Poll with custom rpc for disk sync.
3780
3781     This uses our own step-based rpc call.
3782
3783     """
3784     self.feedback_fn("* wait until resync is done")
3785     all_done = False
3786     while not all_done:
3787       all_done = True
3788       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3789                                             self.nodes_ip,
3790                                             self.instance.disks)
3791       min_percent = 100
3792       for node, nres in result.items():
3793         msg = nres.RemoteFailMsg()
3794         if msg:
3795           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3796                                    (node, msg))
3797         node_done, node_percent = nres.payload
3798         all_done = all_done and node_done
3799         if node_percent is not None:
3800           min_percent = min(min_percent, node_percent)
3801       if not all_done:
3802         if min_percent < 100:
3803           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3804         time.sleep(2)
3805
3806   def _EnsureSecondary(self, node):
3807     """Demote a node to secondary.
3808
3809     """
3810     self.feedback_fn("* switching node %s to secondary mode" % node)
3811
3812     for dev in self.instance.disks:
3813       self.cfg.SetDiskID(dev, node)
3814
3815     result = self.rpc.call_blockdev_close(node, self.instance.name,
3816                                           self.instance.disks)
3817     msg = result.RemoteFailMsg()
3818     if msg:
3819       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3820                                " error %s" % (node, msg))
3821
3822   def _GoStandalone(self):
3823     """Disconnect from the network.
3824
3825     """
3826     self.feedback_fn("* changing into standalone mode")
3827     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3828                                                self.instance.disks)
3829     for node, nres in result.items():
3830       msg = nres.RemoteFailMsg()
3831       if msg:
3832         raise errors.OpExecError("Cannot disconnect disks node %s,"
3833                                  " error %s" % (node, msg))
3834
3835   def _GoReconnect(self, multimaster):
3836     """Reconnect to the network.
3837
3838     """
3839     if multimaster:
3840       msg = "dual-master"
3841     else:
3842       msg = "single-master"
3843     self.feedback_fn("* changing disks into %s mode" % msg)
3844     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3845                                            self.instance.disks,
3846                                            self.instance.name, multimaster)
3847     for node, nres in result.items():
3848       msg = nres.RemoteFailMsg()
3849       if msg:
3850         raise errors.OpExecError("Cannot change disks config on node %s,"
3851                                  " error: %s" % (node, msg))
3852
3853   def _ExecCleanup(self):
3854     """Try to cleanup after a failed migration.
3855
3856     The cleanup is done by:
3857       - check that the instance is running only on one node
3858         (and update the config if needed)
3859       - change disks on its secondary node to secondary
3860       - wait until disks are fully synchronized
3861       - disconnect from the network
3862       - change disks into single-master mode
3863       - wait again until disks are fully synchronized
3864
3865     """
3866     instance = self.instance
3867     target_node = self.target_node
3868     source_node = self.source_node
3869
3870     # check running on only one node
3871     self.feedback_fn("* checking where the instance actually runs"
3872                      " (if this hangs, the hypervisor might be in"
3873                      " a bad state)")
3874     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3875     for node, result in ins_l.items():
3876       result.Raise()
3877       if not isinstance(result.data, list):
3878         raise errors.OpExecError("Can't contact node '%s'" % node)
3879
3880     runningon_source = instance.name in ins_l[source_node].data
3881     runningon_target = instance.name in ins_l[target_node].data
3882
3883     if runningon_source and runningon_target:
3884       raise errors.OpExecError("Instance seems to be running on two nodes,"
3885                                " or the hypervisor is confused. You will have"
3886                                " to ensure manually that it runs only on one"
3887                                " and restart this operation.")
3888
3889     if not (runningon_source or runningon_target):
3890       raise errors.OpExecError("Instance does not seem to be running at all."
3891                                " In this case, it's safer to repair by"
3892                                " running 'gnt-instance stop' to ensure disk"
3893                                " shutdown, and then restarting it.")
3894
3895     if runningon_target:
3896       # the migration has actually succeeded, we need to update the config
3897       self.feedback_fn("* instance running on secondary node (%s),"
3898                        " updating config" % target_node)
3899       instance.primary_node = target_node
3900       self.cfg.Update(instance)
3901       demoted_node = source_node
3902     else:
3903       self.feedback_fn("* instance confirmed to be running on its"
3904                        " primary node (%s)" % source_node)
3905       demoted_node = target_node
3906
3907     self._EnsureSecondary(demoted_node)
3908     try:
3909       self._WaitUntilSync()
3910     except errors.OpExecError:
3911       # we ignore here errors, since if the device is standalone, it
3912       # won't be able to sync
3913       pass
3914     self._GoStandalone()
3915     self._GoReconnect(False)
3916     self._WaitUntilSync()
3917
3918     self.feedback_fn("* done")
3919
3920   def _RevertDiskStatus(self):
3921     """Try to revert the disk status after a failed migration.
3922
3923     """
3924     target_node = self.target_node
3925     try:
3926       self._EnsureSecondary(target_node)
3927       self._GoStandalone()
3928       self._GoReconnect(False)
3929       self._WaitUntilSync()
3930     except errors.OpExecError, err:
3931       self.LogWarning("Migration failed and I can't reconnect the"
3932                       " drives: error '%s'\n"
3933                       "Please look and recover the instance status" %
3934                       str(err))
3935
3936   def _AbortMigration(self):
3937     """Call the hypervisor code to abort a started migration.
3938
3939     """
3940     instance = self.instance
3941     target_node = self.target_node
3942     migration_info = self.migration_info
3943
3944     abort_result = self.rpc.call_finalize_migration(target_node,
3945                                                     instance,
3946                                                     migration_info,
3947                                                     False)
3948     abort_msg = abort_result.RemoteFailMsg()
3949     if abort_msg:
3950       logging.error("Aborting migration failed on target node %s: %s" %
3951                     (target_node, abort_msg))
3952       # Don't raise an exception here, as we stil have to try to revert the
3953       # disk status, even if this step failed.
3954
3955   def _ExecMigration(self):
3956     """Migrate an instance.
3957
3958     The migrate is done by:
3959       - change the disks into dual-master mode
3960       - wait until disks are fully synchronized again
3961       - migrate the instance
3962       - change disks on the new secondary node (the old primary) to secondary
3963       - wait until disks are fully synchronized
3964       - change disks into single-master mode
3965
3966     """
3967     instance = self.instance
3968     target_node = self.target_node
3969     source_node = self.source_node
3970
3971     self.feedback_fn("* checking disk consistency between source and target")
3972     for dev in instance.disks:
3973       if not _CheckDiskConsistency(self, dev, target_node, False):
3974         raise errors.OpExecError("Disk %s is degraded or not fully"
3975                                  " synchronized on target node,"
3976                                  " aborting migrate." % dev.iv_name)
3977
3978     # First get the migration information from the remote node
3979     result = self.rpc.call_migration_info(source_node, instance)
3980     msg = result.RemoteFailMsg()
3981     if msg:
3982       log_err = ("Failed fetching source migration information from %s: %s" %
3983                  (source_node, msg))
3984       logging.error(log_err)
3985       raise errors.OpExecError(log_err)
3986
3987     self.migration_info = migration_info = result.payload
3988
3989     # Then switch the disks to master/master mode
3990     self._EnsureSecondary(target_node)
3991     self._GoStandalone()
3992     self._GoReconnect(True)
3993     self._WaitUntilSync()
3994
3995     self.feedback_fn("* preparing %s to accept the instance" % target_node)
3996     result = self.rpc.call_accept_instance(target_node,
3997                                            instance,
3998                                            migration_info,
3999                                            self.nodes_ip[target_node])
4000
4001     msg = result.RemoteFailMsg()
4002     if msg:
4003       logging.error("Instance pre-migration failed, trying to revert"
4004                     " disk status: %s", msg)
4005       self._AbortMigration()
4006       self._RevertDiskStatus()
4007       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4008                                (instance.name, msg))
4009
4010     self.feedback_fn("* migrating instance to %s" % target_node)
4011     time.sleep(10)
4012     result = self.rpc.call_instance_migrate(source_node, instance,
4013                                             self.nodes_ip[target_node],
4014                                             self.op.live)
4015     msg = result.RemoteFailMsg()
4016     if msg:
4017       logging.error("Instance migration failed, trying to revert"
4018                     " disk status: %s", msg)
4019       self._AbortMigration()
4020       self._RevertDiskStatus()
4021       raise errors.OpExecError("Could not migrate instance %s: %s" %
4022                                (instance.name, msg))
4023     time.sleep(10)
4024
4025     instance.primary_node = target_node
4026     # distribute new instance config to the other nodes
4027     self.cfg.Update(instance)
4028
4029     result = self.rpc.call_finalize_migration(target_node,
4030                                               instance,
4031                                               migration_info,
4032                                               True)
4033     msg = result.RemoteFailMsg()
4034     if msg:
4035       logging.error("Instance migration succeeded, but finalization failed:"
4036                     " %s" % msg)
4037       raise errors.OpExecError("Could not finalize instance migration: %s" %
4038                                msg)
4039
4040     self._EnsureSecondary(source_node)
4041     self._WaitUntilSync()
4042     self._GoStandalone()
4043     self._GoReconnect(False)
4044     self._WaitUntilSync()
4045
4046     self.feedback_fn("* done")
4047
4048   def Exec(self, feedback_fn):
4049     """Perform the migration.
4050
4051     """
4052     self.feedback_fn = feedback_fn
4053
4054     self.source_node = self.instance.primary_node
4055     self.target_node = self.instance.secondary_nodes[0]
4056     self.all_nodes = [self.source_node, self.target_node]
4057     self.nodes_ip = {
4058       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4059       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4060       }
4061     if self.op.cleanup:
4062       return self._ExecCleanup()
4063     else:
4064       return self._ExecMigration()
4065
4066
4067 def _CreateBlockDev(lu, node, instance, device, force_create,
4068                     info, force_open):
4069   """Create a tree of block devices on a given node.
4070
4071   If this device type has to be created on secondaries, create it and
4072   all its children.
4073
4074   If not, just recurse to children keeping the same 'force' value.
4075
4076   @param lu: the lu on whose behalf we execute
4077   @param node: the node on which to create the device
4078   @type instance: L{objects.Instance}
4079   @param instance: the instance which owns the device
4080   @type device: L{objects.Disk}
4081   @param device: the device to create
4082   @type force_create: boolean
4083   @param force_create: whether to force creation of this device; this
4084       will be change to True whenever we find a device which has
4085       CreateOnSecondary() attribute
4086   @param info: the extra 'metadata' we should attach to the device
4087       (this will be represented as a LVM tag)
4088   @type force_open: boolean
4089   @param force_open: this parameter will be passes to the
4090       L{backend.BlockdevCreate} function where it specifies
4091       whether we run on primary or not, and it affects both
4092       the child assembly and the device own Open() execution
4093
4094   """
4095   if device.CreateOnSecondary():
4096     force_create = True
4097
4098   if device.children:
4099     for child in device.children:
4100       _CreateBlockDev(lu, node, instance, child, force_create,
4101                       info, force_open)
4102
4103   if not force_create:
4104     return
4105
4106   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4107
4108
4109 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4110   """Create a single block device on a given node.
4111
4112   This will not recurse over children of the device, so they must be
4113   created in advance.
4114
4115   @param lu: the lu on whose behalf we execute
4116   @param node: the node on which to create the device
4117   @type instance: L{objects.Instance}
4118   @param instance: the instance which owns the device
4119   @type device: L{objects.Disk}
4120   @param device: the device to create
4121   @param info: the extra 'metadata' we should attach to the device
4122       (this will be represented as a LVM tag)
4123   @type force_open: boolean
4124   @param force_open: this parameter will be passes to the
4125       L{backend.BlockdevCreate} function where it specifies
4126       whether we run on primary or not, and it affects both
4127       the child assembly and the device own Open() execution
4128
4129   """
4130   lu.cfg.SetDiskID(device, node)
4131   result = lu.rpc.call_blockdev_create(node, device, device.size,
4132                                        instance.name, force_open, info)
4133   msg = result.RemoteFailMsg()
4134   if msg:
4135     raise errors.OpExecError("Can't create block device %s on"
4136                              " node %s for instance %s: %s" %
4137                              (device, node, instance.name, msg))
4138   if device.physical_id is None:
4139     device.physical_id = result.payload
4140
4141
4142 def _GenerateUniqueNames(lu, exts):
4143   """Generate a suitable LV name.
4144
4145   This will generate a logical volume name for the given instance.
4146
4147   """
4148   results = []
4149   for val in exts:
4150     new_id = lu.cfg.GenerateUniqueID()
4151     results.append("%s%s" % (new_id, val))
4152   return results
4153
4154
4155 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4156                          p_minor, s_minor):
4157   """Generate a drbd8 device complete with its children.
4158
4159   """
4160   port = lu.cfg.AllocatePort()
4161   vgname = lu.cfg.GetVGName()
4162   shared_secret = lu.cfg.GenerateDRBDSecret()
4163   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4164                           logical_id=(vgname, names[0]))
4165   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4166                           logical_id=(vgname, names[1]))
4167   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4168                           logical_id=(primary, secondary, port,
4169                                       p_minor, s_minor,
4170                                       shared_secret),
4171                           children=[dev_data, dev_meta],
4172                           iv_name=iv_name)
4173   return drbd_dev
4174
4175
4176 def _GenerateDiskTemplate(lu, template_name,
4177                           instance_name, primary_node,
4178                           secondary_nodes, disk_info,
4179                           file_storage_dir, file_driver,
4180                           base_index):
4181   """Generate the entire disk layout for a given template type.
4182
4183   """
4184   #TODO: compute space requirements
4185
4186   vgname = lu.cfg.GetVGName()
4187   disk_count = len(disk_info)
4188   disks = []
4189   if template_name == constants.DT_DISKLESS:
4190     pass
4191   elif template_name == constants.DT_PLAIN:
4192     if len(secondary_nodes) != 0:
4193       raise errors.ProgrammerError("Wrong template configuration")
4194
4195     names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
4196                                       for i in range(disk_count)])
4197     for idx, disk in enumerate(disk_info):
4198       disk_index = idx + base_index
4199       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4200                               logical_id=(vgname, names[idx]),
4201                               iv_name="disk/%d" % disk_index,
4202                               mode=disk["mode"])
4203       disks.append(disk_dev)
4204   elif template_name == constants.DT_DRBD8:
4205     if len(secondary_nodes) != 1:
4206       raise errors.ProgrammerError("Wrong template configuration")
4207     remote_node = secondary_nodes[0]
4208     minors = lu.cfg.AllocateDRBDMinor(
4209       [primary_node, remote_node] * len(disk_info), instance_name)
4210
4211     names = []
4212     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
4213                                                for i in range(disk_count)]):
4214       names.append(lv_prefix + "_data")
4215       names.append(lv_prefix + "_meta")
4216     for idx, disk in enumerate(disk_info):
4217       disk_index = idx + base_index
4218       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4219                                       disk["size"], names[idx*2:idx*2+2],
4220                                       "disk/%d" % disk_index,
4221                                       minors[idx*2], minors[idx*2+1])
4222       disk_dev.mode = disk["mode"]
4223       disks.append(disk_dev)
4224   elif template_name == constants.DT_FILE:
4225     if len(secondary_nodes) != 0:
4226       raise errors.ProgrammerError("Wrong template configuration")
4227
4228     for idx, disk in enumerate(disk_info):
4229       disk_index = idx + base_index
4230       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4231                               iv_name="disk/%d" % disk_index,
4232                               logical_id=(file_driver,
4233                                           "%s/disk%d" % (file_storage_dir,
4234                                                          disk_index)),
4235                               mode=disk["mode"])
4236       disks.append(disk_dev)
4237   else:
4238     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4239   return disks
4240
4241
4242 def _GetInstanceInfoText(instance):
4243   """Compute that text that should be added to the disk's metadata.
4244
4245   """
4246   return "originstname+%s" % instance.name
4247
4248
4249 def _CreateDisks(lu, instance):
4250   """Create all disks for an instance.
4251
4252   This abstracts away some work from AddInstance.
4253
4254   @type lu: L{LogicalUnit}
4255   @param lu: the logical unit on whose behalf we execute
4256   @type instance: L{objects.Instance}
4257   @param instance: the instance whose disks we should create
4258   @rtype: boolean
4259   @return: the success of the creation
4260
4261   """
4262   info = _GetInstanceInfoText(instance)
4263   pnode = instance.primary_node
4264
4265   if instance.disk_template == constants.DT_FILE:
4266     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4267     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4268
4269     if result.failed or not result.data:
4270       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4271
4272     if not result.data[0]:
4273       raise errors.OpExecError("Failed to create directory '%s'" %
4274                                file_storage_dir)
4275
4276   # Note: this needs to be kept in sync with adding of disks in
4277   # LUSetInstanceParams
4278   for device in instance.disks:
4279     logging.info("Creating volume %s for instance %s",
4280                  device.iv_name, instance.name)
4281     #HARDCODE
4282     for node in instance.all_nodes:
4283       f_create = node == pnode
4284       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4285
4286
4287 def _RemoveDisks(lu, instance):
4288   """Remove all disks for an instance.
4289
4290   This abstracts away some work from `AddInstance()` and
4291   `RemoveInstance()`. Note that in case some of the devices couldn't
4292   be removed, the removal will continue with the other ones (compare
4293   with `_CreateDisks()`).
4294
4295   @type lu: L{LogicalUnit}
4296   @param lu: the logical unit on whose behalf we execute
4297   @type instance: L{objects.Instance}
4298   @param instance: the instance whose disks we should remove
4299   @rtype: boolean
4300   @return: the success of the removal
4301
4302   """
4303   logging.info("Removing block devices for instance %s", instance.name)
4304
4305   all_result = True
4306   for device in instance.disks:
4307     for node, disk in device.ComputeNodeTree(instance.primary_node):
4308       lu.cfg.SetDiskID(disk, node)
4309       msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4310       if msg:
4311         lu.LogWarning("Could not remove block device %s on node %s,"
4312                       " continuing anyway: %s", device.iv_name, node, msg)
4313         all_result = False
4314
4315   if instance.disk_template == constants.DT_FILE:
4316     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4317     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4318                                                  file_storage_dir)
4319     if result.failed or not result.data:
4320       logging.error("Could not remove directory '%s'", file_storage_dir)
4321       all_result = False
4322
4323   return all_result
4324
4325
4326 def _ComputeDiskSize(disk_template, disks):
4327   """Compute disk size requirements in the volume group
4328
4329   """
4330   # Required free disk space as a function of disk and swap space
4331   req_size_dict = {
4332     constants.DT_DISKLESS: None,
4333     constants.DT_PLAIN: sum(d["size"] for d in disks),
4334     # 128 MB are added for drbd metadata for each disk
4335     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4336     constants.DT_FILE: None,
4337   }
4338
4339   if disk_template not in req_size_dict:
4340     raise errors.ProgrammerError("Disk template '%s' size requirement"
4341                                  " is unknown" %  disk_template)
4342
4343   return req_size_dict[disk_template]
4344
4345
4346 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4347   """Hypervisor parameter validation.
4348
4349   This function abstract the hypervisor parameter validation to be
4350   used in both instance create and instance modify.
4351
4352   @type lu: L{LogicalUnit}
4353   @param lu: the logical unit for which we check
4354   @type nodenames: list
4355   @param nodenames: the list of nodes on which we should check
4356   @type hvname: string
4357   @param hvname: the name of the hypervisor we should use
4358   @type hvparams: dict
4359   @param hvparams: the parameters which we need to check
4360   @raise errors.OpPrereqError: if the parameters are not valid
4361
4362   """
4363   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4364                                                   hvname,
4365                                                   hvparams)
4366   for node in nodenames:
4367     info = hvinfo[node]
4368     if info.offline:
4369       continue
4370     msg = info.RemoteFailMsg()
4371     if msg:
4372       raise errors.OpPrereqError("Hypervisor parameter validation"
4373                                  " failed on node %s: %s" % (node, msg))
4374
4375
4376 class LUCreateInstance(LogicalUnit):
4377   """Create an instance.
4378
4379   """
4380   HPATH = "instance-add"
4381   HTYPE = constants.HTYPE_INSTANCE
4382   _OP_REQP = ["instance_name", "disks", "disk_template",
4383               "mode", "start",
4384               "wait_for_sync", "ip_check", "nics",
4385               "hvparams", "beparams"]
4386   REQ_BGL = False
4387
4388   def _ExpandNode(self, node):
4389     """Expands and checks one node name.
4390
4391     """
4392     node_full = self.cfg.ExpandNodeName(node)
4393     if node_full is None:
4394       raise errors.OpPrereqError("Unknown node %s" % node)
4395     return node_full
4396
4397   def ExpandNames(self):
4398     """ExpandNames for CreateInstance.
4399
4400     Figure out the right locks for instance creation.
4401
4402     """
4403     self.needed_locks = {}
4404
4405     # set optional parameters to none if they don't exist
4406     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4407       if not hasattr(self.op, attr):
4408         setattr(self.op, attr, None)
4409
4410     # cheap checks, mostly valid constants given
4411
4412     # verify creation mode
4413     if self.op.mode not in (constants.INSTANCE_CREATE,
4414                             constants.INSTANCE_IMPORT):
4415       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4416                                  self.op.mode)
4417
4418     # disk template and mirror node verification
4419     if self.op.disk_template not in constants.DISK_TEMPLATES:
4420       raise errors.OpPrereqError("Invalid disk template name")
4421
4422     if self.op.hypervisor is None:
4423       self.op.hypervisor = self.cfg.GetHypervisorType()
4424
4425     cluster = self.cfg.GetClusterInfo()
4426     enabled_hvs = cluster.enabled_hypervisors
4427     if self.op.hypervisor not in enabled_hvs:
4428       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4429                                  " cluster (%s)" % (self.op.hypervisor,
4430                                   ",".join(enabled_hvs)))
4431
4432     # check hypervisor parameter syntax (locally)
4433     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4434     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4435                                   self.op.hvparams)
4436     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4437     hv_type.CheckParameterSyntax(filled_hvp)
4438     self.hv_full = filled_hvp
4439
4440     # fill and remember the beparams dict
4441     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4442     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4443                                     self.op.beparams)
4444
4445     #### instance parameters check
4446
4447     # instance name verification
4448     hostname1 = utils.HostInfo(self.op.instance_name)
4449     self.op.instance_name = instance_name = hostname1.name
4450
4451     # this is just a preventive check, but someone might still add this
4452     # instance in the meantime, and creation will fail at lock-add time
4453     if instance_name in self.cfg.GetInstanceList():
4454       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4455                                  instance_name)
4456
4457     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4458
4459     # NIC buildup
4460     self.nics = []
4461     for nic in self.op.nics:
4462       # ip validity checks
4463       ip = nic.get("ip", None)
4464       if ip is None or ip.lower() == "none":
4465         nic_ip = None
4466       elif ip.lower() == constants.VALUE_AUTO:
4467         nic_ip = hostname1.ip
4468       else:
4469         if not utils.IsValidIP(ip):
4470           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4471                                      " like a valid IP" % ip)
4472         nic_ip = ip
4473
4474       # MAC address verification
4475       mac = nic.get("mac", constants.VALUE_AUTO)
4476       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4477         if not utils.IsValidMac(mac.lower()):
4478           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4479                                      mac)
4480       # bridge verification
4481       bridge = nic.get("bridge", None)
4482       if bridge is None:
4483         bridge = self.cfg.GetDefBridge()
4484       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4485
4486     # disk checks/pre-build
4487     self.disks = []
4488     for disk in self.op.disks:
4489       mode = disk.get("mode", constants.DISK_RDWR)
4490       if mode not in constants.DISK_ACCESS_SET:
4491         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4492                                    mode)
4493       size = disk.get("size", None)
4494       if size is None:
4495         raise errors.OpPrereqError("Missing disk size")
4496       try:
4497         size = int(size)
4498       except ValueError:
4499         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4500       self.disks.append({"size": size, "mode": mode})
4501
4502     # used in CheckPrereq for ip ping check
4503     self.check_ip = hostname1.ip
4504
4505     # file storage checks
4506     if (self.op.file_driver and
4507         not self.op.file_driver in constants.FILE_DRIVER):
4508       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4509                                  self.op.file_driver)
4510
4511     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4512       raise errors.OpPrereqError("File storage directory path not absolute")
4513
4514     ### Node/iallocator related checks
4515     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4516       raise errors.OpPrereqError("One and only one of iallocator and primary"
4517                                  " node must be given")
4518
4519     if self.op.iallocator:
4520       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4521     else:
4522       self.op.pnode = self._ExpandNode(self.op.pnode)
4523       nodelist = [self.op.pnode]
4524       if self.op.snode is not None:
4525         self.op.snode = self._ExpandNode(self.op.snode)
4526         nodelist.append(self.op.snode)
4527       self.needed_locks[locking.LEVEL_NODE] = nodelist
4528
4529     # in case of import lock the source node too
4530     if self.op.mode == constants.INSTANCE_IMPORT:
4531       src_node = getattr(self.op, "src_node", None)
4532       src_path = getattr(self.op, "src_path", None)
4533
4534       if src_path is None:
4535         self.op.src_path = src_path = self.op.instance_name
4536
4537       if src_node is None:
4538         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4539         self.op.src_node = None
4540         if os.path.isabs(src_path):
4541           raise errors.OpPrereqError("Importing an instance from an absolute"
4542                                      " path requires a source node option.")
4543       else:
4544         self.op.src_node = src_node = self._ExpandNode(src_node)
4545         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4546           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4547         if not os.path.isabs(src_path):
4548           self.op.src_path = src_path = \
4549             os.path.join(constants.EXPORT_DIR, src_path)
4550
4551     else: # INSTANCE_CREATE
4552       if getattr(self.op, "os_type", None) is None:
4553         raise errors.OpPrereqError("No guest OS specified")
4554
4555   def _RunAllocator(self):
4556     """Run the allocator based on input opcode.
4557
4558     """
4559     nics = [n.ToDict() for n in self.nics]
4560     ial = IAllocator(self,
4561                      mode=constants.IALLOCATOR_MODE_ALLOC,
4562                      name=self.op.instance_name,
4563                      disk_template=self.op.disk_template,
4564                      tags=[],
4565                      os=self.op.os_type,
4566                      vcpus=self.be_full[constants.BE_VCPUS],
4567                      mem_size=self.be_full[constants.BE_MEMORY],
4568                      disks=self.disks,
4569                      nics=nics,
4570                      hypervisor=self.op.hypervisor,
4571                      )
4572
4573     ial.Run(self.op.iallocator)
4574
4575     if not ial.success:
4576       raise errors.OpPrereqError("Can't compute nodes using"
4577                                  " iallocator '%s': %s" % (self.op.iallocator,
4578                                                            ial.info))
4579     if len(ial.nodes) != ial.required_nodes:
4580       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4581                                  " of nodes (%s), required %s" %
4582                                  (self.op.iallocator, len(ial.nodes),
4583                                   ial.required_nodes))
4584     self.op.pnode = ial.nodes[0]
4585     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4586                  self.op.instance_name, self.op.iallocator,
4587                  ", ".join(ial.nodes))
4588     if ial.required_nodes == 2:
4589       self.op.snode = ial.nodes[1]
4590
4591   def BuildHooksEnv(self):
4592     """Build hooks env.
4593
4594     This runs on master, primary and secondary nodes of the instance.
4595
4596     """
4597     env = {
4598       "ADD_MODE": self.op.mode,
4599       }
4600     if self.op.mode == constants.INSTANCE_IMPORT:
4601       env["SRC_NODE"] = self.op.src_node
4602       env["SRC_PATH"] = self.op.src_path
4603       env["SRC_IMAGES"] = self.src_images
4604
4605     env.update(_BuildInstanceHookEnv(
4606       name=self.op.instance_name,
4607       primary_node=self.op.pnode,
4608       secondary_nodes=self.secondaries,
4609       status=self.op.start,
4610       os_type=self.op.os_type,
4611       memory=self.be_full[constants.BE_MEMORY],
4612       vcpus=self.be_full[constants.BE_VCPUS],
4613       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4614       disk_template=self.op.disk_template,
4615       disks=[(d["size"], d["mode"]) for d in self.disks],
4616       bep=self.be_full,
4617       hvp=self.hv_full,
4618       hypervisor=self.op.hypervisor,
4619     ))
4620
4621     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4622           self.secondaries)
4623     return env, nl, nl
4624
4625
4626   def CheckPrereq(self):
4627     """Check prerequisites.
4628
4629     """
4630     if (not self.cfg.GetVGName() and
4631         self.op.disk_template not in constants.DTS_NOT_LVM):
4632       raise errors.OpPrereqError("Cluster does not support lvm-based"
4633                                  " instances")
4634
4635     if self.op.mode == constants.INSTANCE_IMPORT:
4636       src_node = self.op.src_node
4637       src_path = self.op.src_path
4638
4639       if src_node is None:
4640         exp_list = self.rpc.call_export_list(
4641           self.acquired_locks[locking.LEVEL_NODE])
4642         found = False
4643         for node in exp_list:
4644           if not exp_list[node].failed and src_path in exp_list[node].data:
4645             found = True
4646             self.op.src_node = src_node = node
4647             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4648                                                        src_path)
4649             break
4650         if not found:
4651           raise errors.OpPrereqError("No export found for relative path %s" %
4652                                       src_path)
4653
4654       _CheckNodeOnline(self, src_node)
4655       result = self.rpc.call_export_info(src_node, src_path)
4656       result.Raise()
4657       if not result.data:
4658         raise errors.OpPrereqError("No export found in dir %s" % src_path)
4659
4660       export_info = result.data
4661       if not export_info.has_section(constants.INISECT_EXP):
4662         raise errors.ProgrammerError("Corrupted export config")
4663
4664       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4665       if (int(ei_version) != constants.EXPORT_VERSION):
4666         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4667                                    (ei_version, constants.EXPORT_VERSION))
4668
4669       # Check that the new instance doesn't have less disks than the export
4670       instance_disks = len(self.disks)
4671       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4672       if instance_disks < export_disks:
4673         raise errors.OpPrereqError("Not enough disks to import."
4674                                    " (instance: %d, export: %d)" %
4675                                    (instance_disks, export_disks))
4676
4677       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4678       disk_images = []
4679       for idx in range(export_disks):
4680         option = 'disk%d_dump' % idx
4681         if export_info.has_option(constants.INISECT_INS, option):
4682           # FIXME: are the old os-es, disk sizes, etc. useful?
4683           export_name = export_info.get(constants.INISECT_INS, option)
4684           image = os.path.join(src_path, export_name)
4685           disk_images.append(image)
4686         else:
4687           disk_images.append(False)
4688
4689       self.src_images = disk_images
4690
4691       old_name = export_info.get(constants.INISECT_INS, 'name')
4692       # FIXME: int() here could throw a ValueError on broken exports
4693       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4694       if self.op.instance_name == old_name:
4695         for idx, nic in enumerate(self.nics):
4696           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4697             nic_mac_ini = 'nic%d_mac' % idx
4698             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4699
4700     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4701     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4702     if self.op.start and not self.op.ip_check:
4703       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4704                                  " adding an instance in start mode")
4705
4706     if self.op.ip_check:
4707       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4708         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4709                                    (self.check_ip, self.op.instance_name))
4710
4711     #### mac address generation
4712     # By generating here the mac address both the allocator and the hooks get
4713     # the real final mac address rather than the 'auto' or 'generate' value.
4714     # There is a race condition between the generation and the instance object
4715     # creation, which means that we know the mac is valid now, but we're not
4716     # sure it will be when we actually add the instance. If things go bad
4717     # adding the instance will abort because of a duplicate mac, and the
4718     # creation job will fail.
4719     for nic in self.nics:
4720       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4721         nic.mac = self.cfg.GenerateMAC()
4722
4723     #### allocator run
4724
4725     if self.op.iallocator is not None:
4726       self._RunAllocator()
4727
4728     #### node related checks
4729
4730     # check primary node
4731     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4732     assert self.pnode is not None, \
4733       "Cannot retrieve locked node %s" % self.op.pnode
4734     if pnode.offline:
4735       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4736                                  pnode.name)
4737     if pnode.drained:
4738       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4739                                  pnode.name)
4740
4741     self.secondaries = []
4742
4743     # mirror node verification
4744     if self.op.disk_template in constants.DTS_NET_MIRROR:
4745       if self.op.snode is None:
4746         raise errors.OpPrereqError("The networked disk templates need"
4747                                    " a mirror node")
4748       if self.op.snode == pnode.name:
4749         raise errors.OpPrereqError("The secondary node cannot be"
4750                                    " the primary node.")
4751       _CheckNodeOnline(self, self.op.snode)
4752       _CheckNodeNotDrained(self, self.op.snode)
4753       self.secondaries.append(self.op.snode)
4754
4755     nodenames = [pnode.name] + self.secondaries
4756
4757     req_size = _ComputeDiskSize(self.op.disk_template,
4758                                 self.disks)
4759
4760     # Check lv size requirements
4761     if req_size is not None:
4762       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4763                                          self.op.hypervisor)
4764       for node in nodenames:
4765         info = nodeinfo[node]
4766         info.Raise()
4767         info = info.data
4768         if not info:
4769           raise errors.OpPrereqError("Cannot get current information"
4770                                      " from node '%s'" % node)
4771         vg_free = info.get('vg_free', None)
4772         if not isinstance(vg_free, int):
4773           raise errors.OpPrereqError("Can't compute free disk space on"
4774                                      " node %s" % node)
4775         if req_size > info['vg_free']:
4776           raise errors.OpPrereqError("Not enough disk space on target node %s."
4777                                      " %d MB available, %d MB required" %
4778                                      (node, info['vg_free'], req_size))
4779
4780     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4781
4782     # os verification
4783     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4784     result.Raise()
4785     if not isinstance(result.data, objects.OS) or not result.data:
4786       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4787                                  " primary node"  % self.op.os_type)
4788
4789     # bridge check on primary node
4790     bridges = [n.bridge for n in self.nics]
4791     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4792     result.Raise()
4793     if not result.data:
4794       raise errors.OpPrereqError("One of the target bridges '%s' does not"
4795                                  " exist on destination node '%s'" %
4796                                  (",".join(bridges), pnode.name))
4797
4798     # memory check on primary node
4799     if self.op.start:
4800       _CheckNodeFreeMemory(self, self.pnode.name,
4801                            "creating instance %s" % self.op.instance_name,
4802                            self.be_full[constants.BE_MEMORY],
4803                            self.op.hypervisor)
4804
4805   def Exec(self, feedback_fn):
4806     """Create and add the instance to the cluster.
4807
4808     """
4809     instance = self.op.instance_name
4810     pnode_name = self.pnode.name
4811
4812     ht_kind = self.op.hypervisor
4813     if ht_kind in constants.HTS_REQ_PORT:
4814       network_port = self.cfg.AllocatePort()
4815     else:
4816       network_port = None
4817
4818     ##if self.op.vnc_bind_address is None:
4819     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4820
4821     # this is needed because os.path.join does not accept None arguments
4822     if self.op.file_storage_dir is None:
4823       string_file_storage_dir = ""
4824     else:
4825       string_file_storage_dir = self.op.file_storage_dir
4826
4827     # build the full file storage dir path
4828     file_storage_dir = os.path.normpath(os.path.join(
4829                                         self.cfg.GetFileStorageDir(),
4830                                         string_file_storage_dir, instance))
4831
4832
4833     disks = _GenerateDiskTemplate(self,
4834                                   self.op.disk_template,
4835                                   instance, pnode_name,
4836                                   self.secondaries,
4837                                   self.disks,
4838                                   file_storage_dir,
4839                                   self.op.file_driver,
4840                                   0)
4841
4842     iobj = objects.Instance(name=instance, os=self.op.os_type,
4843                             primary_node=pnode_name,
4844                             nics=self.nics, disks=disks,
4845                             disk_template=self.op.disk_template,
4846                             admin_up=False,
4847                             network_port=network_port,
4848                             beparams=self.op.beparams,
4849                             hvparams=self.op.hvparams,
4850                             hypervisor=self.op.hypervisor,
4851                             )
4852
4853     feedback_fn("* creating instance disks...")
4854     try:
4855       _CreateDisks(self, iobj)
4856     except errors.OpExecError:
4857       self.LogWarning("Device creation failed, reverting...")
4858       try:
4859         _RemoveDisks(self, iobj)
4860       finally:
4861         self.cfg.ReleaseDRBDMinors(instance)
4862         raise
4863
4864     feedback_fn("adding instance %s to cluster config" % instance)
4865
4866     self.cfg.AddInstance(iobj)
4867     # Declare that we don't want to remove the instance lock anymore, as we've
4868     # added the instance to the config
4869     del self.remove_locks[locking.LEVEL_INSTANCE]
4870     # Unlock all the nodes
4871     if self.op.mode == constants.INSTANCE_IMPORT:
4872       nodes_keep = [self.op.src_node]
4873       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4874                        if node != self.op.src_node]
4875       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4876       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4877     else:
4878       self.context.glm.release(locking.LEVEL_NODE)
4879       del self.acquired_locks[locking.LEVEL_NODE]
4880
4881     if self.op.wait_for_sync:
4882       disk_abort = not _WaitForSync(self, iobj)
4883     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4884       # make sure the disks are not degraded (still sync-ing is ok)
4885       time.sleep(15)
4886       feedback_fn("* checking mirrors status")
4887       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4888     else:
4889       disk_abort = False
4890
4891     if disk_abort:
4892       _RemoveDisks(self, iobj)
4893       self.cfg.RemoveInstance(iobj.name)
4894       # Make sure the instance lock gets removed
4895       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4896       raise errors.OpExecError("There are some degraded disks for"
4897                                " this instance")
4898
4899     feedback_fn("creating os for instance %s on node %s" %
4900                 (instance, pnode_name))
4901
4902     if iobj.disk_template != constants.DT_DISKLESS:
4903       if self.op.mode == constants.INSTANCE_CREATE:
4904         feedback_fn("* running the instance OS create scripts...")
4905         result = self.rpc.call_instance_os_add(pnode_name, iobj)
4906         msg = result.RemoteFailMsg()
4907         if msg:
4908           raise errors.OpExecError("Could not add os for instance %s"
4909                                    " on node %s: %s" %
4910                                    (instance, pnode_name, msg))
4911
4912       elif self.op.mode == constants.INSTANCE_IMPORT:
4913         feedback_fn("* running the instance OS import scripts...")
4914         src_node = self.op.src_node
4915         src_images = self.src_images
4916         cluster_name = self.cfg.GetClusterName()
4917         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4918                                                          src_node, src_images,
4919                                                          cluster_name)
4920         import_result.Raise()
4921         for idx, result in enumerate(import_result.data):
4922           if not result:
4923             self.LogWarning("Could not import the image %s for instance"
4924                             " %s, disk %d, on node %s" %
4925                             (src_images[idx], instance, idx, pnode_name))
4926       else:
4927         # also checked in the prereq part
4928         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4929                                      % self.op.mode)
4930
4931     if self.op.start:
4932       iobj.admin_up = True
4933       self.cfg.Update(iobj)
4934       logging.info("Starting instance %s on node %s", instance, pnode_name)
4935       feedback_fn("* starting instance...")
4936       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4937       msg = result.RemoteFailMsg()
4938       if msg:
4939         raise errors.OpExecError("Could not start instance: %s" % msg)
4940
4941
4942 class LUConnectConsole(NoHooksLU):
4943   """Connect to an instance's console.
4944
4945   This is somewhat special in that it returns the command line that
4946   you need to run on the master node in order to connect to the
4947   console.
4948
4949   """
4950   _OP_REQP = ["instance_name"]
4951   REQ_BGL = False
4952
4953   def ExpandNames(self):
4954     self._ExpandAndLockInstance()
4955
4956   def CheckPrereq(self):
4957     """Check prerequisites.
4958
4959     This checks that the instance is in the cluster.
4960
4961     """
4962     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4963     assert self.instance is not None, \
4964       "Cannot retrieve locked instance %s" % self.op.instance_name
4965     _CheckNodeOnline(self, self.instance.primary_node)
4966
4967   def Exec(self, feedback_fn):
4968     """Connect to the console of an instance
4969
4970     """
4971     instance = self.instance
4972     node = instance.primary_node
4973
4974     node_insts = self.rpc.call_instance_list([node],
4975                                              [instance.hypervisor])[node]
4976     node_insts.Raise()
4977
4978     if instance.name not in node_insts.data:
4979       raise errors.OpExecError("Instance %s is not running." % instance.name)
4980
4981     logging.debug("Connecting to console of %s on %s", instance.name, node)
4982
4983     hyper = hypervisor.GetHypervisor(instance.hypervisor)
4984     cluster = self.cfg.GetClusterInfo()
4985     # beparams and hvparams are passed separately, to avoid editing the
4986     # instance and then saving the defaults in the instance itself.
4987     hvparams = cluster.FillHV(instance)
4988     beparams = cluster.FillBE(instance)
4989     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
4990
4991     # build ssh cmdline
4992     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
4993
4994
4995 class LUReplaceDisks(LogicalUnit):
4996   """Replace the disks of an instance.
4997
4998   """
4999   HPATH = "mirrors-replace"
5000   HTYPE = constants.HTYPE_INSTANCE
5001   _OP_REQP = ["instance_name", "mode", "disks"]
5002   REQ_BGL = False
5003
5004   def CheckArguments(self):
5005     if not hasattr(self.op, "remote_node"):
5006       self.op.remote_node = None
5007     if not hasattr(self.op, "iallocator"):
5008       self.op.iallocator = None
5009
5010     # check for valid parameter combination
5011     cnt = [self.op.remote_node, self.op.iallocator].count(None)
5012     if self.op.mode == constants.REPLACE_DISK_CHG:
5013       if cnt == 2:
5014         raise errors.OpPrereqError("When changing the secondary either an"
5015                                    " iallocator script must be used or the"
5016                                    " new node given")
5017       elif cnt == 0:
5018         raise errors.OpPrereqError("Give either the iallocator or the new"
5019                                    " secondary, not both")
5020     else: # not replacing the secondary
5021       if cnt != 2:
5022         raise errors.OpPrereqError("The iallocator and new node options can"
5023                                    " be used only when changing the"
5024                                    " secondary node")
5025
5026   def ExpandNames(self):
5027     self._ExpandAndLockInstance()
5028
5029     if self.op.iallocator is not None:
5030       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5031     elif self.op.remote_node is not None:
5032       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5033       if remote_node is None:
5034         raise errors.OpPrereqError("Node '%s' not known" %
5035                                    self.op.remote_node)
5036       self.op.remote_node = remote_node
5037       # Warning: do not remove the locking of the new secondary here
5038       # unless DRBD8.AddChildren is changed to work in parallel;
5039       # currently it doesn't since parallel invocations of
5040       # FindUnusedMinor will conflict
5041       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5042       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5043     else:
5044       self.needed_locks[locking.LEVEL_NODE] = []
5045       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5046
5047   def DeclareLocks(self, level):
5048     # If we're not already locking all nodes in the set we have to declare the
5049     # instance's primary/secondary nodes.
5050     if (level == locking.LEVEL_NODE and
5051         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5052       self._LockInstancesNodes()
5053
5054   def _RunAllocator(self):
5055     """Compute a new secondary node using an IAllocator.
5056
5057     """
5058     ial = IAllocator(self,
5059                      mode=constants.IALLOCATOR_MODE_RELOC,
5060                      name=self.op.instance_name,
5061                      relocate_from=[self.sec_node])
5062
5063     ial.Run(self.op.iallocator)
5064
5065     if not ial.success:
5066       raise errors.OpPrereqError("Can't compute nodes using"
5067                                  " iallocator '%s': %s" % (self.op.iallocator,
5068                                                            ial.info))
5069     if len(ial.nodes) != ial.required_nodes:
5070       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5071                                  " of nodes (%s), required %s" %
5072                                  (len(ial.nodes), ial.required_nodes))
5073     self.op.remote_node = ial.nodes[0]
5074     self.LogInfo("Selected new secondary for the instance: %s",
5075                  self.op.remote_node)
5076
5077   def BuildHooksEnv(self):
5078     """Build hooks env.
5079
5080     This runs on the master, the primary and all the secondaries.
5081
5082     """
5083     env = {
5084       "MODE": self.op.mode,
5085       "NEW_SECONDARY": self.op.remote_node,
5086       "OLD_SECONDARY": self.instance.secondary_nodes[0],
5087       }
5088     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5089     nl = [
5090       self.cfg.GetMasterNode(),
5091       self.instance.primary_node,
5092       ]
5093     if self.op.remote_node is not None:
5094       nl.append(self.op.remote_node)
5095     return env, nl, nl
5096
5097   def CheckPrereq(self):
5098     """Check prerequisites.
5099
5100     This checks that the instance is in the cluster.
5101
5102     """
5103     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5104     assert instance is not None, \
5105       "Cannot retrieve locked instance %s" % self.op.instance_name
5106     self.instance = instance
5107
5108     if instance.disk_template != constants.DT_DRBD8:
5109       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5110                                  " instances")
5111
5112     if len(instance.secondary_nodes) != 1:
5113       raise errors.OpPrereqError("The instance has a strange layout,"
5114                                  " expected one secondary but found %d" %
5115                                  len(instance.secondary_nodes))
5116
5117     self.sec_node = instance.secondary_nodes[0]
5118
5119     if self.op.iallocator is not None:
5120       self._RunAllocator()
5121
5122     remote_node = self.op.remote_node
5123     if remote_node is not None:
5124       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5125       assert self.remote_node_info is not None, \
5126         "Cannot retrieve locked node %s" % remote_node
5127     else:
5128       self.remote_node_info = None
5129     if remote_node == instance.primary_node:
5130       raise errors.OpPrereqError("The specified node is the primary node of"
5131                                  " the instance.")
5132     elif remote_node == self.sec_node:
5133       raise errors.OpPrereqError("The specified node is already the"
5134                                  " secondary node of the instance.")
5135
5136     if self.op.mode == constants.REPLACE_DISK_PRI:
5137       n1 = self.tgt_node = instance.primary_node
5138       n2 = self.oth_node = self.sec_node
5139     elif self.op.mode == constants.REPLACE_DISK_SEC:
5140       n1 = self.tgt_node = self.sec_node
5141       n2 = self.oth_node = instance.primary_node
5142     elif self.op.mode == constants.REPLACE_DISK_CHG:
5143       n1 = self.new_node = remote_node
5144       n2 = self.oth_node = instance.primary_node
5145       self.tgt_node = self.sec_node
5146       _CheckNodeNotDrained(self, remote_node)
5147     else:
5148       raise errors.ProgrammerError("Unhandled disk replace mode")
5149
5150     _CheckNodeOnline(self, n1)
5151     _CheckNodeOnline(self, n2)
5152
5153     if not self.op.disks:
5154       self.op.disks = range(len(instance.disks))
5155
5156     for disk_idx in self.op.disks:
5157       instance.FindDisk(disk_idx)
5158
5159   def _ExecD8DiskOnly(self, feedback_fn):
5160     """Replace a disk on the primary or secondary for dbrd8.
5161
5162     The algorithm for replace is quite complicated:
5163
5164       1. for each disk to be replaced:
5165
5166         1. create new LVs on the target node with unique names
5167         1. detach old LVs from the drbd device
5168         1. rename old LVs to name_replaced.<time_t>
5169         1. rename new LVs to old LVs
5170         1. attach the new LVs (with the old names now) to the drbd device
5171
5172       1. wait for sync across all devices
5173
5174       1. for each modified disk:
5175
5176         1. remove old LVs (which have the name name_replaces.<time_t>)
5177
5178     Failures are not very well handled.
5179
5180     """
5181     steps_total = 6
5182     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5183     instance = self.instance
5184     iv_names = {}
5185     vgname = self.cfg.GetVGName()
5186     # start of work
5187     cfg = self.cfg
5188     tgt_node = self.tgt_node
5189     oth_node = self.oth_node
5190
5191     # Step: check device activation
5192     self.proc.LogStep(1, steps_total, "check device existence")
5193     info("checking volume groups")
5194     my_vg = cfg.GetVGName()
5195     results = self.rpc.call_vg_list([oth_node, tgt_node])
5196     if not results:
5197       raise errors.OpExecError("Can't list volume groups on the nodes")
5198     for node in oth_node, tgt_node:
5199       res = results[node]
5200       if res.failed or not res.data or my_vg not in res.data:
5201         raise errors.OpExecError("Volume group '%s' not found on %s" %
5202                                  (my_vg, node))
5203     for idx, dev in enumerate(instance.disks):
5204       if idx not in self.op.disks:
5205         continue
5206       for node in tgt_node, oth_node:
5207         info("checking disk/%d on %s" % (idx, node))
5208         cfg.SetDiskID(dev, node)
5209         result = self.rpc.call_blockdev_find(node, dev)
5210         msg = result.RemoteFailMsg()
5211         if not msg and not result.payload:
5212           msg = "disk not found"
5213         if msg:
5214           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5215                                    (idx, node, msg))
5216
5217     # Step: check other node consistency
5218     self.proc.LogStep(2, steps_total, "check peer consistency")
5219     for idx, dev in enumerate(instance.disks):
5220       if idx not in self.op.disks:
5221         continue
5222       info("checking disk/%d consistency on %s" % (idx, oth_node))
5223       if not _CheckDiskConsistency(self, dev, oth_node,
5224                                    oth_node==instance.primary_node):
5225         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5226                                  " to replace disks on this node (%s)" %
5227                                  (oth_node, tgt_node))
5228
5229     # Step: create new storage
5230     self.proc.LogStep(3, steps_total, "allocate new storage")
5231     for idx, dev in enumerate(instance.disks):
5232       if idx not in self.op.disks:
5233         continue
5234       size = dev.size
5235       cfg.SetDiskID(dev, tgt_node)
5236       lv_names = [".disk%d_%s" % (idx, suf)
5237                   for suf in ["data", "meta"]]
5238       names = _GenerateUniqueNames(self, lv_names)
5239       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5240                              logical_id=(vgname, names[0]))
5241       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5242                              logical_id=(vgname, names[1]))
5243       new_lvs = [lv_data, lv_meta]
5244       old_lvs = dev.children
5245       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5246       info("creating new local storage on %s for %s" %
5247            (tgt_node, dev.iv_name))
5248       # we pass force_create=True to force the LVM creation
5249       for new_lv in new_lvs:
5250         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5251                         _GetInstanceInfoText(instance), False)
5252
5253     # Step: for each lv, detach+rename*2+attach
5254     self.proc.LogStep(4, steps_total, "change drbd configuration")
5255     for dev, old_lvs, new_lvs in iv_names.itervalues():
5256       info("detaching %s drbd from local storage" % dev.iv_name)
5257       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5258       result.Raise()
5259       if not result.data:
5260         raise errors.OpExecError("Can't detach drbd from local storage on node"
5261                                  " %s for device %s" % (tgt_node, dev.iv_name))
5262       #dev.children = []
5263       #cfg.Update(instance)
5264
5265       # ok, we created the new LVs, so now we know we have the needed
5266       # storage; as such, we proceed on the target node to rename
5267       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5268       # using the assumption that logical_id == physical_id (which in
5269       # turn is the unique_id on that node)
5270
5271       # FIXME(iustin): use a better name for the replaced LVs
5272       temp_suffix = int(time.time())
5273       ren_fn = lambda d, suff: (d.physical_id[0],
5274                                 d.physical_id[1] + "_replaced-%s" % suff)
5275       # build the rename list based on what LVs exist on the node
5276       rlist = []
5277       for to_ren in old_lvs:
5278         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5279         if not result.RemoteFailMsg() and result.payload:
5280           # device exists
5281           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5282
5283       info("renaming the old LVs on the target node")
5284       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5285       result.Raise()
5286       if not result.data:
5287         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
5288       # now we rename the new LVs to the old LVs
5289       info("renaming the new LVs on the target node")
5290       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5291       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5292       result.Raise()
5293       if not result.data:
5294         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
5295
5296       for old, new in zip(old_lvs, new_lvs):
5297         new.logical_id = old.logical_id
5298         cfg.SetDiskID(new, tgt_node)
5299
5300       for disk in old_lvs:
5301         disk.logical_id = ren_fn(disk, temp_suffix)
5302         cfg.SetDiskID(disk, tgt_node)
5303
5304       # now that the new lvs have the old name, we can add them to the device
5305       info("adding new mirror component on %s" % tgt_node)
5306       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5307       if result.failed or not result.data:
5308         for new_lv in new_lvs:
5309           msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5310           if msg:
5311             warning("Can't rollback device %s: %s", dev, msg,
5312                     hint="cleanup manually the unused logical volumes")
5313         raise errors.OpExecError("Can't add local storage to drbd")
5314
5315       dev.children = new_lvs
5316       cfg.Update(instance)
5317
5318     # Step: wait for sync
5319
5320     # this can fail as the old devices are degraded and _WaitForSync
5321     # does a combined result over all disks, so we don't check its
5322     # return value
5323     self.proc.LogStep(5, steps_total, "sync devices")
5324     _WaitForSync(self, instance, unlock=True)
5325
5326     # so check manually all the devices
5327     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5328       cfg.SetDiskID(dev, instance.primary_node)
5329       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5330       msg = result.RemoteFailMsg()
5331       if not msg and not result.payload:
5332         msg = "disk not found"
5333       if msg:
5334         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5335                                  (name, msg))
5336       if result.payload[5]:
5337         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5338
5339     # Step: remove old storage
5340     self.proc.LogStep(6, steps_total, "removing old storage")
5341     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5342       info("remove logical volumes for %s" % name)
5343       for lv in old_lvs:
5344         cfg.SetDiskID(lv, tgt_node)
5345         msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5346         if msg:
5347           warning("Can't remove old LV: %s" % msg,
5348                   hint="manually remove unused LVs")
5349           continue
5350
5351   def _ExecD8Secondary(self, feedback_fn):
5352     """Replace the secondary node for drbd8.
5353
5354     The algorithm for replace is quite complicated:
5355       - for all disks of the instance:
5356         - create new LVs on the new node with same names
5357         - shutdown the drbd device on the old secondary
5358         - disconnect the drbd network on the primary
5359         - create the drbd device on the new secondary
5360         - network attach the drbd on the primary, using an artifice:
5361           the drbd code for Attach() will connect to the network if it
5362           finds a device which is connected to the good local disks but
5363           not network enabled
5364       - wait for sync across all devices
5365       - remove all disks from the old secondary
5366
5367     Failures are not very well handled.
5368
5369     """
5370     steps_total = 6
5371     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5372     instance = self.instance
5373     iv_names = {}
5374     # start of work
5375     cfg = self.cfg
5376     old_node = self.tgt_node
5377     new_node = self.new_node
5378     pri_node = instance.primary_node
5379     nodes_ip = {
5380       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5381       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5382       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5383       }
5384
5385     # Step: check device activation
5386     self.proc.LogStep(1, steps_total, "check device existence")
5387     info("checking volume groups")
5388     my_vg = cfg.GetVGName()
5389     results = self.rpc.call_vg_list([pri_node, new_node])
5390     for node in pri_node, new_node:
5391       res = results[node]
5392       if res.failed or not res.data or my_vg not in res.data:
5393         raise errors.OpExecError("Volume group '%s' not found on %s" %
5394                                  (my_vg, node))
5395     for idx, dev in enumerate(instance.disks):
5396       if idx not in self.op.disks:
5397         continue
5398       info("checking disk/%d on %s" % (idx, pri_node))
5399       cfg.SetDiskID(dev, pri_node)
5400       result = self.rpc.call_blockdev_find(pri_node, dev)
5401       msg = result.RemoteFailMsg()
5402       if not msg and not result.payload:
5403         msg = "disk not found"
5404       if msg:
5405         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5406                                  (idx, pri_node, msg))
5407
5408     # Step: check other node consistency
5409     self.proc.LogStep(2, steps_total, "check peer consistency")
5410     for idx, dev in enumerate(instance.disks):
5411       if idx not in self.op.disks:
5412         continue
5413       info("checking disk/%d consistency on %s" % (idx, pri_node))
5414       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5415         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5416                                  " unsafe to replace the secondary" %
5417                                  pri_node)
5418
5419     # Step: create new storage
5420     self.proc.LogStep(3, steps_total, "allocate new storage")
5421     for idx, dev in enumerate(instance.disks):
5422       info("adding new local storage on %s for disk/%d" %
5423            (new_node, idx))
5424       # we pass force_create=True to force LVM creation
5425       for new_lv in dev.children:
5426         _CreateBlockDev(self, new_node, instance, new_lv, True,
5427                         _GetInstanceInfoText(instance), False)
5428
5429     # Step 4: dbrd minors and drbd setups changes
5430     # after this, we must manually remove the drbd minors on both the
5431     # error and the success paths
5432     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5433                                    instance.name)
5434     logging.debug("Allocated minors %s" % (minors,))
5435     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5436     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5437       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5438       # create new devices on new_node; note that we create two IDs:
5439       # one without port, so the drbd will be activated without
5440       # networking information on the new node at this stage, and one
5441       # with network, for the latter activation in step 4
5442       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5443       if pri_node == o_node1:
5444         p_minor = o_minor1
5445       else:
5446         p_minor = o_minor2
5447
5448       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5449       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5450
5451       iv_names[idx] = (dev, dev.children, new_net_id)
5452       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5453                     new_net_id)
5454       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5455                               logical_id=new_alone_id,
5456                               children=dev.children,
5457                               size=dev.size)
5458       try:
5459         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5460                               _GetInstanceInfoText(instance), False)
5461       except errors.GenericError:
5462         self.cfg.ReleaseDRBDMinors(instance.name)
5463         raise
5464
5465     for idx, dev in enumerate(instance.disks):
5466       # we have new devices, shutdown the drbd on the old secondary
5467       info("shutting down drbd for disk/%d on old node" % idx)
5468       cfg.SetDiskID(dev, old_node)
5469       msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5470       if msg:
5471         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5472                 (idx, msg),
5473                 hint="Please cleanup this device manually as soon as possible")
5474
5475     info("detaching primary drbds from the network (=> standalone)")
5476     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5477                                                instance.disks)[pri_node]
5478
5479     msg = result.RemoteFailMsg()
5480     if msg:
5481       # detaches didn't succeed (unlikely)
5482       self.cfg.ReleaseDRBDMinors(instance.name)
5483       raise errors.OpExecError("Can't detach the disks from the network on"
5484                                " old node: %s" % (msg,))
5485
5486     # if we managed to detach at least one, we update all the disks of
5487     # the instance to point to the new secondary
5488     info("updating instance configuration")
5489     for dev, _, new_logical_id in iv_names.itervalues():
5490       dev.logical_id = new_logical_id
5491       cfg.SetDiskID(dev, pri_node)
5492     cfg.Update(instance)
5493
5494     # and now perform the drbd attach
5495     info("attaching primary drbds to new secondary (standalone => connected)")
5496     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5497                                            instance.disks, instance.name,
5498                                            False)
5499     for to_node, to_result in result.items():
5500       msg = to_result.RemoteFailMsg()
5501       if msg:
5502         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5503                 hint="please do a gnt-instance info to see the"
5504                 " status of disks")
5505
5506     # this can fail as the old devices are degraded and _WaitForSync
5507     # does a combined result over all disks, so we don't check its
5508     # return value
5509     self.proc.LogStep(5, steps_total, "sync devices")
5510     _WaitForSync(self, instance, unlock=True)
5511
5512     # so check manually all the devices
5513     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5514       cfg.SetDiskID(dev, pri_node)
5515       result = self.rpc.call_blockdev_find(pri_node, dev)
5516       msg = result.RemoteFailMsg()
5517       if not msg and not result.payload:
5518         msg = "disk not found"
5519       if msg:
5520         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5521                                  (idx, msg))
5522       if result.payload[5]:
5523         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5524
5525     self.proc.LogStep(6, steps_total, "removing old storage")
5526     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5527       info("remove logical volumes for disk/%d" % idx)
5528       for lv in old_lvs:
5529         cfg.SetDiskID(lv, old_node)
5530         msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5531         if msg:
5532           warning("Can't remove LV on old secondary: %s", msg,
5533                   hint="Cleanup stale volumes by hand")
5534
5535   def Exec(self, feedback_fn):
5536     """Execute disk replacement.
5537
5538     This dispatches the disk replacement to the appropriate handler.
5539
5540     """
5541     instance = self.instance
5542
5543     # Activate the instance disks if we're replacing them on a down instance
5544     if not instance.admin_up:
5545       _StartInstanceDisks(self, instance, True)
5546
5547     if self.op.mode == constants.REPLACE_DISK_CHG:
5548       fn = self._ExecD8Secondary
5549     else:
5550       fn = self._ExecD8DiskOnly
5551
5552     ret = fn(feedback_fn)
5553
5554     # Deactivate the instance disks if we're replacing them on a down instance
5555     if not instance.admin_up:
5556       _SafeShutdownInstanceDisks(self, instance)
5557
5558     return ret
5559
5560
5561 class LUGrowDisk(LogicalUnit):
5562   """Grow a disk of an instance.
5563
5564   """
5565   HPATH = "disk-grow"
5566   HTYPE = constants.HTYPE_INSTANCE
5567   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5568   REQ_BGL = False
5569
5570   def ExpandNames(self):
5571     self._ExpandAndLockInstance()
5572     self.needed_locks[locking.LEVEL_NODE] = []
5573     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5574
5575   def DeclareLocks(self, level):
5576     if level == locking.LEVEL_NODE:
5577       self._LockInstancesNodes()
5578
5579   def BuildHooksEnv(self):
5580     """Build hooks env.
5581
5582     This runs on the master, the primary and all the secondaries.
5583
5584     """
5585     env = {
5586       "DISK": self.op.disk,
5587       "AMOUNT": self.op.amount,
5588       }
5589     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5590     nl = [
5591       self.cfg.GetMasterNode(),
5592       self.instance.primary_node,
5593       ]
5594     return env, nl, nl
5595
5596   def CheckPrereq(self):
5597     """Check prerequisites.
5598
5599     This checks that the instance is in the cluster.
5600
5601     """
5602     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5603     assert instance is not None, \
5604       "Cannot retrieve locked instance %s" % self.op.instance_name
5605     nodenames = list(instance.all_nodes)
5606     for node in nodenames:
5607       _CheckNodeOnline(self, node)
5608
5609
5610     self.instance = instance
5611
5612     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5613       raise errors.OpPrereqError("Instance's disk layout does not support"
5614                                  " growing.")
5615
5616     self.disk = instance.FindDisk(self.op.disk)
5617
5618     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5619                                        instance.hypervisor)
5620     for node in nodenames:
5621       info = nodeinfo[node]
5622       if info.failed or not info.data:
5623         raise errors.OpPrereqError("Cannot get current information"
5624                                    " from node '%s'" % node)
5625       vg_free = info.data.get('vg_free', None)
5626       if not isinstance(vg_free, int):
5627         raise errors.OpPrereqError("Can't compute free disk space on"
5628                                    " node %s" % node)
5629       if self.op.amount > vg_free:
5630         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5631                                    " %d MiB available, %d MiB required" %
5632                                    (node, vg_free, self.op.amount))
5633
5634   def Exec(self, feedback_fn):
5635     """Execute disk grow.
5636
5637     """
5638     instance = self.instance
5639     disk = self.disk
5640     for node in instance.all_nodes:
5641       self.cfg.SetDiskID(disk, node)
5642       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5643       msg = result.RemoteFailMsg()
5644       if msg:
5645         raise errors.OpExecError("Grow request failed to node %s: %s" %
5646                                  (node, msg))
5647     disk.RecordGrow(self.op.amount)
5648     self.cfg.Update(instance)
5649     if self.op.wait_for_sync:
5650       disk_abort = not _WaitForSync(self, instance)
5651       if disk_abort:
5652         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5653                              " status.\nPlease check the instance.")
5654
5655
5656 class LUQueryInstanceData(NoHooksLU):
5657   """Query runtime instance data.
5658
5659   """
5660   _OP_REQP = ["instances", "static"]
5661   REQ_BGL = False
5662
5663   def ExpandNames(self):
5664     self.needed_locks = {}
5665     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5666
5667     if not isinstance(self.op.instances, list):
5668       raise errors.OpPrereqError("Invalid argument type 'instances'")
5669
5670     if self.op.instances:
5671       self.wanted_names = []
5672       for name in self.op.instances:
5673         full_name = self.cfg.ExpandInstanceName(name)
5674         if full_name is None:
5675           raise errors.OpPrereqError("Instance '%s' not known" % name)
5676         self.wanted_names.append(full_name)
5677       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5678     else:
5679       self.wanted_names = None
5680       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5681
5682     self.needed_locks[locking.LEVEL_NODE] = []
5683     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5684
5685   def DeclareLocks(self, level):
5686     if level == locking.LEVEL_NODE:
5687       self._LockInstancesNodes()
5688
5689   def CheckPrereq(self):
5690     """Check prerequisites.
5691
5692     This only checks the optional instance list against the existing names.
5693
5694     """
5695     if self.wanted_names is None:
5696       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5697
5698     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5699                              in self.wanted_names]
5700     return
5701
5702   def _ComputeDiskStatus(self, instance, snode, dev):
5703     """Compute block device status.
5704
5705     """
5706     static = self.op.static
5707     if not static:
5708       self.cfg.SetDiskID(dev, instance.primary_node)
5709       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5710       if dev_pstatus.offline:
5711         dev_pstatus = None
5712       else:
5713         msg = dev_pstatus.RemoteFailMsg()
5714         if msg:
5715           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5716                                    (instance.name, msg))
5717         dev_pstatus = dev_pstatus.payload
5718     else:
5719       dev_pstatus = None
5720
5721     if dev.dev_type in constants.LDS_DRBD:
5722       # we change the snode then (otherwise we use the one passed in)
5723       if dev.logical_id[0] == instance.primary_node:
5724         snode = dev.logical_id[1]
5725       else:
5726         snode = dev.logical_id[0]
5727
5728     if snode and not static:
5729       self.cfg.SetDiskID(dev, snode)
5730       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5731       if dev_sstatus.offline:
5732         dev_sstatus = None
5733       else:
5734         msg = dev_sstatus.RemoteFailMsg()
5735         if msg:
5736           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5737                                    (instance.name, msg))
5738         dev_sstatus = dev_sstatus.payload
5739     else:
5740       dev_sstatus = None
5741
5742     if dev.children:
5743       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5744                       for child in dev.children]
5745     else:
5746       dev_children = []
5747
5748     data = {
5749       "iv_name": dev.iv_name,
5750       "dev_type": dev.dev_type,
5751       "logical_id": dev.logical_id,
5752       "physical_id": dev.physical_id,
5753       "pstatus": dev_pstatus,
5754       "sstatus": dev_sstatus,
5755       "children": dev_children,
5756       "mode": dev.mode,
5757       "size": dev.size,
5758       }
5759
5760     return data
5761
5762   def Exec(self, feedback_fn):
5763     """Gather and return data"""
5764     result = {}
5765
5766     cluster = self.cfg.GetClusterInfo()
5767
5768     for instance in self.wanted_instances:
5769       if not self.op.static:
5770         remote_info = self.rpc.call_instance_info(instance.primary_node,
5771                                                   instance.name,
5772                                                   instance.hypervisor)
5773         remote_info.Raise()
5774         remote_info = remote_info.data
5775         if remote_info and "state" in remote_info:
5776           remote_state = "up"
5777         else:
5778           remote_state = "down"
5779       else:
5780         remote_state = None
5781       if instance.admin_up:
5782         config_state = "up"
5783       else:
5784         config_state = "down"
5785
5786       disks = [self._ComputeDiskStatus(instance, None, device)
5787                for device in instance.disks]
5788
5789       idict = {
5790         "name": instance.name,
5791         "config_state": config_state,
5792         "run_state": remote_state,
5793         "pnode": instance.primary_node,
5794         "snodes": instance.secondary_nodes,
5795         "os": instance.os,
5796         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5797         "disks": disks,
5798         "hypervisor": instance.hypervisor,
5799         "network_port": instance.network_port,
5800         "hv_instance": instance.hvparams,
5801         "hv_actual": cluster.FillHV(instance),
5802         "be_instance": instance.beparams,
5803         "be_actual": cluster.FillBE(instance),
5804         }
5805
5806       result[instance.name] = idict
5807
5808     return result
5809
5810
5811 class LUSetInstanceParams(LogicalUnit):
5812   """Modifies an instances's parameters.
5813
5814   """
5815   HPATH = "instance-modify"
5816   HTYPE = constants.HTYPE_INSTANCE
5817   _OP_REQP = ["instance_name"]
5818   REQ_BGL = False
5819
5820   def CheckArguments(self):
5821     if not hasattr(self.op, 'nics'):
5822       self.op.nics = []
5823     if not hasattr(self.op, 'disks'):
5824       self.op.disks = []
5825     if not hasattr(self.op, 'beparams'):
5826       self.op.beparams = {}
5827     if not hasattr(self.op, 'hvparams'):
5828       self.op.hvparams = {}
5829     self.op.force = getattr(self.op, "force", False)
5830     if not (self.op.nics or self.op.disks or
5831             self.op.hvparams or self.op.beparams):
5832       raise errors.OpPrereqError("No changes submitted")
5833
5834     # Disk validation
5835     disk_addremove = 0
5836     for disk_op, disk_dict in self.op.disks:
5837       if disk_op == constants.DDM_REMOVE:
5838         disk_addremove += 1
5839         continue
5840       elif disk_op == constants.DDM_ADD:
5841         disk_addremove += 1
5842       else:
5843         if not isinstance(disk_op, int):
5844           raise errors.OpPrereqError("Invalid disk index")
5845       if disk_op == constants.DDM_ADD:
5846         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5847         if mode not in constants.DISK_ACCESS_SET:
5848           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5849         size = disk_dict.get('size', None)
5850         if size is None:
5851           raise errors.OpPrereqError("Required disk parameter size missing")
5852         try:
5853           size = int(size)
5854         except ValueError, err:
5855           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5856                                      str(err))
5857         disk_dict['size'] = size
5858       else:
5859         # modification of disk
5860         if 'size' in disk_dict:
5861           raise errors.OpPrereqError("Disk size change not possible, use"
5862                                      " grow-disk")
5863
5864     if disk_addremove > 1:
5865       raise errors.OpPrereqError("Only one disk add or remove operation"
5866                                  " supported at a time")
5867
5868     # NIC validation
5869     nic_addremove = 0
5870     for nic_op, nic_dict in self.op.nics:
5871       if nic_op == constants.DDM_REMOVE:
5872         nic_addremove += 1
5873         continue
5874       elif nic_op == constants.DDM_ADD:
5875         nic_addremove += 1
5876       else:
5877         if not isinstance(nic_op, int):
5878           raise errors.OpPrereqError("Invalid nic index")
5879
5880       # nic_dict should be a dict
5881       nic_ip = nic_dict.get('ip', None)
5882       if nic_ip is not None:
5883         if nic_ip.lower() == constants.VALUE_NONE:
5884           nic_dict['ip'] = None
5885         else:
5886           if not utils.IsValidIP(nic_ip):
5887             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5888
5889       if nic_op == constants.DDM_ADD:
5890         nic_bridge = nic_dict.get('bridge', None)
5891         if nic_bridge is None:
5892           nic_dict['bridge'] = self.cfg.GetDefBridge()
5893         nic_mac = nic_dict.get('mac', None)
5894         if nic_mac is None:
5895           nic_dict['mac'] = constants.VALUE_AUTO
5896
5897       if 'mac' in nic_dict:
5898         nic_mac = nic_dict['mac']
5899         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5900           if not utils.IsValidMac(nic_mac):
5901             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5902         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5903           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5904                                      " modifying an existing nic")
5905
5906     if nic_addremove > 1:
5907       raise errors.OpPrereqError("Only one NIC add or remove operation"
5908                                  " supported at a time")
5909
5910   def ExpandNames(self):
5911     self._ExpandAndLockInstance()
5912     self.needed_locks[locking.LEVEL_NODE] = []
5913     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5914
5915   def DeclareLocks(self, level):
5916     if level == locking.LEVEL_NODE:
5917       self._LockInstancesNodes()
5918
5919   def BuildHooksEnv(self):
5920     """Build hooks env.
5921
5922     This runs on the master, primary and secondaries.
5923
5924     """
5925     args = dict()
5926     if constants.BE_MEMORY in self.be_new:
5927       args['memory'] = self.be_new[constants.BE_MEMORY]
5928     if constants.BE_VCPUS in self.be_new:
5929       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5930     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5931     # information at all.
5932     if self.op.nics:
5933       args['nics'] = []
5934       nic_override = dict(self.op.nics)
5935       for idx, nic in enumerate(self.instance.nics):
5936         if idx in nic_override:
5937           this_nic_override = nic_override[idx]
5938         else:
5939           this_nic_override = {}
5940         if 'ip' in this_nic_override:
5941           ip = this_nic_override['ip']
5942         else:
5943           ip = nic.ip
5944         if 'bridge' in this_nic_override:
5945           bridge = this_nic_override['bridge']
5946         else:
5947           bridge = nic.bridge
5948         if 'mac' in this_nic_override:
5949           mac = this_nic_override['mac']
5950         else:
5951           mac = nic.mac
5952         args['nics'].append((ip, bridge, mac))
5953       if constants.DDM_ADD in nic_override:
5954         ip = nic_override[constants.DDM_ADD].get('ip', None)
5955         bridge = nic_override[constants.DDM_ADD]['bridge']
5956         mac = nic_override[constants.DDM_ADD]['mac']
5957         args['nics'].append((ip, bridge, mac))
5958       elif constants.DDM_REMOVE in nic_override:
5959         del args['nics'][-1]
5960
5961     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
5962     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
5963     return env, nl, nl
5964
5965   def CheckPrereq(self):
5966     """Check prerequisites.
5967
5968     This only checks the instance list against the existing names.
5969
5970     """
5971     self.force = self.op.force
5972
5973     # checking the new params on the primary/secondary nodes
5974
5975     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5976     assert self.instance is not None, \
5977       "Cannot retrieve locked instance %s" % self.op.instance_name
5978     pnode = instance.primary_node
5979     nodelist = list(instance.all_nodes)
5980
5981     # hvparams processing
5982     if self.op.hvparams:
5983       i_hvdict = copy.deepcopy(instance.hvparams)
5984       for key, val in self.op.hvparams.iteritems():
5985         if val == constants.VALUE_DEFAULT:
5986           try:
5987             del i_hvdict[key]
5988           except KeyError:
5989             pass
5990         else:
5991           i_hvdict[key] = val
5992       cluster = self.cfg.GetClusterInfo()
5993       utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
5994       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
5995                                 i_hvdict)
5996       # local check
5997       hypervisor.GetHypervisor(
5998         instance.hypervisor).CheckParameterSyntax(hv_new)
5999       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6000       self.hv_new = hv_new # the new actual values
6001       self.hv_inst = i_hvdict # the new dict (without defaults)
6002     else:
6003       self.hv_new = self.hv_inst = {}
6004
6005     # beparams processing
6006     if self.op.beparams:
6007       i_bedict = copy.deepcopy(instance.beparams)
6008       for key, val in self.op.beparams.iteritems():
6009         if val == constants.VALUE_DEFAULT:
6010           try:
6011             del i_bedict[key]
6012           except KeyError:
6013             pass
6014         else:
6015           i_bedict[key] = val
6016       cluster = self.cfg.GetClusterInfo()
6017       utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
6018       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
6019                                 i_bedict)
6020       self.be_new = be_new # the new actual values
6021       self.be_inst = i_bedict # the new dict (without defaults)
6022     else:
6023       self.be_new = self.be_inst = {}
6024
6025     self.warn = []
6026
6027     if constants.BE_MEMORY in self.op.beparams and not self.force:
6028       mem_check_list = [pnode]
6029       if be_new[constants.BE_AUTO_BALANCE]:
6030         # either we changed auto_balance to yes or it was from before
6031         mem_check_list.extend(instance.secondary_nodes)
6032       instance_info = self.rpc.call_instance_info(pnode, instance.name,
6033                                                   instance.hypervisor)
6034       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6035                                          instance.hypervisor)
6036       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
6037         # Assume the primary node is unreachable and go ahead
6038         self.warn.append("Can't get info from primary node %s" % pnode)
6039       else:
6040         if not instance_info.failed and instance_info.data:
6041           current_mem = int(instance_info.data['memory'])
6042         else:
6043           # Assume instance not running
6044           # (there is a slight race condition here, but it's not very probable,
6045           # and we have no other way to check)
6046           current_mem = 0
6047         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6048                     nodeinfo[pnode].data['memory_free'])
6049         if miss_mem > 0:
6050           raise errors.OpPrereqError("This change will prevent the instance"
6051                                      " from starting, due to %d MB of memory"
6052                                      " missing on its primary node" % miss_mem)
6053
6054       if be_new[constants.BE_AUTO_BALANCE]:
6055         for node, nres in nodeinfo.iteritems():
6056           if node not in instance.secondary_nodes:
6057             continue
6058           if nres.failed or not isinstance(nres.data, dict):
6059             self.warn.append("Can't get info from secondary node %s" % node)
6060           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
6061             self.warn.append("Not enough memory to failover instance to"
6062                              " secondary node %s" % node)
6063
6064     # NIC processing
6065     for nic_op, nic_dict in self.op.nics:
6066       if nic_op == constants.DDM_REMOVE:
6067         if not instance.nics:
6068           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6069         continue
6070       if nic_op != constants.DDM_ADD:
6071         # an existing nic
6072         if nic_op < 0 or nic_op >= len(instance.nics):
6073           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6074                                      " are 0 to %d" %
6075                                      (nic_op, len(instance.nics)))
6076       if 'bridge' in nic_dict:
6077         nic_bridge = nic_dict['bridge']
6078         if nic_bridge is None:
6079           raise errors.OpPrereqError('Cannot set the nic bridge to None')
6080         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
6081           msg = ("Bridge '%s' doesn't exist on one of"
6082                  " the instance nodes" % nic_bridge)
6083           if self.force:
6084             self.warn.append(msg)
6085           else:
6086             raise errors.OpPrereqError(msg)
6087       if 'mac' in nic_dict:
6088         nic_mac = nic_dict['mac']
6089         if nic_mac is None:
6090           raise errors.OpPrereqError('Cannot set the nic mac to None')
6091         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6092           # otherwise generate the mac
6093           nic_dict['mac'] = self.cfg.GenerateMAC()
6094         else:
6095           # or validate/reserve the current one
6096           if self.cfg.IsMacInUse(nic_mac):
6097             raise errors.OpPrereqError("MAC address %s already in use"
6098                                        " in cluster" % nic_mac)
6099
6100     # DISK processing
6101     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6102       raise errors.OpPrereqError("Disk operations not supported for"
6103                                  " diskless instances")
6104     for disk_op, disk_dict in self.op.disks:
6105       if disk_op == constants.DDM_REMOVE:
6106         if len(instance.disks) == 1:
6107           raise errors.OpPrereqError("Cannot remove the last disk of"
6108                                      " an instance")
6109         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6110         ins_l = ins_l[pnode]
6111         if ins_l.failed or not isinstance(ins_l.data, list):
6112           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
6113         if instance.name in ins_l.data:
6114           raise errors.OpPrereqError("Instance is running, can't remove"
6115                                      " disks.")
6116
6117       if (disk_op == constants.DDM_ADD and
6118           len(instance.nics) >= constants.MAX_DISKS):
6119         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6120                                    " add more" % constants.MAX_DISKS)
6121       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6122         # an existing disk
6123         if disk_op < 0 or disk_op >= len(instance.disks):
6124           raise errors.OpPrereqError("Invalid disk index %s, valid values"
6125                                      " are 0 to %d" %
6126                                      (disk_op, len(instance.disks)))
6127
6128     return
6129
6130   def Exec(self, feedback_fn):
6131     """Modifies an instance.
6132
6133     All parameters take effect only at the next restart of the instance.
6134
6135     """
6136     # Process here the warnings from CheckPrereq, as we don't have a
6137     # feedback_fn there.
6138     for warn in self.warn:
6139       feedback_fn("WARNING: %s" % warn)
6140
6141     result = []
6142     instance = self.instance
6143     # disk changes
6144     for disk_op, disk_dict in self.op.disks:
6145       if disk_op == constants.DDM_REMOVE:
6146         # remove the last disk
6147         device = instance.disks.pop()
6148         device_idx = len(instance.disks)
6149         for node, disk in device.ComputeNodeTree(instance.primary_node):
6150           self.cfg.SetDiskID(disk, node)
6151           msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
6152           if msg:
6153             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6154                             " continuing anyway", device_idx, node, msg)
6155         result.append(("disk/%d" % device_idx, "remove"))
6156       elif disk_op == constants.DDM_ADD:
6157         # add a new disk
6158         if instance.disk_template == constants.DT_FILE:
6159           file_driver, file_path = instance.disks[0].logical_id
6160           file_path = os.path.dirname(file_path)
6161         else:
6162           file_driver = file_path = None
6163         disk_idx_base = len(instance.disks)
6164         new_disk = _GenerateDiskTemplate(self,
6165                                          instance.disk_template,
6166                                          instance.name, instance.primary_node,
6167                                          instance.secondary_nodes,
6168                                          [disk_dict],
6169                                          file_path,
6170                                          file_driver,
6171                                          disk_idx_base)[0]
6172         instance.disks.append(new_disk)
6173         info = _GetInstanceInfoText(instance)
6174
6175         logging.info("Creating volume %s for instance %s",
6176                      new_disk.iv_name, instance.name)
6177         # Note: this needs to be kept in sync with _CreateDisks
6178         #HARDCODE
6179         for node in instance.all_nodes:
6180           f_create = node == instance.primary_node
6181           try:
6182             _CreateBlockDev(self, node, instance, new_disk,
6183                             f_create, info, f_create)
6184           except errors.OpExecError, err:
6185             self.LogWarning("Failed to create volume %s (%s) on"
6186                             " node %s: %s",
6187                             new_disk.iv_name, new_disk, node, err)
6188         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6189                        (new_disk.size, new_disk.mode)))
6190       else:
6191         # change a given disk
6192         instance.disks[disk_op].mode = disk_dict['mode']
6193         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6194     # NIC changes
6195     for nic_op, nic_dict in self.op.nics:
6196       if nic_op == constants.DDM_REMOVE:
6197         # remove the last nic
6198         del instance.nics[-1]
6199         result.append(("nic.%d" % len(instance.nics), "remove"))
6200       elif nic_op == constants.DDM_ADD:
6201         # mac and bridge should be set, by now
6202         mac = nic_dict['mac']
6203         bridge = nic_dict['bridge']
6204         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
6205                               bridge=bridge)
6206         instance.nics.append(new_nic)
6207         result.append(("nic.%d" % (len(instance.nics) - 1),
6208                        "add:mac=%s,ip=%s,bridge=%s" %
6209                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
6210       else:
6211         # change a given nic
6212         for key in 'mac', 'ip', 'bridge':
6213           if key in nic_dict:
6214             setattr(instance.nics[nic_op], key, nic_dict[key])
6215             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
6216
6217     # hvparams changes
6218     if self.op.hvparams:
6219       instance.hvparams = self.hv_inst
6220       for key, val in self.op.hvparams.iteritems():
6221         result.append(("hv/%s" % key, val))
6222
6223     # beparams changes
6224     if self.op.beparams:
6225       instance.beparams = self.be_inst
6226       for key, val in self.op.beparams.iteritems():
6227         result.append(("be/%s" % key, val))
6228
6229     self.cfg.Update(instance)
6230
6231     return result
6232
6233
6234 class LUQueryExports(NoHooksLU):
6235   """Query the exports list
6236
6237   """
6238   _OP_REQP = ['nodes']
6239   REQ_BGL = False
6240
6241   def ExpandNames(self):
6242     self.needed_locks = {}
6243     self.share_locks[locking.LEVEL_NODE] = 1
6244     if not self.op.nodes:
6245       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6246     else:
6247       self.needed_locks[locking.LEVEL_NODE] = \
6248         _GetWantedNodes(self, self.op.nodes)
6249
6250   def CheckPrereq(self):
6251     """Check prerequisites.
6252
6253     """
6254     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6255
6256   def Exec(self, feedback_fn):
6257     """Compute the list of all the exported system images.
6258
6259     @rtype: dict
6260     @return: a dictionary with the structure node->(export-list)
6261         where export-list is a list of the instances exported on
6262         that node.
6263
6264     """
6265     rpcresult = self.rpc.call_export_list(self.nodes)
6266     result = {}
6267     for node in rpcresult:
6268       if rpcresult[node].failed:
6269         result[node] = False
6270       else:
6271         result[node] = rpcresult[node].data
6272
6273     return result
6274
6275
6276 class LUExportInstance(LogicalUnit):
6277   """Export an instance to an image in the cluster.
6278
6279   """
6280   HPATH = "instance-export"
6281   HTYPE = constants.HTYPE_INSTANCE
6282   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6283   REQ_BGL = False
6284
6285   def ExpandNames(self):
6286     self._ExpandAndLockInstance()
6287     # FIXME: lock only instance primary and destination node
6288     #
6289     # Sad but true, for now we have do lock all nodes, as we don't know where
6290     # the previous export might be, and and in this LU we search for it and
6291     # remove it from its current node. In the future we could fix this by:
6292     #  - making a tasklet to search (share-lock all), then create the new one,
6293     #    then one to remove, after
6294     #  - removing the removal operation altogether
6295     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6296
6297   def DeclareLocks(self, level):
6298     """Last minute lock declaration."""
6299     # All nodes are locked anyway, so nothing to do here.
6300
6301   def BuildHooksEnv(self):
6302     """Build hooks env.
6303
6304     This will run on the master, primary node and target node.
6305
6306     """
6307     env = {
6308       "EXPORT_NODE": self.op.target_node,
6309       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6310       }
6311     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6312     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6313           self.op.target_node]
6314     return env, nl, nl
6315
6316   def CheckPrereq(self):
6317     """Check prerequisites.
6318
6319     This checks that the instance and node names are valid.
6320
6321     """
6322     instance_name = self.op.instance_name
6323     self.instance = self.cfg.GetInstanceInfo(instance_name)
6324     assert self.instance is not None, \
6325           "Cannot retrieve locked instance %s" % self.op.instance_name
6326     _CheckNodeOnline(self, self.instance.primary_node)
6327
6328     self.dst_node = self.cfg.GetNodeInfo(
6329       self.cfg.ExpandNodeName(self.op.target_node))
6330
6331     if self.dst_node is None:
6332       # This is wrong node name, not a non-locked node
6333       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6334     _CheckNodeOnline(self, self.dst_node.name)
6335     _CheckNodeNotDrained(self, self.dst_node.name)
6336
6337     # instance disk type verification
6338     for disk in self.instance.disks:
6339       if disk.dev_type == constants.LD_FILE:
6340         raise errors.OpPrereqError("Export not supported for instances with"
6341                                    " file-based disks")
6342
6343   def Exec(self, feedback_fn):
6344     """Export an instance to an image in the cluster.
6345
6346     """
6347     instance = self.instance
6348     dst_node = self.dst_node
6349     src_node = instance.primary_node
6350     if self.op.shutdown:
6351       # shutdown the instance, but not the disks
6352       result = self.rpc.call_instance_shutdown(src_node, instance)
6353       msg = result.RemoteFailMsg()
6354       if msg:
6355         raise errors.OpExecError("Could not shutdown instance %s on"
6356                                  " node %s: %s" %
6357                                  (instance.name, src_node, msg))
6358
6359     vgname = self.cfg.GetVGName()
6360
6361     snap_disks = []
6362
6363     # set the disks ID correctly since call_instance_start needs the
6364     # correct drbd minor to create the symlinks
6365     for disk in instance.disks:
6366       self.cfg.SetDiskID(disk, src_node)
6367
6368     try:
6369       for idx, disk in enumerate(instance.disks):
6370         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6371         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6372         if new_dev_name.failed or not new_dev_name.data:
6373           self.LogWarning("Could not snapshot disk/%d on node %s",
6374                           idx, src_node)
6375           snap_disks.append(False)
6376         else:
6377           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6378                                  logical_id=(vgname, new_dev_name.data),
6379                                  physical_id=(vgname, new_dev_name.data),
6380                                  iv_name=disk.iv_name)
6381           snap_disks.append(new_dev)
6382
6383     finally:
6384       if self.op.shutdown and instance.admin_up:
6385         result = self.rpc.call_instance_start(src_node, instance, None, None)
6386         msg = result.RemoteFailMsg()
6387         if msg:
6388           _ShutdownInstanceDisks(self, instance)
6389           raise errors.OpExecError("Could not start instance: %s" % msg)
6390
6391     # TODO: check for size
6392
6393     cluster_name = self.cfg.GetClusterName()
6394     for idx, dev in enumerate(snap_disks):
6395       if dev:
6396         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6397                                                instance, cluster_name, idx)
6398         if result.failed or not result.data:
6399           self.LogWarning("Could not export disk/%d from node %s to"
6400                           " node %s", idx, src_node, dst_node.name)
6401         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6402         if msg:
6403           self.LogWarning("Could not remove snapshot for disk/%d from node"
6404                           " %s: %s", idx, src_node, msg)
6405
6406     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6407     if result.failed or not result.data:
6408       self.LogWarning("Could not finalize export for instance %s on node %s",
6409                       instance.name, dst_node.name)
6410
6411     nodelist = self.cfg.GetNodeList()
6412     nodelist.remove(dst_node.name)
6413
6414     # on one-node clusters nodelist will be empty after the removal
6415     # if we proceed the backup would be removed because OpQueryExports
6416     # substitutes an empty list with the full cluster node list.
6417     if nodelist:
6418       exportlist = self.rpc.call_export_list(nodelist)
6419       for node in exportlist:
6420         if exportlist[node].failed:
6421           continue
6422         if instance.name in exportlist[node].data:
6423           if not self.rpc.call_export_remove(node, instance.name):
6424             self.LogWarning("Could not remove older export for instance %s"
6425                             " on node %s", instance.name, node)
6426
6427
6428 class LURemoveExport(NoHooksLU):
6429   """Remove exports related to the named instance.
6430
6431   """
6432   _OP_REQP = ["instance_name"]
6433   REQ_BGL = False
6434
6435   def ExpandNames(self):
6436     self.needed_locks = {}
6437     # We need all nodes to be locked in order for RemoveExport to work, but we
6438     # don't need to lock the instance itself, as nothing will happen to it (and
6439     # we can remove exports also for a removed instance)
6440     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6441
6442   def CheckPrereq(self):
6443     """Check prerequisites.
6444     """
6445     pass
6446
6447   def Exec(self, feedback_fn):
6448     """Remove any export.
6449
6450     """
6451     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6452     # If the instance was not found we'll try with the name that was passed in.
6453     # This will only work if it was an FQDN, though.
6454     fqdn_warn = False
6455     if not instance_name:
6456       fqdn_warn = True
6457       instance_name = self.op.instance_name
6458
6459     exportlist = self.rpc.call_export_list(self.acquired_locks[
6460       locking.LEVEL_NODE])
6461     found = False
6462     for node in exportlist:
6463       if exportlist[node].failed:
6464         self.LogWarning("Failed to query node %s, continuing" % node)
6465         continue
6466       if instance_name in exportlist[node].data:
6467         found = True
6468         result = self.rpc.call_export_remove(node, instance_name)
6469         if result.failed or not result.data:
6470           logging.error("Could not remove export for instance %s"
6471                         " on node %s", instance_name, node)
6472
6473     if fqdn_warn and not found:
6474       feedback_fn("Export not found. If trying to remove an export belonging"
6475                   " to a deleted instance please use its Fully Qualified"
6476                   " Domain Name.")
6477
6478
6479 class TagsLU(NoHooksLU):
6480   """Generic tags LU.
6481
6482   This is an abstract class which is the parent of all the other tags LUs.
6483
6484   """
6485
6486   def ExpandNames(self):
6487     self.needed_locks = {}
6488     if self.op.kind == constants.TAG_NODE:
6489       name = self.cfg.ExpandNodeName(self.op.name)
6490       if name is None:
6491         raise errors.OpPrereqError("Invalid node name (%s)" %
6492                                    (self.op.name,))
6493       self.op.name = name
6494       self.needed_locks[locking.LEVEL_NODE] = name
6495     elif self.op.kind == constants.TAG_INSTANCE:
6496       name = self.cfg.ExpandInstanceName(self.op.name)
6497       if name is None:
6498         raise errors.OpPrereqError("Invalid instance name (%s)" %
6499                                    (self.op.name,))
6500       self.op.name = name
6501       self.needed_locks[locking.LEVEL_INSTANCE] = name
6502
6503   def CheckPrereq(self):
6504     """Check prerequisites.
6505
6506     """
6507     if self.op.kind == constants.TAG_CLUSTER:
6508       self.target = self.cfg.GetClusterInfo()
6509     elif self.op.kind == constants.TAG_NODE:
6510       self.target = self.cfg.GetNodeInfo(self.op.name)
6511     elif self.op.kind == constants.TAG_INSTANCE:
6512       self.target = self.cfg.GetInstanceInfo(self.op.name)
6513     else:
6514       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6515                                  str(self.op.kind))
6516
6517
6518 class LUGetTags(TagsLU):
6519   """Returns the tags of a given object.
6520
6521   """
6522   _OP_REQP = ["kind", "name"]
6523   REQ_BGL = False
6524
6525   def Exec(self, feedback_fn):
6526     """Returns the tag list.
6527
6528     """
6529     return list(self.target.GetTags())
6530
6531
6532 class LUSearchTags(NoHooksLU):
6533   """Searches the tags for a given pattern.
6534
6535   """
6536   _OP_REQP = ["pattern"]
6537   REQ_BGL = False
6538
6539   def ExpandNames(self):
6540     self.needed_locks = {}
6541
6542   def CheckPrereq(self):
6543     """Check prerequisites.
6544
6545     This checks the pattern passed for validity by compiling it.
6546
6547     """
6548     try:
6549       self.re = re.compile(self.op.pattern)
6550     except re.error, err:
6551       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6552                                  (self.op.pattern, err))
6553
6554   def Exec(self, feedback_fn):
6555     """Returns the tag list.
6556
6557     """
6558     cfg = self.cfg
6559     tgts = [("/cluster", cfg.GetClusterInfo())]
6560     ilist = cfg.GetAllInstancesInfo().values()
6561     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6562     nlist = cfg.GetAllNodesInfo().values()
6563     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6564     results = []
6565     for path, target in tgts:
6566       for tag in target.GetTags():
6567         if self.re.search(tag):
6568           results.append((path, tag))
6569     return results
6570
6571
6572 class LUAddTags(TagsLU):
6573   """Sets a tag on a given object.
6574
6575   """
6576   _OP_REQP = ["kind", "name", "tags"]
6577   REQ_BGL = False
6578
6579   def CheckPrereq(self):
6580     """Check prerequisites.
6581
6582     This checks the type and length of the tag name and value.
6583
6584     """
6585     TagsLU.CheckPrereq(self)
6586     for tag in self.op.tags:
6587       objects.TaggableObject.ValidateTag(tag)
6588
6589   def Exec(self, feedback_fn):
6590     """Sets the tag.
6591
6592     """
6593     try:
6594       for tag in self.op.tags:
6595         self.target.AddTag(tag)
6596     except errors.TagError, err:
6597       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6598     try:
6599       self.cfg.Update(self.target)
6600     except errors.ConfigurationError:
6601       raise errors.OpRetryError("There has been a modification to the"
6602                                 " config file and the operation has been"
6603                                 " aborted. Please retry.")
6604
6605
6606 class LUDelTags(TagsLU):
6607   """Delete a list of tags from a given object.
6608
6609   """
6610   _OP_REQP = ["kind", "name", "tags"]
6611   REQ_BGL = False
6612
6613   def CheckPrereq(self):
6614     """Check prerequisites.
6615
6616     This checks that we have the given tag.
6617
6618     """
6619     TagsLU.CheckPrereq(self)
6620     for tag in self.op.tags:
6621       objects.TaggableObject.ValidateTag(tag)
6622     del_tags = frozenset(self.op.tags)
6623     cur_tags = self.target.GetTags()
6624     if not del_tags <= cur_tags:
6625       diff_tags = del_tags - cur_tags
6626       diff_names = ["'%s'" % tag for tag in diff_tags]
6627       diff_names.sort()
6628       raise errors.OpPrereqError("Tag(s) %s not found" %
6629                                  (",".join(diff_names)))
6630
6631   def Exec(self, feedback_fn):
6632     """Remove the tag from the object.
6633
6634     """
6635     for tag in self.op.tags:
6636       self.target.RemoveTag(tag)
6637     try:
6638       self.cfg.Update(self.target)
6639     except errors.ConfigurationError:
6640       raise errors.OpRetryError("There has been a modification to the"
6641                                 " config file and the operation has been"
6642                                 " aborted. Please retry.")
6643
6644
6645 class LUTestDelay(NoHooksLU):
6646   """Sleep for a specified amount of time.
6647
6648   This LU sleeps on the master and/or nodes for a specified amount of
6649   time.
6650
6651   """
6652   _OP_REQP = ["duration", "on_master", "on_nodes"]
6653   REQ_BGL = False
6654
6655   def ExpandNames(self):
6656     """Expand names and set required locks.
6657
6658     This expands the node list, if any.
6659
6660     """
6661     self.needed_locks = {}
6662     if self.op.on_nodes:
6663       # _GetWantedNodes can be used here, but is not always appropriate to use
6664       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6665       # more information.
6666       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6667       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6668
6669   def CheckPrereq(self):
6670     """Check prerequisites.
6671
6672     """
6673
6674   def Exec(self, feedback_fn):
6675     """Do the actual sleep.
6676
6677     """
6678     if self.op.on_master:
6679       if not utils.TestDelay(self.op.duration):
6680         raise errors.OpExecError("Error during master delay test")
6681     if self.op.on_nodes:
6682       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6683       if not result:
6684         raise errors.OpExecError("Complete failure from rpc call")
6685       for node, node_result in result.items():
6686         node_result.Raise()
6687         if not node_result.data:
6688           raise errors.OpExecError("Failure during rpc call to node %s,"
6689                                    " result: %s" % (node, node_result.data))
6690
6691
6692 class IAllocator(object):
6693   """IAllocator framework.
6694
6695   An IAllocator instance has three sets of attributes:
6696     - cfg that is needed to query the cluster
6697     - input data (all members of the _KEYS class attribute are required)
6698     - four buffer attributes (in|out_data|text), that represent the
6699       input (to the external script) in text and data structure format,
6700       and the output from it, again in two formats
6701     - the result variables from the script (success, info, nodes) for
6702       easy usage
6703
6704   """
6705   _ALLO_KEYS = [
6706     "mem_size", "disks", "disk_template",
6707     "os", "tags", "nics", "vcpus", "hypervisor",
6708     ]
6709   _RELO_KEYS = [
6710     "relocate_from",
6711     ]
6712
6713   def __init__(self, lu, mode, name, **kwargs):
6714     self.lu = lu
6715     # init buffer variables
6716     self.in_text = self.out_text = self.in_data = self.out_data = None
6717     # init all input fields so that pylint is happy
6718     self.mode = mode
6719     self.name = name
6720     self.mem_size = self.disks = self.disk_template = None
6721     self.os = self.tags = self.nics = self.vcpus = None
6722     self.hypervisor = None
6723     self.relocate_from = None
6724     # computed fields
6725     self.required_nodes = None
6726     # init result fields
6727     self.success = self.info = self.nodes = None
6728     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6729       keyset = self._ALLO_KEYS
6730     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6731       keyset = self._RELO_KEYS
6732     else:
6733       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6734                                    " IAllocator" % self.mode)
6735     for key in kwargs:
6736       if key not in keyset:
6737         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6738                                      " IAllocator" % key)
6739       setattr(self, key, kwargs[key])
6740     for key in keyset:
6741       if key not in kwargs:
6742         raise errors.ProgrammerError("Missing input parameter '%s' to"
6743                                      " IAllocator" % key)
6744     self._BuildInputData()
6745
6746   def _ComputeClusterData(self):
6747     """Compute the generic allocator input data.
6748
6749     This is the data that is independent of the actual operation.
6750
6751     """
6752     cfg = self.lu.cfg
6753     cluster_info = cfg.GetClusterInfo()
6754     # cluster data
6755     data = {
6756       "version": constants.IALLOCATOR_VERSION,
6757       "cluster_name": cfg.GetClusterName(),
6758       "cluster_tags": list(cluster_info.GetTags()),
6759       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6760       # we don't have job IDs
6761       }
6762     iinfo = cfg.GetAllInstancesInfo().values()
6763     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6764
6765     # node data
6766     node_results = {}
6767     node_list = cfg.GetNodeList()
6768
6769     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6770       hypervisor_name = self.hypervisor
6771     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6772       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6773
6774     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6775                                            hypervisor_name)
6776     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6777                        cluster_info.enabled_hypervisors)
6778     for nname, nresult in node_data.items():
6779       # first fill in static (config-based) values
6780       ninfo = cfg.GetNodeInfo(nname)
6781       pnr = {
6782         "tags": list(ninfo.GetTags()),
6783         "primary_ip": ninfo.primary_ip,
6784         "secondary_ip": ninfo.secondary_ip,
6785         "offline": ninfo.offline,
6786         "drained": ninfo.drained,
6787         "master_candidate": ninfo.master_candidate,
6788         }
6789
6790       if not ninfo.offline:
6791         nresult.Raise()
6792         if not isinstance(nresult.data, dict):
6793           raise errors.OpExecError("Can't get data for node %s" % nname)
6794         remote_info = nresult.data
6795         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6796                      'vg_size', 'vg_free', 'cpu_total']:
6797           if attr not in remote_info:
6798             raise errors.OpExecError("Node '%s' didn't return attribute"
6799                                      " '%s'" % (nname, attr))
6800           try:
6801             remote_info[attr] = int(remote_info[attr])
6802           except ValueError, err:
6803             raise errors.OpExecError("Node '%s' returned invalid value"
6804                                      " for '%s': %s" % (nname, attr, err))
6805         # compute memory used by primary instances
6806         i_p_mem = i_p_up_mem = 0
6807         for iinfo, beinfo in i_list:
6808           if iinfo.primary_node == nname:
6809             i_p_mem += beinfo[constants.BE_MEMORY]
6810             if iinfo.name not in node_iinfo[nname].data:
6811               i_used_mem = 0
6812             else:
6813               i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6814             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6815             remote_info['memory_free'] -= max(0, i_mem_diff)
6816
6817             if iinfo.admin_up:
6818               i_p_up_mem += beinfo[constants.BE_MEMORY]
6819
6820         # compute memory used by instances
6821         pnr_dyn = {
6822           "total_memory": remote_info['memory_total'],
6823           "reserved_memory": remote_info['memory_dom0'],
6824           "free_memory": remote_info['memory_free'],
6825           "total_disk": remote_info['vg_size'],
6826           "free_disk": remote_info['vg_free'],
6827           "total_cpus": remote_info['cpu_total'],
6828           "i_pri_memory": i_p_mem,
6829           "i_pri_up_memory": i_p_up_mem,
6830           }
6831         pnr.update(pnr_dyn)
6832
6833       node_results[nname] = pnr
6834     data["nodes"] = node_results
6835
6836     # instance data
6837     instance_data = {}
6838     for iinfo, beinfo in i_list:
6839       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6840                   for n in iinfo.nics]
6841       pir = {
6842         "tags": list(iinfo.GetTags()),
6843         "admin_up": iinfo.admin_up,
6844         "vcpus": beinfo[constants.BE_VCPUS],
6845         "memory": beinfo[constants.BE_MEMORY],
6846         "os": iinfo.os,
6847         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6848         "nics": nic_data,
6849         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6850         "disk_template": iinfo.disk_template,
6851         "hypervisor": iinfo.hypervisor,
6852         }
6853       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
6854                                                  pir["disks"])
6855       instance_data[iinfo.name] = pir
6856
6857     data["instances"] = instance_data
6858
6859     self.in_data = data
6860
6861   def _AddNewInstance(self):
6862     """Add new instance data to allocator structure.
6863
6864     This in combination with _AllocatorGetClusterData will create the
6865     correct structure needed as input for the allocator.
6866
6867     The checks for the completeness of the opcode must have already been
6868     done.
6869
6870     """
6871     data = self.in_data
6872
6873     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6874
6875     if self.disk_template in constants.DTS_NET_MIRROR:
6876       self.required_nodes = 2
6877     else:
6878       self.required_nodes = 1
6879     request = {
6880       "type": "allocate",
6881       "name": self.name,
6882       "disk_template": self.disk_template,
6883       "tags": self.tags,
6884       "os": self.os,
6885       "vcpus": self.vcpus,
6886       "memory": self.mem_size,
6887       "disks": self.disks,
6888       "disk_space_total": disk_space,
6889       "nics": self.nics,
6890       "required_nodes": self.required_nodes,
6891       }
6892     data["request"] = request
6893
6894   def _AddRelocateInstance(self):
6895     """Add relocate instance data to allocator structure.
6896
6897     This in combination with _IAllocatorGetClusterData will create the
6898     correct structure needed as input for the allocator.
6899
6900     The checks for the completeness of the opcode must have already been
6901     done.
6902
6903     """
6904     instance = self.lu.cfg.GetInstanceInfo(self.name)
6905     if instance is None:
6906       raise errors.ProgrammerError("Unknown instance '%s' passed to"
6907                                    " IAllocator" % self.name)
6908
6909     if instance.disk_template not in constants.DTS_NET_MIRROR:
6910       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
6911
6912     if len(instance.secondary_nodes) != 1:
6913       raise errors.OpPrereqError("Instance has not exactly one secondary node")
6914
6915     self.required_nodes = 1
6916     disk_sizes = [{'size': disk.size} for disk in instance.disks]
6917     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
6918
6919     request = {
6920       "type": "relocate",
6921       "name": self.name,
6922       "disk_space_total": disk_space,
6923       "required_nodes": self.required_nodes,
6924       "relocate_from": self.relocate_from,
6925       }
6926     self.in_data["request"] = request
6927
6928   def _BuildInputData(self):
6929     """Build input data structures.
6930
6931     """
6932     self._ComputeClusterData()
6933
6934     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6935       self._AddNewInstance()
6936     else:
6937       self._AddRelocateInstance()
6938
6939     self.in_text = serializer.Dump(self.in_data)
6940
6941   def Run(self, name, validate=True, call_fn=None):
6942     """Run an instance allocator and return the results.
6943
6944     """
6945     if call_fn is None:
6946       call_fn = self.lu.rpc.call_iallocator_runner
6947
6948     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
6949     result.Raise()
6950
6951     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
6952       raise errors.OpExecError("Invalid result from master iallocator runner")
6953
6954     rcode, stdout, stderr, fail = result.data
6955
6956     if rcode == constants.IARUN_NOTFOUND:
6957       raise errors.OpExecError("Can't find allocator '%s'" % name)
6958     elif rcode == constants.IARUN_FAILURE:
6959       raise errors.OpExecError("Instance allocator call failed: %s,"
6960                                " output: %s" % (fail, stdout+stderr))
6961     self.out_text = stdout
6962     if validate:
6963       self._ValidateResult()
6964
6965   def _ValidateResult(self):
6966     """Process the allocator results.
6967
6968     This will process and if successful save the result in
6969     self.out_data and the other parameters.
6970
6971     """
6972     try:
6973       rdict = serializer.Load(self.out_text)
6974     except Exception, err:
6975       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
6976
6977     if not isinstance(rdict, dict):
6978       raise errors.OpExecError("Can't parse iallocator results: not a dict")
6979
6980     for key in "success", "info", "nodes":
6981       if key not in rdict:
6982         raise errors.OpExecError("Can't parse iallocator results:"
6983                                  " missing key '%s'" % key)
6984       setattr(self, key, rdict[key])
6985
6986     if not isinstance(rdict["nodes"], list):
6987       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
6988                                " is not a list")
6989     self.out_data = rdict
6990
6991
6992 class LUTestAllocator(NoHooksLU):
6993   """Run allocator tests.
6994
6995   This LU runs the allocator tests
6996
6997   """
6998   _OP_REQP = ["direction", "mode", "name"]
6999
7000   def CheckPrereq(self):
7001     """Check prerequisites.
7002
7003     This checks the opcode parameters depending on the director and mode test.
7004
7005     """
7006     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7007       for attr in ["name", "mem_size", "disks", "disk_template",
7008                    "os", "tags", "nics", "vcpus"]:
7009         if not hasattr(self.op, attr):
7010           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7011                                      attr)
7012       iname = self.cfg.ExpandInstanceName(self.op.name)
7013       if iname is not None:
7014         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7015                                    iname)
7016       if not isinstance(self.op.nics, list):
7017         raise errors.OpPrereqError("Invalid parameter 'nics'")
7018       for row in self.op.nics:
7019         if (not isinstance(row, dict) or
7020             "mac" not in row or
7021             "ip" not in row or
7022             "bridge" not in row):
7023           raise errors.OpPrereqError("Invalid contents of the"
7024                                      " 'nics' parameter")
7025       if not isinstance(self.op.disks, list):
7026         raise errors.OpPrereqError("Invalid parameter 'disks'")
7027       for row in self.op.disks:
7028         if (not isinstance(row, dict) or
7029             "size" not in row or
7030             not isinstance(row["size"], int) or
7031             "mode" not in row or
7032             row["mode"] not in ['r', 'w']):
7033           raise errors.OpPrereqError("Invalid contents of the"
7034                                      " 'disks' parameter")
7035       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7036         self.op.hypervisor = self.cfg.GetHypervisorType()
7037     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7038       if not hasattr(self.op, "name"):
7039         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7040       fname = self.cfg.ExpandInstanceName(self.op.name)
7041       if fname is None:
7042         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7043                                    self.op.name)
7044       self.op.name = fname
7045       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7046     else:
7047       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7048                                  self.op.mode)
7049
7050     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7051       if not hasattr(self.op, "allocator") or self.op.allocator is None:
7052         raise errors.OpPrereqError("Missing allocator name")
7053     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7054       raise errors.OpPrereqError("Wrong allocator test '%s'" %
7055                                  self.op.direction)
7056
7057   def Exec(self, feedback_fn):
7058     """Run the allocator test.
7059
7060     """
7061     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7062       ial = IAllocator(self,
7063                        mode=self.op.mode,
7064                        name=self.op.name,
7065                        mem_size=self.op.mem_size,
7066                        disks=self.op.disks,
7067                        disk_template=self.op.disk_template,
7068                        os=self.op.os,
7069                        tags=self.op.tags,
7070                        nics=self.op.nics,
7071                        vcpus=self.op.vcpus,
7072                        hypervisor=self.op.hypervisor,
7073                        )
7074     else:
7075       ial = IAllocator(self,
7076                        mode=self.op.mode,
7077                        name=self.op.name,
7078                        relocate_from=list(self.relocate_from),
7079                        )
7080
7081     if self.op.direction == constants.IALLOCATOR_DIR_IN:
7082       result = ial.in_text
7083     else:
7084       ial.Run(self.op.allocator, validate=False)
7085       result = ial.out_text
7086     return result