Add cluster-init --no-etc-hosts parameter
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import re
30 import platform
31 import logging
32 import copy
33
34 from ganeti import ssh
35 from ganeti import utils
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import serializer
42 from ganeti import ssconf
43
44
45 class LogicalUnit(object):
46   """Logical Unit base class.
47
48   Subclasses must follow these rules:
49     - implement ExpandNames
50     - implement CheckPrereq
51     - implement Exec
52     - implement BuildHooksEnv
53     - redefine HPATH and HTYPE
54     - optionally redefine their run requirements:
55         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
56
57   Note that all commands require root permissions.
58
59   """
60   HPATH = None
61   HTYPE = None
62   _OP_REQP = []
63   REQ_BGL = True
64
65   def __init__(self, processor, op, context, rpc):
66     """Constructor for LogicalUnit.
67
68     This needs to be overridden in derived classes in order to check op
69     validity.
70
71     """
72     self.proc = processor
73     self.op = op
74     self.cfg = context.cfg
75     self.context = context
76     self.rpc = rpc
77     # Dicts used to declare locking needs to mcpu
78     self.needed_locks = None
79     self.acquired_locks = {}
80     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
81     self.add_locks = {}
82     self.remove_locks = {}
83     # Used to force good behavior when calling helper functions
84     self.recalculate_locks = {}
85     self.__ssh = None
86     # logging
87     self.LogWarning = processor.LogWarning
88     self.LogInfo = processor.LogInfo
89
90     for attr_name in self._OP_REQP:
91       attr_val = getattr(op, attr_name, None)
92       if attr_val is None:
93         raise errors.OpPrereqError("Required parameter '%s' missing" %
94                                    attr_name)
95     self.CheckArguments()
96
97   def __GetSSH(self):
98     """Returns the SshRunner object
99
100     """
101     if not self.__ssh:
102       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
103     return self.__ssh
104
105   ssh = property(fget=__GetSSH)
106
107   def CheckArguments(self):
108     """Check syntactic validity for the opcode arguments.
109
110     This method is for doing a simple syntactic check and ensure
111     validity of opcode parameters, without any cluster-related
112     checks. While the same can be accomplished in ExpandNames and/or
113     CheckPrereq, doing these separate is better because:
114
115       - ExpandNames is left as as purely a lock-related function
116       - CheckPrereq is run after we have acquired locks (and possible
117         waited for them)
118
119     The function is allowed to change the self.op attribute so that
120     later methods can no longer worry about missing parameters.
121
122     """
123     pass
124
125   def ExpandNames(self):
126     """Expand names for this LU.
127
128     This method is called before starting to execute the opcode, and it should
129     update all the parameters of the opcode to their canonical form (e.g. a
130     short node name must be fully expanded after this method has successfully
131     completed). This way locking, hooks, logging, ecc. can work correctly.
132
133     LUs which implement this method must also populate the self.needed_locks
134     member, as a dict with lock levels as keys, and a list of needed lock names
135     as values. Rules:
136
137       - use an empty dict if you don't need any lock
138       - if you don't need any lock at a particular level omit that level
139       - don't put anything for the BGL level
140       - if you want all locks at a level use locking.ALL_SET as a value
141
142     If you need to share locks (rather than acquire them exclusively) at one
143     level you can modify self.share_locks, setting a true value (usually 1) for
144     that level. By default locks are not shared.
145
146     Examples::
147
148       # Acquire all nodes and one instance
149       self.needed_locks = {
150         locking.LEVEL_NODE: locking.ALL_SET,
151         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
152       }
153       # Acquire just two nodes
154       self.needed_locks = {
155         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
156       }
157       # Acquire no locks
158       self.needed_locks = {} # No, you can't leave it to the default value None
159
160     """
161     # The implementation of this method is mandatory only if the new LU is
162     # concurrent, so that old LUs don't need to be changed all at the same
163     # time.
164     if self.REQ_BGL:
165       self.needed_locks = {} # Exclusive LUs don't need locks.
166     else:
167       raise NotImplementedError
168
169   def DeclareLocks(self, level):
170     """Declare LU locking needs for a level
171
172     While most LUs can just declare their locking needs at ExpandNames time,
173     sometimes there's the need to calculate some locks after having acquired
174     the ones before. This function is called just before acquiring locks at a
175     particular level, but after acquiring the ones at lower levels, and permits
176     such calculations. It can be used to modify self.needed_locks, and by
177     default it does nothing.
178
179     This function is only called if you have something already set in
180     self.needed_locks for the level.
181
182     @param level: Locking level which is going to be locked
183     @type level: member of ganeti.locking.LEVELS
184
185     """
186
187   def CheckPrereq(self):
188     """Check prerequisites for this LU.
189
190     This method should check that the prerequisites for the execution
191     of this LU are fulfilled. It can do internode communication, but
192     it should be idempotent - no cluster or system changes are
193     allowed.
194
195     The method should raise errors.OpPrereqError in case something is
196     not fulfilled. Its return value is ignored.
197
198     This method should also update all the parameters of the opcode to
199     their canonical form if it hasn't been done by ExpandNames before.
200
201     """
202     raise NotImplementedError
203
204   def Exec(self, feedback_fn):
205     """Execute the LU.
206
207     This method should implement the actual work. It should raise
208     errors.OpExecError for failures that are somewhat dealt with in
209     code, or expected.
210
211     """
212     raise NotImplementedError
213
214   def BuildHooksEnv(self):
215     """Build hooks environment for this LU.
216
217     This method should return a three-node tuple consisting of: a dict
218     containing the environment that will be used for running the
219     specific hook for this LU, a list of node names on which the hook
220     should run before the execution, and a list of node names on which
221     the hook should run after the execution.
222
223     The keys of the dict must not have 'GANETI_' prefixed as this will
224     be handled in the hooks runner. Also note additional keys will be
225     added by the hooks runner. If the LU doesn't define any
226     environment, an empty dict (and not None) should be returned.
227
228     No nodes should be returned as an empty list (and not None).
229
230     Note that if the HPATH for a LU class is None, this function will
231     not be called.
232
233     """
234     raise NotImplementedError
235
236   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
237     """Notify the LU about the results of its hooks.
238
239     This method is called every time a hooks phase is executed, and notifies
240     the Logical Unit about the hooks' result. The LU can then use it to alter
241     its result based on the hooks.  By default the method does nothing and the
242     previous result is passed back unchanged but any LU can define it if it
243     wants to use the local cluster hook-scripts somehow.
244
245     @param phase: one of L{constants.HOOKS_PHASE_POST} or
246         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
247     @param hook_results: the results of the multi-node hooks rpc call
248     @param feedback_fn: function used send feedback back to the caller
249     @param lu_result: the previous Exec result this LU had, or None
250         in the PRE phase
251     @return: the new Exec result, based on the previous result
252         and hook results
253
254     """
255     return lu_result
256
257   def _ExpandAndLockInstance(self):
258     """Helper function to expand and lock an instance.
259
260     Many LUs that work on an instance take its name in self.op.instance_name
261     and need to expand it and then declare the expanded name for locking. This
262     function does it, and then updates self.op.instance_name to the expanded
263     name. It also initializes needed_locks as a dict, if this hasn't been done
264     before.
265
266     """
267     if self.needed_locks is None:
268       self.needed_locks = {}
269     else:
270       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
271         "_ExpandAndLockInstance called with instance-level locks set"
272     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
273     if expanded_name is None:
274       raise errors.OpPrereqError("Instance '%s' not known" %
275                                   self.op.instance_name)
276     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
277     self.op.instance_name = expanded_name
278
279   def _LockInstancesNodes(self, primary_only=False):
280     """Helper function to declare instances' nodes for locking.
281
282     This function should be called after locking one or more instances to lock
283     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
284     with all primary or secondary nodes for instances already locked and
285     present in self.needed_locks[locking.LEVEL_INSTANCE].
286
287     It should be called from DeclareLocks, and for safety only works if
288     self.recalculate_locks[locking.LEVEL_NODE] is set.
289
290     In the future it may grow parameters to just lock some instance's nodes, or
291     to just lock primaries or secondary nodes, if needed.
292
293     If should be called in DeclareLocks in a way similar to::
294
295       if level == locking.LEVEL_NODE:
296         self._LockInstancesNodes()
297
298     @type primary_only: boolean
299     @param primary_only: only lock primary nodes of locked instances
300
301     """
302     assert locking.LEVEL_NODE in self.recalculate_locks, \
303       "_LockInstancesNodes helper function called with no nodes to recalculate"
304
305     # TODO: check if we're really been called with the instance locks held
306
307     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
308     # future we might want to have different behaviors depending on the value
309     # of self.recalculate_locks[locking.LEVEL_NODE]
310     wanted_nodes = []
311     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
312       instance = self.context.cfg.GetInstanceInfo(instance_name)
313       wanted_nodes.append(instance.primary_node)
314       if not primary_only:
315         wanted_nodes.extend(instance.secondary_nodes)
316
317     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
318       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
319     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
320       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
321
322     del self.recalculate_locks[locking.LEVEL_NODE]
323
324
325 class NoHooksLU(LogicalUnit):
326   """Simple LU which runs no hooks.
327
328   This LU is intended as a parent for other LogicalUnits which will
329   run no hooks, in order to reduce duplicate code.
330
331   """
332   HPATH = None
333   HTYPE = None
334
335
336 def _GetWantedNodes(lu, nodes):
337   """Returns list of checked and expanded node names.
338
339   @type lu: L{LogicalUnit}
340   @param lu: the logical unit on whose behalf we execute
341   @type nodes: list
342   @param nodes: list of node names or None for all nodes
343   @rtype: list
344   @return: the list of nodes, sorted
345   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
346
347   """
348   if not isinstance(nodes, list):
349     raise errors.OpPrereqError("Invalid argument type 'nodes'")
350
351   if not nodes:
352     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
353       " non-empty list of nodes whose name is to be expanded.")
354
355   wanted = []
356   for name in nodes:
357     node = lu.cfg.ExpandNodeName(name)
358     if node is None:
359       raise errors.OpPrereqError("No such node name '%s'" % name)
360     wanted.append(node)
361
362   return utils.NiceSort(wanted)
363
364
365 def _GetWantedInstances(lu, instances):
366   """Returns list of checked and expanded instance names.
367
368   @type lu: L{LogicalUnit}
369   @param lu: the logical unit on whose behalf we execute
370   @type instances: list
371   @param instances: list of instance names or None for all instances
372   @rtype: list
373   @return: the list of instances, sorted
374   @raise errors.OpPrereqError: if the instances parameter is wrong type
375   @raise errors.OpPrereqError: if any of the passed instances is not found
376
377   """
378   if not isinstance(instances, list):
379     raise errors.OpPrereqError("Invalid argument type 'instances'")
380
381   if instances:
382     wanted = []
383
384     for name in instances:
385       instance = lu.cfg.ExpandInstanceName(name)
386       if instance is None:
387         raise errors.OpPrereqError("No such instance name '%s'" % name)
388       wanted.append(instance)
389
390   else:
391     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
392   return wanted
393
394
395 def _CheckOutputFields(static, dynamic, selected):
396   """Checks whether all selected fields are valid.
397
398   @type static: L{utils.FieldSet}
399   @param static: static fields set
400   @type dynamic: L{utils.FieldSet}
401   @param dynamic: dynamic fields set
402
403   """
404   f = utils.FieldSet()
405   f.Extend(static)
406   f.Extend(dynamic)
407
408   delta = f.NonMatching(selected)
409   if delta:
410     raise errors.OpPrereqError("Unknown output fields selected: %s"
411                                % ",".join(delta))
412
413
414 def _CheckBooleanOpField(op, name):
415   """Validates boolean opcode parameters.
416
417   This will ensure that an opcode parameter is either a boolean value,
418   or None (but that it always exists).
419
420   """
421   val = getattr(op, name, None)
422   if not (val is None or isinstance(val, bool)):
423     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
424                                (name, str(val)))
425   setattr(op, name, val)
426
427
428 def _CheckNodeOnline(lu, node):
429   """Ensure that a given node is online.
430
431   @param lu: the LU on behalf of which we make the check
432   @param node: the node to check
433   @raise errors.OpPrereqError: if the node is offline
434
435   """
436   if lu.cfg.GetNodeInfo(node).offline:
437     raise errors.OpPrereqError("Can't use offline node %s" % node)
438
439
440 def _CheckNodeNotDrained(lu, node):
441   """Ensure that a given node is not drained.
442
443   @param lu: the LU on behalf of which we make the check
444   @param node: the node to check
445   @raise errors.OpPrereqError: if the node is drained
446
447   """
448   if lu.cfg.GetNodeInfo(node).drained:
449     raise errors.OpPrereqError("Can't use drained node %s" % node)
450
451
452 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
453                           memory, vcpus, nics, disk_template, disks,
454                           bep, hvp, hypervisor_name):
455   """Builds instance related env variables for hooks
456
457   This builds the hook environment from individual variables.
458
459   @type name: string
460   @param name: the name of the instance
461   @type primary_node: string
462   @param primary_node: the name of the instance's primary node
463   @type secondary_nodes: list
464   @param secondary_nodes: list of secondary nodes as strings
465   @type os_type: string
466   @param os_type: the name of the instance's OS
467   @type status: boolean
468   @param status: the should_run status of the instance
469   @type memory: string
470   @param memory: the memory size of the instance
471   @type vcpus: string
472   @param vcpus: the count of VCPUs the instance has
473   @type nics: list
474   @param nics: list of tuples (ip, bridge, mac) representing
475       the NICs the instance  has
476   @type disk_template: string
477   @param disk_template: the disk template of the instance
478   @type disks: list
479   @param disks: the list of (size, mode) pairs
480   @type bep: dict
481   @param bep: the backend parameters for the instance
482   @type hvp: dict
483   @param hvp: the hypervisor parameters for the instance
484   @type hypervisor_name: string
485   @param hypervisor_name: the hypervisor for the instance
486   @rtype: dict
487   @return: the hook environment for this instance
488
489   """
490   if status:
491     str_status = "up"
492   else:
493     str_status = "down"
494   env = {
495     "OP_TARGET": name,
496     "INSTANCE_NAME": name,
497     "INSTANCE_PRIMARY": primary_node,
498     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
499     "INSTANCE_OS_TYPE": os_type,
500     "INSTANCE_STATUS": str_status,
501     "INSTANCE_MEMORY": memory,
502     "INSTANCE_VCPUS": vcpus,
503     "INSTANCE_DISK_TEMPLATE": disk_template,
504     "INSTANCE_HYPERVISOR": hypervisor_name,
505   }
506
507   if nics:
508     nic_count = len(nics)
509     for idx, (ip, bridge, mac) in enumerate(nics):
510       if ip is None:
511         ip = ""
512       env["INSTANCE_NIC%d_IP" % idx] = ip
513       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
514       env["INSTANCE_NIC%d_MAC" % idx] = mac
515   else:
516     nic_count = 0
517
518   env["INSTANCE_NIC_COUNT"] = nic_count
519
520   if disks:
521     disk_count = len(disks)
522     for idx, (size, mode) in enumerate(disks):
523       env["INSTANCE_DISK%d_SIZE" % idx] = size
524       env["INSTANCE_DISK%d_MODE" % idx] = mode
525   else:
526     disk_count = 0
527
528   env["INSTANCE_DISK_COUNT"] = disk_count
529
530   for source, kind in [(bep, "BE"), (hvp, "HV")]:
531     for key, value in source.items():
532       env["INSTANCE_%s_%s" % (kind, key)] = value
533
534   return env
535
536
537 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
538   """Builds instance related env variables for hooks from an object.
539
540   @type lu: L{LogicalUnit}
541   @param lu: the logical unit on whose behalf we execute
542   @type instance: L{objects.Instance}
543   @param instance: the instance for which we should build the
544       environment
545   @type override: dict
546   @param override: dictionary with key/values that will override
547       our values
548   @rtype: dict
549   @return: the hook environment dictionary
550
551   """
552   cluster = lu.cfg.GetClusterInfo()
553   bep = cluster.FillBE(instance)
554   hvp = cluster.FillHV(instance)
555   args = {
556     'name': instance.name,
557     'primary_node': instance.primary_node,
558     'secondary_nodes': instance.secondary_nodes,
559     'os_type': instance.os,
560     'status': instance.admin_up,
561     'memory': bep[constants.BE_MEMORY],
562     'vcpus': bep[constants.BE_VCPUS],
563     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
564     'disk_template': instance.disk_template,
565     'disks': [(disk.size, disk.mode) for disk in instance.disks],
566     'bep': bep,
567     'hvp': hvp,
568     'hypervisor_name': instance.hypervisor,
569   }
570   if override:
571     args.update(override)
572   return _BuildInstanceHookEnv(**args)
573
574
575 def _AdjustCandidatePool(lu):
576   """Adjust the candidate pool after node operations.
577
578   """
579   mod_list = lu.cfg.MaintainCandidatePool()
580   if mod_list:
581     lu.LogInfo("Promoted nodes to master candidate role: %s",
582                ", ".join(node.name for node in mod_list))
583     for name in mod_list:
584       lu.context.ReaddNode(name)
585   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
586   if mc_now > mc_max:
587     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
588                (mc_now, mc_max))
589
590
591 def _CheckInstanceBridgesExist(lu, instance):
592   """Check that the bridges needed by an instance exist.
593
594   """
595   # check bridges existence
596   brlist = [nic.bridge for nic in instance.nics]
597   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
598   result.Raise()
599   if not result.data:
600     raise errors.OpPrereqError("One or more target bridges %s does not"
601                                " exist on destination node '%s'" %
602                                (brlist, instance.primary_node))
603
604
605 class LUDestroyCluster(NoHooksLU):
606   """Logical unit for destroying the cluster.
607
608   """
609   _OP_REQP = []
610
611   def CheckPrereq(self):
612     """Check prerequisites.
613
614     This checks whether the cluster is empty.
615
616     Any errors are signaled by raising errors.OpPrereqError.
617
618     """
619     master = self.cfg.GetMasterNode()
620
621     nodelist = self.cfg.GetNodeList()
622     if len(nodelist) != 1 or nodelist[0] != master:
623       raise errors.OpPrereqError("There are still %d node(s) in"
624                                  " this cluster." % (len(nodelist) - 1))
625     instancelist = self.cfg.GetInstanceList()
626     if instancelist:
627       raise errors.OpPrereqError("There are still %d instance(s) in"
628                                  " this cluster." % len(instancelist))
629
630   def Exec(self, feedback_fn):
631     """Destroys the cluster.
632
633     """
634     master = self.cfg.GetMasterNode()
635     result = self.rpc.call_node_stop_master(master, False)
636     result.Raise()
637     if not result.data:
638       raise errors.OpExecError("Could not disable the master role")
639     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
640     utils.CreateBackup(priv_key)
641     utils.CreateBackup(pub_key)
642     return master
643
644
645 class LUVerifyCluster(LogicalUnit):
646   """Verifies the cluster status.
647
648   """
649   HPATH = "cluster-verify"
650   HTYPE = constants.HTYPE_CLUSTER
651   _OP_REQP = ["skip_checks"]
652   REQ_BGL = False
653
654   def ExpandNames(self):
655     self.needed_locks = {
656       locking.LEVEL_NODE: locking.ALL_SET,
657       locking.LEVEL_INSTANCE: locking.ALL_SET,
658     }
659     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
660
661   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
662                   node_result, feedback_fn, master_files,
663                   drbd_map, vg_name):
664     """Run multiple tests against a node.
665
666     Test list:
667
668       - compares ganeti version
669       - checks vg existence and size > 20G
670       - checks config file checksum
671       - checks ssh to other nodes
672
673     @type nodeinfo: L{objects.Node}
674     @param nodeinfo: the node to check
675     @param file_list: required list of files
676     @param local_cksum: dictionary of local files and their checksums
677     @param node_result: the results from the node
678     @param feedback_fn: function used to accumulate results
679     @param master_files: list of files that only masters should have
680     @param drbd_map: the useddrbd minors for this node, in
681         form of minor: (instance, must_exist) which correspond to instances
682         and their running status
683     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
684
685     """
686     node = nodeinfo.name
687
688     # main result, node_result should be a non-empty dict
689     if not node_result or not isinstance(node_result, dict):
690       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
691       return True
692
693     # compares ganeti version
694     local_version = constants.PROTOCOL_VERSION
695     remote_version = node_result.get('version', None)
696     if not (remote_version and isinstance(remote_version, (list, tuple)) and
697             len(remote_version) == 2):
698       feedback_fn("  - ERROR: connection to %s failed" % (node))
699       return True
700
701     if local_version != remote_version[0]:
702       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
703                   " node %s %s" % (local_version, node, remote_version[0]))
704       return True
705
706     # node seems compatible, we can actually try to look into its results
707
708     bad = False
709
710     # full package version
711     if constants.RELEASE_VERSION != remote_version[1]:
712       feedback_fn("  - WARNING: software version mismatch: master %s,"
713                   " node %s %s" %
714                   (constants.RELEASE_VERSION, node, remote_version[1]))
715
716     # checks vg existence and size > 20G
717     if vg_name is not None:
718       vglist = node_result.get(constants.NV_VGLIST, None)
719       if not vglist:
720         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
721                         (node,))
722         bad = True
723       else:
724         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
725                                               constants.MIN_VG_SIZE)
726         if vgstatus:
727           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
728           bad = True
729
730     # checks config file checksum
731
732     remote_cksum = node_result.get(constants.NV_FILELIST, None)
733     if not isinstance(remote_cksum, dict):
734       bad = True
735       feedback_fn("  - ERROR: node hasn't returned file checksum data")
736     else:
737       for file_name in file_list:
738         node_is_mc = nodeinfo.master_candidate
739         must_have_file = file_name not in master_files
740         if file_name not in remote_cksum:
741           if node_is_mc or must_have_file:
742             bad = True
743             feedback_fn("  - ERROR: file '%s' missing" % file_name)
744         elif remote_cksum[file_name] != local_cksum[file_name]:
745           if node_is_mc or must_have_file:
746             bad = True
747             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
748           else:
749             # not candidate and this is not a must-have file
750             bad = True
751             feedback_fn("  - ERROR: file '%s' should not exist on non master"
752                         " candidates (and the file is outdated)" % file_name)
753         else:
754           # all good, except non-master/non-must have combination
755           if not node_is_mc and not must_have_file:
756             feedback_fn("  - ERROR: file '%s' should not exist on non master"
757                         " candidates" % file_name)
758
759     # checks ssh to any
760
761     if constants.NV_NODELIST not in node_result:
762       bad = True
763       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
764     else:
765       if node_result[constants.NV_NODELIST]:
766         bad = True
767         for node in node_result[constants.NV_NODELIST]:
768           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
769                           (node, node_result[constants.NV_NODELIST][node]))
770
771     if constants.NV_NODENETTEST not in node_result:
772       bad = True
773       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
774     else:
775       if node_result[constants.NV_NODENETTEST]:
776         bad = True
777         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
778         for node in nlist:
779           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
780                           (node, node_result[constants.NV_NODENETTEST][node]))
781
782     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
783     if isinstance(hyp_result, dict):
784       for hv_name, hv_result in hyp_result.iteritems():
785         if hv_result is not None:
786           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
787                       (hv_name, hv_result))
788
789     # check used drbd list
790     if vg_name is not None:
791       used_minors = node_result.get(constants.NV_DRBDLIST, [])
792       if not isinstance(used_minors, (tuple, list)):
793         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
794                     str(used_minors))
795       else:
796         for minor, (iname, must_exist) in drbd_map.items():
797           if minor not in used_minors and must_exist:
798             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
799                         " not active" % (minor, iname))
800             bad = True
801         for minor in used_minors:
802           if minor not in drbd_map:
803             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
804                         minor)
805             bad = True
806
807     return bad
808
809   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
810                       node_instance, feedback_fn, n_offline):
811     """Verify an instance.
812
813     This function checks to see if the required block devices are
814     available on the instance's node.
815
816     """
817     bad = False
818
819     node_current = instanceconfig.primary_node
820
821     node_vol_should = {}
822     instanceconfig.MapLVsByNode(node_vol_should)
823
824     for node in node_vol_should:
825       if node in n_offline:
826         # ignore missing volumes on offline nodes
827         continue
828       for volume in node_vol_should[node]:
829         if node not in node_vol_is or volume not in node_vol_is[node]:
830           feedback_fn("  - ERROR: volume %s missing on node %s" %
831                           (volume, node))
832           bad = True
833
834     if instanceconfig.admin_up:
835       if ((node_current not in node_instance or
836           not instance in node_instance[node_current]) and
837           node_current not in n_offline):
838         feedback_fn("  - ERROR: instance %s not running on node %s" %
839                         (instance, node_current))
840         bad = True
841
842     for node in node_instance:
843       if (not node == node_current):
844         if instance in node_instance[node]:
845           feedback_fn("  - ERROR: instance %s should not run on node %s" %
846                           (instance, node))
847           bad = True
848
849     return bad
850
851   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
852     """Verify if there are any unknown volumes in the cluster.
853
854     The .os, .swap and backup volumes are ignored. All other volumes are
855     reported as unknown.
856
857     """
858     bad = False
859
860     for node in node_vol_is:
861       for volume in node_vol_is[node]:
862         if node not in node_vol_should or volume not in node_vol_should[node]:
863           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
864                       (volume, node))
865           bad = True
866     return bad
867
868   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
869     """Verify the list of running instances.
870
871     This checks what instances are running but unknown to the cluster.
872
873     """
874     bad = False
875     for node in node_instance:
876       for runninginstance in node_instance[node]:
877         if runninginstance not in instancelist:
878           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
879                           (runninginstance, node))
880           bad = True
881     return bad
882
883   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
884     """Verify N+1 Memory Resilience.
885
886     Check that if one single node dies we can still start all the instances it
887     was primary for.
888
889     """
890     bad = False
891
892     for node, nodeinfo in node_info.iteritems():
893       # This code checks that every node which is now listed as secondary has
894       # enough memory to host all instances it is supposed to should a single
895       # other node in the cluster fail.
896       # FIXME: not ready for failover to an arbitrary node
897       # FIXME: does not support file-backed instances
898       # WARNING: we currently take into account down instances as well as up
899       # ones, considering that even if they're down someone might want to start
900       # them even in the event of a node failure.
901       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
902         needed_mem = 0
903         for instance in instances:
904           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
905           if bep[constants.BE_AUTO_BALANCE]:
906             needed_mem += bep[constants.BE_MEMORY]
907         if nodeinfo['mfree'] < needed_mem:
908           feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
909                       " failovers should node %s fail" % (node, prinode))
910           bad = True
911     return bad
912
913   def CheckPrereq(self):
914     """Check prerequisites.
915
916     Transform the list of checks we're going to skip into a set and check that
917     all its members are valid.
918
919     """
920     self.skip_set = frozenset(self.op.skip_checks)
921     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
922       raise errors.OpPrereqError("Invalid checks to be skipped specified")
923
924   def BuildHooksEnv(self):
925     """Build hooks env.
926
927     Cluster-Verify hooks just ran in the post phase and their failure makes
928     the output be logged in the verify output and the verification to fail.
929
930     """
931     all_nodes = self.cfg.GetNodeList()
932     env = {
933       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
934       }
935     for node in self.cfg.GetAllNodesInfo().values():
936       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
937
938     return env, [], all_nodes
939
940   def Exec(self, feedback_fn):
941     """Verify integrity of cluster, performing various test on nodes.
942
943     """
944     bad = False
945     feedback_fn("* Verifying global settings")
946     for msg in self.cfg.VerifyConfig():
947       feedback_fn("  - ERROR: %s" % msg)
948
949     vg_name = self.cfg.GetVGName()
950     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
951     nodelist = utils.NiceSort(self.cfg.GetNodeList())
952     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
953     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
954     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
955                         for iname in instancelist)
956     i_non_redundant = [] # Non redundant instances
957     i_non_a_balanced = [] # Non auto-balanced instances
958     n_offline = [] # List of offline nodes
959     n_drained = [] # List of nodes being drained
960     node_volume = {}
961     node_instance = {}
962     node_info = {}
963     instance_cfg = {}
964
965     # FIXME: verify OS list
966     # do local checksums
967     master_files = [constants.CLUSTER_CONF_FILE]
968
969     file_names = ssconf.SimpleStore().GetFileList()
970     file_names.append(constants.SSL_CERT_FILE)
971     file_names.append(constants.RAPI_CERT_FILE)
972     file_names.extend(master_files)
973
974     local_checksums = utils.FingerprintFiles(file_names)
975
976     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
977     node_verify_param = {
978       constants.NV_FILELIST: file_names,
979       constants.NV_NODELIST: [node.name for node in nodeinfo
980                               if not node.offline],
981       constants.NV_HYPERVISOR: hypervisors,
982       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
983                                   node.secondary_ip) for node in nodeinfo
984                                  if not node.offline],
985       constants.NV_INSTANCELIST: hypervisors,
986       constants.NV_VERSION: None,
987       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
988       }
989     if vg_name is not None:
990       node_verify_param[constants.NV_VGLIST] = None
991       node_verify_param[constants.NV_LVLIST] = vg_name
992       node_verify_param[constants.NV_DRBDLIST] = None
993     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
994                                            self.cfg.GetClusterName())
995
996     cluster = self.cfg.GetClusterInfo()
997     master_node = self.cfg.GetMasterNode()
998     all_drbd_map = self.cfg.ComputeDRBDMap()
999
1000     for node_i in nodeinfo:
1001       node = node_i.name
1002       nresult = all_nvinfo[node].data
1003
1004       if node_i.offline:
1005         feedback_fn("* Skipping offline node %s" % (node,))
1006         n_offline.append(node)
1007         continue
1008
1009       if node == master_node:
1010         ntype = "master"
1011       elif node_i.master_candidate:
1012         ntype = "master candidate"
1013       elif node_i.drained:
1014         ntype = "drained"
1015         n_drained.append(node)
1016       else:
1017         ntype = "regular"
1018       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1019
1020       if all_nvinfo[node].failed or not isinstance(nresult, dict):
1021         feedback_fn("  - ERROR: connection to %s failed" % (node,))
1022         bad = True
1023         continue
1024
1025       node_drbd = {}
1026       for minor, instance in all_drbd_map[node].items():
1027         if instance not in instanceinfo:
1028           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1029                       instance)
1030           # ghost instance should not be running, but otherwise we
1031           # don't give double warnings (both ghost instance and
1032           # unallocated minor in use)
1033           node_drbd[minor] = (instance, False)
1034         else:
1035           instance = instanceinfo[instance]
1036           node_drbd[minor] = (instance.name, instance.admin_up)
1037       result = self._VerifyNode(node_i, file_names, local_checksums,
1038                                 nresult, feedback_fn, master_files,
1039                                 node_drbd, vg_name)
1040       bad = bad or result
1041
1042       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1043       if vg_name is None:
1044         node_volume[node] = {}
1045       elif isinstance(lvdata, basestring):
1046         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1047                     (node, utils.SafeEncode(lvdata)))
1048         bad = True
1049         node_volume[node] = {}
1050       elif not isinstance(lvdata, dict):
1051         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1052         bad = True
1053         continue
1054       else:
1055         node_volume[node] = lvdata
1056
1057       # node_instance
1058       idata = nresult.get(constants.NV_INSTANCELIST, None)
1059       if not isinstance(idata, list):
1060         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1061                     (node,))
1062         bad = True
1063         continue
1064
1065       node_instance[node] = idata
1066
1067       # node_info
1068       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1069       if not isinstance(nodeinfo, dict):
1070         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1071         bad = True
1072         continue
1073
1074       try:
1075         node_info[node] = {
1076           "mfree": int(nodeinfo['memory_free']),
1077           "pinst": [],
1078           "sinst": [],
1079           # dictionary holding all instances this node is secondary for,
1080           # grouped by their primary node. Each key is a cluster node, and each
1081           # value is a list of instances which have the key as primary and the
1082           # current node as secondary.  this is handy to calculate N+1 memory
1083           # availability if you can only failover from a primary to its
1084           # secondary.
1085           "sinst-by-pnode": {},
1086         }
1087         # FIXME: devise a free space model for file based instances as well
1088         if vg_name is not None:
1089           if (constants.NV_VGLIST not in nresult or
1090               vg_name not in nresult[constants.NV_VGLIST]):
1091             feedback_fn("  - ERROR: node %s didn't return data for the"
1092                         " volume group '%s' - it is either missing or broken" %
1093                         (node, vg_name))
1094             bad = True
1095             continue
1096           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1097       except (ValueError, KeyError):
1098         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1099                     " from node %s" % (node,))
1100         bad = True
1101         continue
1102
1103     node_vol_should = {}
1104
1105     for instance in instancelist:
1106       feedback_fn("* Verifying instance %s" % instance)
1107       inst_config = instanceinfo[instance]
1108       result =  self._VerifyInstance(instance, inst_config, node_volume,
1109                                      node_instance, feedback_fn, n_offline)
1110       bad = bad or result
1111       inst_nodes_offline = []
1112
1113       inst_config.MapLVsByNode(node_vol_should)
1114
1115       instance_cfg[instance] = inst_config
1116
1117       pnode = inst_config.primary_node
1118       if pnode in node_info:
1119         node_info[pnode]['pinst'].append(instance)
1120       elif pnode not in n_offline:
1121         feedback_fn("  - ERROR: instance %s, connection to primary node"
1122                     " %s failed" % (instance, pnode))
1123         bad = True
1124
1125       if pnode in n_offline:
1126         inst_nodes_offline.append(pnode)
1127
1128       # If the instance is non-redundant we cannot survive losing its primary
1129       # node, so we are not N+1 compliant. On the other hand we have no disk
1130       # templates with more than one secondary so that situation is not well
1131       # supported either.
1132       # FIXME: does not support file-backed instances
1133       if len(inst_config.secondary_nodes) == 0:
1134         i_non_redundant.append(instance)
1135       elif len(inst_config.secondary_nodes) > 1:
1136         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1137                     % instance)
1138
1139       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1140         i_non_a_balanced.append(instance)
1141
1142       for snode in inst_config.secondary_nodes:
1143         if snode in node_info:
1144           node_info[snode]['sinst'].append(instance)
1145           if pnode not in node_info[snode]['sinst-by-pnode']:
1146             node_info[snode]['sinst-by-pnode'][pnode] = []
1147           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1148         elif snode not in n_offline:
1149           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1150                       " %s failed" % (instance, snode))
1151           bad = True
1152         if snode in n_offline:
1153           inst_nodes_offline.append(snode)
1154
1155       if inst_nodes_offline:
1156         # warn that the instance lives on offline nodes, and set bad=True
1157         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1158                     ", ".join(inst_nodes_offline))
1159         bad = True
1160
1161     feedback_fn("* Verifying orphan volumes")
1162     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1163                                        feedback_fn)
1164     bad = bad or result
1165
1166     feedback_fn("* Verifying remaining instances")
1167     result = self._VerifyOrphanInstances(instancelist, node_instance,
1168                                          feedback_fn)
1169     bad = bad or result
1170
1171     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1172       feedback_fn("* Verifying N+1 Memory redundancy")
1173       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1174       bad = bad or result
1175
1176     feedback_fn("* Other Notes")
1177     if i_non_redundant:
1178       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1179                   % len(i_non_redundant))
1180
1181     if i_non_a_balanced:
1182       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1183                   % len(i_non_a_balanced))
1184
1185     if n_offline:
1186       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1187
1188     if n_drained:
1189       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1190
1191     return not bad
1192
1193   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1194     """Analyze the post-hooks' result
1195
1196     This method analyses the hook result, handles it, and sends some
1197     nicely-formatted feedback back to the user.
1198
1199     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1200         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1201     @param hooks_results: the results of the multi-node hooks rpc call
1202     @param feedback_fn: function used send feedback back to the caller
1203     @param lu_result: previous Exec result
1204     @return: the new Exec result, based on the previous result
1205         and hook results
1206
1207     """
1208     # We only really run POST phase hooks, and are only interested in
1209     # their results
1210     if phase == constants.HOOKS_PHASE_POST:
1211       # Used to change hooks' output to proper indentation
1212       indent_re = re.compile('^', re.M)
1213       feedback_fn("* Hooks Results")
1214       if not hooks_results:
1215         feedback_fn("  - ERROR: general communication failure")
1216         lu_result = 1
1217       else:
1218         for node_name in hooks_results:
1219           show_node_header = True
1220           res = hooks_results[node_name]
1221           if res.failed or res.data is False or not isinstance(res.data, list):
1222             if res.offline:
1223               # no need to warn or set fail return value
1224               continue
1225             feedback_fn("    Communication failure in hooks execution")
1226             lu_result = 1
1227             continue
1228           for script, hkr, output in res.data:
1229             if hkr == constants.HKR_FAIL:
1230               # The node header is only shown once, if there are
1231               # failing hooks on that node
1232               if show_node_header:
1233                 feedback_fn("  Node %s:" % node_name)
1234                 show_node_header = False
1235               feedback_fn("    ERROR: Script %s failed, output:" % script)
1236               output = indent_re.sub('      ', output)
1237               feedback_fn("%s" % output)
1238               lu_result = 1
1239
1240       return lu_result
1241
1242
1243 class LUVerifyDisks(NoHooksLU):
1244   """Verifies the cluster disks status.
1245
1246   """
1247   _OP_REQP = []
1248   REQ_BGL = False
1249
1250   def ExpandNames(self):
1251     self.needed_locks = {
1252       locking.LEVEL_NODE: locking.ALL_SET,
1253       locking.LEVEL_INSTANCE: locking.ALL_SET,
1254     }
1255     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1256
1257   def CheckPrereq(self):
1258     """Check prerequisites.
1259
1260     This has no prerequisites.
1261
1262     """
1263     pass
1264
1265   def Exec(self, feedback_fn):
1266     """Verify integrity of cluster disks.
1267
1268     """
1269     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1270
1271     vg_name = self.cfg.GetVGName()
1272     nodes = utils.NiceSort(self.cfg.GetNodeList())
1273     instances = [self.cfg.GetInstanceInfo(name)
1274                  for name in self.cfg.GetInstanceList()]
1275
1276     nv_dict = {}
1277     for inst in instances:
1278       inst_lvs = {}
1279       if (not inst.admin_up or
1280           inst.disk_template not in constants.DTS_NET_MIRROR):
1281         continue
1282       inst.MapLVsByNode(inst_lvs)
1283       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1284       for node, vol_list in inst_lvs.iteritems():
1285         for vol in vol_list:
1286           nv_dict[(node, vol)] = inst
1287
1288     if not nv_dict:
1289       return result
1290
1291     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1292
1293     for node in nodes:
1294       # node_volume
1295       lvs = node_lvs[node]
1296       if lvs.failed:
1297         if not lvs.offline:
1298           self.LogWarning("Connection to node %s failed: %s" %
1299                           (node, lvs.data))
1300         continue
1301       lvs = lvs.data
1302       if isinstance(lvs, basestring):
1303         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1304         res_nlvm[node] = lvs
1305         continue
1306       elif not isinstance(lvs, dict):
1307         logging.warning("Connection to node %s failed or invalid data"
1308                         " returned", node)
1309         res_nodes.append(node)
1310         continue
1311
1312       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1313         inst = nv_dict.pop((node, lv_name), None)
1314         if (not lv_online and inst is not None
1315             and inst.name not in res_instances):
1316           res_instances.append(inst.name)
1317
1318     # any leftover items in nv_dict are missing LVs, let's arrange the
1319     # data better
1320     for key, inst in nv_dict.iteritems():
1321       if inst.name not in res_missing:
1322         res_missing[inst.name] = []
1323       res_missing[inst.name].append(key)
1324
1325     return result
1326
1327
1328 class LURepairDiskSizes(NoHooksLU):
1329   """Verifies the cluster disks sizes.
1330
1331   """
1332   _OP_REQP = ["instances"]
1333   REQ_BGL = False
1334
1335   def ExpandNames(self):
1336
1337     if not isinstance(self.op.instances, list):
1338       raise errors.OpPrereqError("Invalid argument type 'instances'")
1339
1340     if self.op.instances:
1341       self.wanted_names = []
1342       for name in self.op.instances:
1343         full_name = self.cfg.ExpandInstanceName(name)
1344         if full_name is None:
1345           raise errors.OpPrereqError("Instance '%s' not known" % name)
1346         self.wanted_names.append(full_name)
1347       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
1348       self.needed_locks = {
1349         locking.LEVEL_NODE: [],
1350         locking.LEVEL_INSTANCE: self.wanted_names,
1351         }
1352       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1353     else:
1354       self.wanted_names = None
1355       self.needed_locks = {
1356         locking.LEVEL_NODE: locking.ALL_SET,
1357         locking.LEVEL_INSTANCE: locking.ALL_SET,
1358         }
1359     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1360
1361   def DeclareLocks(self, level):
1362     if level == locking.LEVEL_NODE and self.wanted_names is not None:
1363       self._LockInstancesNodes(primary_only=True)
1364
1365   def CheckPrereq(self):
1366     """Check prerequisites.
1367
1368     This only checks the optional instance list against the existing names.
1369
1370     """
1371     if self.wanted_names is None:
1372       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
1373
1374     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
1375                              in self.wanted_names]
1376
1377   def Exec(self, feedback_fn):
1378     """Verify the size of cluster disks.
1379
1380     """
1381     # TODO: check child disks too
1382     # TODO: check differences in size between primary/secondary nodes
1383     per_node_disks = {}
1384     for instance in self.wanted_instances:
1385       pnode = instance.primary_node
1386       if pnode not in per_node_disks:
1387         per_node_disks[pnode] = []
1388       for idx, disk in enumerate(instance.disks):
1389         per_node_disks[pnode].append((instance, idx, disk))
1390
1391     changed = []
1392     for node, dskl in per_node_disks.items():
1393       result = self.rpc.call_blockdev_getsizes(node, [v[2] for v in dskl])
1394       if result.failed:
1395         self.LogWarning("Failure in blockdev_getsizes call to node"
1396                         " %s, ignoring", node)
1397         continue
1398       if len(result.data) != len(dskl):
1399         self.LogWarning("Invalid result from node %s, ignoring node results",
1400                         node)
1401         continue
1402       for ((instance, idx, disk), size) in zip(dskl, result.data):
1403         if size is None:
1404           self.LogWarning("Disk %d of instance %s did not return size"
1405                           " information, ignoring", idx, instance.name)
1406           continue
1407         if not isinstance(size, (int, long)):
1408           self.LogWarning("Disk %d of instance %s did not return valid"
1409                           " size information, ignoring", idx, instance.name)
1410           continue
1411         size = size >> 20
1412         if size != disk.size:
1413           self.LogInfo("Disk %d of instance %s has mismatched size,"
1414                        " correcting: recorded %d, actual %d", idx,
1415                        instance.name, disk.size, size)
1416           disk.size = size
1417           self.cfg.Update(instance)
1418           changed.append((instance.name, idx, size))
1419     return changed
1420
1421
1422 class LURenameCluster(LogicalUnit):
1423   """Rename the cluster.
1424
1425   """
1426   HPATH = "cluster-rename"
1427   HTYPE = constants.HTYPE_CLUSTER
1428   _OP_REQP = ["name"]
1429
1430   def BuildHooksEnv(self):
1431     """Build hooks env.
1432
1433     """
1434     env = {
1435       "OP_TARGET": self.cfg.GetClusterName(),
1436       "NEW_NAME": self.op.name,
1437       }
1438     mn = self.cfg.GetMasterNode()
1439     return env, [mn], [mn]
1440
1441   def CheckPrereq(self):
1442     """Verify that the passed name is a valid one.
1443
1444     """
1445     hostname = utils.HostInfo(self.op.name)
1446
1447     new_name = hostname.name
1448     self.ip = new_ip = hostname.ip
1449     old_name = self.cfg.GetClusterName()
1450     old_ip = self.cfg.GetMasterIP()
1451     if new_name == old_name and new_ip == old_ip:
1452       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1453                                  " cluster has changed")
1454     if new_ip != old_ip:
1455       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1456         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1457                                    " reachable on the network. Aborting." %
1458                                    new_ip)
1459
1460     self.op.name = new_name
1461
1462   def Exec(self, feedback_fn):
1463     """Rename the cluster.
1464
1465     """
1466     clustername = self.op.name
1467     ip = self.ip
1468
1469     # shutdown the master IP
1470     master = self.cfg.GetMasterNode()
1471     result = self.rpc.call_node_stop_master(master, False)
1472     if result.failed or not result.data:
1473       raise errors.OpExecError("Could not disable the master role")
1474
1475     try:
1476       cluster = self.cfg.GetClusterInfo()
1477       cluster.cluster_name = clustername
1478       cluster.master_ip = ip
1479       self.cfg.Update(cluster)
1480
1481       # update the known hosts file
1482       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1483       node_list = self.cfg.GetNodeList()
1484       try:
1485         node_list.remove(master)
1486       except ValueError:
1487         pass
1488       result = self.rpc.call_upload_file(node_list,
1489                                          constants.SSH_KNOWN_HOSTS_FILE)
1490       for to_node, to_result in result.iteritems():
1491         if to_result.failed or not to_result.data:
1492           logging.error("Copy of file %s to node %s failed",
1493                         constants.SSH_KNOWN_HOSTS_FILE, to_node)
1494
1495     finally:
1496       result = self.rpc.call_node_start_master(master, False, False)
1497       if result.failed or not result.data:
1498         self.LogWarning("Could not re-enable the master role on"
1499                         " the master, please restart manually.")
1500
1501
1502 def _RecursiveCheckIfLVMBased(disk):
1503   """Check if the given disk or its children are lvm-based.
1504
1505   @type disk: L{objects.Disk}
1506   @param disk: the disk to check
1507   @rtype: boolean
1508   @return: boolean indicating whether a LD_LV dev_type was found or not
1509
1510   """
1511   if disk.children:
1512     for chdisk in disk.children:
1513       if _RecursiveCheckIfLVMBased(chdisk):
1514         return True
1515   return disk.dev_type == constants.LD_LV
1516
1517
1518 class LUSetClusterParams(LogicalUnit):
1519   """Change the parameters of the cluster.
1520
1521   """
1522   HPATH = "cluster-modify"
1523   HTYPE = constants.HTYPE_CLUSTER
1524   _OP_REQP = []
1525   REQ_BGL = False
1526
1527   def CheckArguments(self):
1528     """Check parameters
1529
1530     """
1531     if not hasattr(self.op, "candidate_pool_size"):
1532       self.op.candidate_pool_size = None
1533     if self.op.candidate_pool_size is not None:
1534       try:
1535         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1536       except (ValueError, TypeError), err:
1537         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1538                                    str(err))
1539       if self.op.candidate_pool_size < 1:
1540         raise errors.OpPrereqError("At least one master candidate needed")
1541
1542   def ExpandNames(self):
1543     # FIXME: in the future maybe other cluster params won't require checking on
1544     # all nodes to be modified.
1545     self.needed_locks = {
1546       locking.LEVEL_NODE: locking.ALL_SET,
1547     }
1548     self.share_locks[locking.LEVEL_NODE] = 1
1549
1550   def BuildHooksEnv(self):
1551     """Build hooks env.
1552
1553     """
1554     env = {
1555       "OP_TARGET": self.cfg.GetClusterName(),
1556       "NEW_VG_NAME": self.op.vg_name,
1557       }
1558     mn = self.cfg.GetMasterNode()
1559     return env, [mn], [mn]
1560
1561   def CheckPrereq(self):
1562     """Check prerequisites.
1563
1564     This checks whether the given params don't conflict and
1565     if the given volume group is valid.
1566
1567     """
1568     if self.op.vg_name is not None and not self.op.vg_name:
1569       instances = self.cfg.GetAllInstancesInfo().values()
1570       for inst in instances:
1571         for disk in inst.disks:
1572           if _RecursiveCheckIfLVMBased(disk):
1573             raise errors.OpPrereqError("Cannot disable lvm storage while"
1574                                        " lvm-based instances exist")
1575
1576     node_list = self.acquired_locks[locking.LEVEL_NODE]
1577
1578     # if vg_name not None, checks given volume group on all nodes
1579     if self.op.vg_name:
1580       vglist = self.rpc.call_vg_list(node_list)
1581       for node in node_list:
1582         if vglist[node].failed:
1583           # ignoring down node
1584           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1585           continue
1586         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1587                                               self.op.vg_name,
1588                                               constants.MIN_VG_SIZE)
1589         if vgstatus:
1590           raise errors.OpPrereqError("Error on node '%s': %s" %
1591                                      (node, vgstatus))
1592
1593     self.cluster = cluster = self.cfg.GetClusterInfo()
1594     # validate beparams changes
1595     if self.op.beparams:
1596       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1597       self.new_beparams = cluster.FillDict(
1598         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1599
1600     # hypervisor list/parameters
1601     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1602     if self.op.hvparams:
1603       if not isinstance(self.op.hvparams, dict):
1604         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1605       for hv_name, hv_dict in self.op.hvparams.items():
1606         if hv_name not in self.new_hvparams:
1607           self.new_hvparams[hv_name] = hv_dict
1608         else:
1609           self.new_hvparams[hv_name].update(hv_dict)
1610
1611     if self.op.enabled_hypervisors is not None:
1612       self.hv_list = self.op.enabled_hypervisors
1613       if not self.hv_list:
1614         raise errors.OpPrereqError("Enabled hypervisors list must contain at"
1615                                    " least one member")
1616       invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
1617       if invalid_hvs:
1618         raise errors.OpPrereqError("Enabled hypervisors contains invalid"
1619                                    " entries: %s" % invalid_hvs)
1620     else:
1621       self.hv_list = cluster.enabled_hypervisors
1622
1623     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1624       # either the enabled list has changed, or the parameters have, validate
1625       for hv_name, hv_params in self.new_hvparams.items():
1626         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1627             (self.op.enabled_hypervisors and
1628              hv_name in self.op.enabled_hypervisors)):
1629           # either this is a new hypervisor, or its parameters have changed
1630           hv_class = hypervisor.GetHypervisor(hv_name)
1631           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1632           hv_class.CheckParameterSyntax(hv_params)
1633           _CheckHVParams(self, node_list, hv_name, hv_params)
1634
1635   def Exec(self, feedback_fn):
1636     """Change the parameters of the cluster.
1637
1638     """
1639     if self.op.vg_name is not None:
1640       new_volume = self.op.vg_name
1641       if not new_volume:
1642         new_volume = None
1643       if new_volume != self.cfg.GetVGName():
1644         self.cfg.SetVGName(new_volume)
1645       else:
1646         feedback_fn("Cluster LVM configuration already in desired"
1647                     " state, not changing")
1648     if self.op.hvparams:
1649       self.cluster.hvparams = self.new_hvparams
1650     if self.op.enabled_hypervisors is not None:
1651       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1652     if self.op.beparams:
1653       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1654     if self.op.candidate_pool_size is not None:
1655       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1656       # we need to update the pool size here, otherwise the save will fail
1657       _AdjustCandidatePool(self)
1658
1659     self.cfg.Update(self.cluster)
1660
1661
1662 class LURedistributeConfig(NoHooksLU):
1663   """Force the redistribution of cluster configuration.
1664
1665   This is a very simple LU.
1666
1667   """
1668   _OP_REQP = []
1669   REQ_BGL = False
1670
1671   def ExpandNames(self):
1672     self.needed_locks = {
1673       locking.LEVEL_NODE: locking.ALL_SET,
1674     }
1675     self.share_locks[locking.LEVEL_NODE] = 1
1676
1677   def CheckPrereq(self):
1678     """Check prerequisites.
1679
1680     """
1681
1682   def Exec(self, feedback_fn):
1683     """Redistribute the configuration.
1684
1685     """
1686     self.cfg.Update(self.cfg.GetClusterInfo())
1687
1688
1689 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1690   """Sleep and poll for an instance's disk to sync.
1691
1692   """
1693   if not instance.disks:
1694     return True
1695
1696   if not oneshot:
1697     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1698
1699   node = instance.primary_node
1700
1701   for dev in instance.disks:
1702     lu.cfg.SetDiskID(dev, node)
1703
1704   retries = 0
1705   degr_retries = 10 # in seconds, as we sleep 1 second each time
1706   while True:
1707     max_time = 0
1708     done = True
1709     cumul_degraded = False
1710     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1711     if rstats.failed or not rstats.data:
1712       lu.LogWarning("Can't get any data from node %s", node)
1713       retries += 1
1714       if retries >= 10:
1715         raise errors.RemoteError("Can't contact node %s for mirror data,"
1716                                  " aborting." % node)
1717       time.sleep(6)
1718       continue
1719     rstats = rstats.data
1720     retries = 0
1721     for i, mstat in enumerate(rstats):
1722       if mstat is None:
1723         lu.LogWarning("Can't compute data for node %s/%s",
1724                            node, instance.disks[i].iv_name)
1725         continue
1726       # we ignore the ldisk parameter
1727       perc_done, est_time, is_degraded, _ = mstat
1728       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1729       if perc_done is not None:
1730         done = False
1731         if est_time is not None:
1732           rem_time = "%d estimated seconds remaining" % est_time
1733           max_time = est_time
1734         else:
1735           rem_time = "no time estimate"
1736         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1737                         (instance.disks[i].iv_name, perc_done, rem_time))
1738
1739     # if we're done but degraded, let's do a few small retries, to
1740     # make sure we see a stable and not transient situation; therefore
1741     # we force restart of the loop
1742     if (done or oneshot) and cumul_degraded and degr_retries > 0:
1743       logging.info("Degraded disks found, %d retries left", degr_retries)
1744       degr_retries -= 1
1745       time.sleep(1)
1746       continue
1747
1748     if done or oneshot:
1749       break
1750
1751     time.sleep(min(60, max_time))
1752
1753   if done:
1754     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1755   return not cumul_degraded
1756
1757
1758 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1759   """Check that mirrors are not degraded.
1760
1761   The ldisk parameter, if True, will change the test from the
1762   is_degraded attribute (which represents overall non-ok status for
1763   the device(s)) to the ldisk (representing the local storage status).
1764
1765   """
1766   lu.cfg.SetDiskID(dev, node)
1767   if ldisk:
1768     idx = 6
1769   else:
1770     idx = 5
1771
1772   result = True
1773   if on_primary or dev.AssembleOnSecondary():
1774     rstats = lu.rpc.call_blockdev_find(node, dev)
1775     msg = rstats.RemoteFailMsg()
1776     if msg:
1777       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1778       result = False
1779     elif not rstats.payload:
1780       lu.LogWarning("Can't find disk on node %s", node)
1781       result = False
1782     else:
1783       result = result and (not rstats.payload[idx])
1784   if dev.children:
1785     for child in dev.children:
1786       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1787
1788   return result
1789
1790
1791 class LUDiagnoseOS(NoHooksLU):
1792   """Logical unit for OS diagnose/query.
1793
1794   """
1795   _OP_REQP = ["output_fields", "names"]
1796   REQ_BGL = False
1797   _FIELDS_STATIC = utils.FieldSet()
1798   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1799
1800   def ExpandNames(self):
1801     if self.op.names:
1802       raise errors.OpPrereqError("Selective OS query not supported")
1803
1804     _CheckOutputFields(static=self._FIELDS_STATIC,
1805                        dynamic=self._FIELDS_DYNAMIC,
1806                        selected=self.op.output_fields)
1807
1808     # Lock all nodes, in shared mode
1809     # Temporary removal of locks, should be reverted later
1810     # TODO: reintroduce locks when they are lighter-weight
1811     self.needed_locks = {}
1812     #self.share_locks[locking.LEVEL_NODE] = 1
1813     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1814
1815   def CheckPrereq(self):
1816     """Check prerequisites.
1817
1818     """
1819
1820   @staticmethod
1821   def _DiagnoseByOS(node_list, rlist):
1822     """Remaps a per-node return list into an a per-os per-node dictionary
1823
1824     @param node_list: a list with the names of all nodes
1825     @param rlist: a map with node names as keys and OS objects as values
1826
1827     @rtype: dict
1828     @return: a dictionary with osnames as keys and as value another map, with
1829         nodes as keys and list of OS objects as values, eg::
1830
1831           {"debian-etch": {"node1": [<object>,...],
1832                            "node2": [<object>,]}
1833           }
1834
1835     """
1836     all_os = {}
1837     # we build here the list of nodes that didn't fail the RPC (at RPC
1838     # level), so that nodes with a non-responding node daemon don't
1839     # make all OSes invalid
1840     good_nodes = [node_name for node_name in rlist
1841                   if not rlist[node_name].failed]
1842     for node_name, nr in rlist.iteritems():
1843       if nr.failed or not nr.data:
1844         continue
1845       for os_obj in nr.data:
1846         if os_obj.name not in all_os:
1847           # build a list of nodes for this os containing empty lists
1848           # for each node in node_list
1849           all_os[os_obj.name] = {}
1850           for nname in good_nodes:
1851             all_os[os_obj.name][nname] = []
1852         all_os[os_obj.name][node_name].append(os_obj)
1853     return all_os
1854
1855   def Exec(self, feedback_fn):
1856     """Compute the list of OSes.
1857
1858     """
1859     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1860     node_data = self.rpc.call_os_diagnose(valid_nodes)
1861     if node_data == False:
1862       raise errors.OpExecError("Can't gather the list of OSes")
1863     pol = self._DiagnoseByOS(valid_nodes, node_data)
1864     output = []
1865     for os_name, os_data in pol.iteritems():
1866       row = []
1867       for field in self.op.output_fields:
1868         if field == "name":
1869           val = os_name
1870         elif field == "valid":
1871           val = utils.all([osl and osl[0] for osl in os_data.values()])
1872         elif field == "node_status":
1873           val = {}
1874           for node_name, nos_list in os_data.iteritems():
1875             val[node_name] = [(v.status, v.path) for v in nos_list]
1876         else:
1877           raise errors.ParameterError(field)
1878         row.append(val)
1879       output.append(row)
1880
1881     return output
1882
1883
1884 class LURemoveNode(LogicalUnit):
1885   """Logical unit for removing a node.
1886
1887   """
1888   HPATH = "node-remove"
1889   HTYPE = constants.HTYPE_NODE
1890   _OP_REQP = ["node_name"]
1891
1892   def BuildHooksEnv(self):
1893     """Build hooks env.
1894
1895     This doesn't run on the target node in the pre phase as a failed
1896     node would then be impossible to remove.
1897
1898     """
1899     env = {
1900       "OP_TARGET": self.op.node_name,
1901       "NODE_NAME": self.op.node_name,
1902       }
1903     all_nodes = self.cfg.GetNodeList()
1904     all_nodes.remove(self.op.node_name)
1905     return env, all_nodes, all_nodes
1906
1907   def CheckPrereq(self):
1908     """Check prerequisites.
1909
1910     This checks:
1911      - the node exists in the configuration
1912      - it does not have primary or secondary instances
1913      - it's not the master
1914
1915     Any errors are signaled by raising errors.OpPrereqError.
1916
1917     """
1918     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1919     if node is None:
1920       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1921
1922     instance_list = self.cfg.GetInstanceList()
1923
1924     masternode = self.cfg.GetMasterNode()
1925     if node.name == masternode:
1926       raise errors.OpPrereqError("Node is the master node,"
1927                                  " you need to failover first.")
1928
1929     for instance_name in instance_list:
1930       instance = self.cfg.GetInstanceInfo(instance_name)
1931       if node.name in instance.all_nodes:
1932         raise errors.OpPrereqError("Instance %s is still running on the node,"
1933                                    " please remove first." % instance_name)
1934     self.op.node_name = node.name
1935     self.node = node
1936
1937   def Exec(self, feedback_fn):
1938     """Removes the node from the cluster.
1939
1940     """
1941     node = self.node
1942     logging.info("Stopping the node daemon and removing configs from node %s",
1943                  node.name)
1944
1945     self.context.RemoveNode(node.name)
1946
1947     self.rpc.call_node_leave_cluster(node.name)
1948
1949     # Promote nodes to master candidate as needed
1950     _AdjustCandidatePool(self)
1951
1952
1953 class LUQueryNodes(NoHooksLU):
1954   """Logical unit for querying nodes.
1955
1956   """
1957   _OP_REQP = ["output_fields", "names", "use_locking"]
1958   REQ_BGL = False
1959   _FIELDS_DYNAMIC = utils.FieldSet(
1960     "dtotal", "dfree",
1961     "mtotal", "mnode", "mfree",
1962     "bootid",
1963     "ctotal", "cnodes", "csockets",
1964     )
1965
1966   _FIELDS_STATIC = utils.FieldSet(
1967     "name", "pinst_cnt", "sinst_cnt",
1968     "pinst_list", "sinst_list",
1969     "pip", "sip", "tags",
1970     "serial_no",
1971     "master_candidate",
1972     "master",
1973     "offline",
1974     "drained",
1975     "role",
1976     )
1977
1978   def ExpandNames(self):
1979     _CheckOutputFields(static=self._FIELDS_STATIC,
1980                        dynamic=self._FIELDS_DYNAMIC,
1981                        selected=self.op.output_fields)
1982
1983     self.needed_locks = {}
1984     self.share_locks[locking.LEVEL_NODE] = 1
1985
1986     if self.op.names:
1987       self.wanted = _GetWantedNodes(self, self.op.names)
1988     else:
1989       self.wanted = locking.ALL_SET
1990
1991     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1992     self.do_locking = self.do_node_query and self.op.use_locking
1993     if self.do_locking:
1994       # if we don't request only static fields, we need to lock the nodes
1995       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1996
1997
1998   def CheckPrereq(self):
1999     """Check prerequisites.
2000
2001     """
2002     # The validation of the node list is done in the _GetWantedNodes,
2003     # if non empty, and if empty, there's no validation to do
2004     pass
2005
2006   def Exec(self, feedback_fn):
2007     """Computes the list of nodes and their attributes.
2008
2009     """
2010     all_info = self.cfg.GetAllNodesInfo()
2011     if self.do_locking:
2012       nodenames = self.acquired_locks[locking.LEVEL_NODE]
2013     elif self.wanted != locking.ALL_SET:
2014       nodenames = self.wanted
2015       missing = set(nodenames).difference(all_info.keys())
2016       if missing:
2017         raise errors.OpExecError(
2018           "Some nodes were removed before retrieving their data: %s" % missing)
2019     else:
2020       nodenames = all_info.keys()
2021
2022     nodenames = utils.NiceSort(nodenames)
2023     nodelist = [all_info[name] for name in nodenames]
2024
2025     # begin data gathering
2026
2027     if self.do_node_query:
2028       live_data = {}
2029       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2030                                           self.cfg.GetHypervisorType())
2031       for name in nodenames:
2032         nodeinfo = node_data[name]
2033         if not nodeinfo.failed and nodeinfo.data:
2034           nodeinfo = nodeinfo.data
2035           fn = utils.TryConvert
2036           live_data[name] = {
2037             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2038             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2039             "mfree": fn(int, nodeinfo.get('memory_free', None)),
2040             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2041             "dfree": fn(int, nodeinfo.get('vg_free', None)),
2042             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2043             "bootid": nodeinfo.get('bootid', None),
2044             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2045             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2046             }
2047         else:
2048           live_data[name] = {}
2049     else:
2050       live_data = dict.fromkeys(nodenames, {})
2051
2052     node_to_primary = dict([(name, set()) for name in nodenames])
2053     node_to_secondary = dict([(name, set()) for name in nodenames])
2054
2055     inst_fields = frozenset(("pinst_cnt", "pinst_list",
2056                              "sinst_cnt", "sinst_list"))
2057     if inst_fields & frozenset(self.op.output_fields):
2058       instancelist = self.cfg.GetInstanceList()
2059
2060       for instance_name in instancelist:
2061         inst = self.cfg.GetInstanceInfo(instance_name)
2062         if inst.primary_node in node_to_primary:
2063           node_to_primary[inst.primary_node].add(inst.name)
2064         for secnode in inst.secondary_nodes:
2065           if secnode in node_to_secondary:
2066             node_to_secondary[secnode].add(inst.name)
2067
2068     master_node = self.cfg.GetMasterNode()
2069
2070     # end data gathering
2071
2072     output = []
2073     for node in nodelist:
2074       node_output = []
2075       for field in self.op.output_fields:
2076         if field == "name":
2077           val = node.name
2078         elif field == "pinst_list":
2079           val = list(node_to_primary[node.name])
2080         elif field == "sinst_list":
2081           val = list(node_to_secondary[node.name])
2082         elif field == "pinst_cnt":
2083           val = len(node_to_primary[node.name])
2084         elif field == "sinst_cnt":
2085           val = len(node_to_secondary[node.name])
2086         elif field == "pip":
2087           val = node.primary_ip
2088         elif field == "sip":
2089           val = node.secondary_ip
2090         elif field == "tags":
2091           val = list(node.GetTags())
2092         elif field == "serial_no":
2093           val = node.serial_no
2094         elif field == "master_candidate":
2095           val = node.master_candidate
2096         elif field == "master":
2097           val = node.name == master_node
2098         elif field == "offline":
2099           val = node.offline
2100         elif field == "drained":
2101           val = node.drained
2102         elif self._FIELDS_DYNAMIC.Matches(field):
2103           val = live_data[node.name].get(field, None)
2104         elif field == "role":
2105           if node.name == master_node:
2106             val = "M"
2107           elif node.master_candidate:
2108             val = "C"
2109           elif node.drained:
2110             val = "D"
2111           elif node.offline:
2112             val = "O"
2113           else:
2114             val = "R"
2115         else:
2116           raise errors.ParameterError(field)
2117         node_output.append(val)
2118       output.append(node_output)
2119
2120     return output
2121
2122
2123 class LUQueryNodeVolumes(NoHooksLU):
2124   """Logical unit for getting volumes on node(s).
2125
2126   """
2127   _OP_REQP = ["nodes", "output_fields"]
2128   REQ_BGL = False
2129   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2130   _FIELDS_STATIC = utils.FieldSet("node")
2131
2132   def ExpandNames(self):
2133     _CheckOutputFields(static=self._FIELDS_STATIC,
2134                        dynamic=self._FIELDS_DYNAMIC,
2135                        selected=self.op.output_fields)
2136
2137     self.needed_locks = {}
2138     self.share_locks[locking.LEVEL_NODE] = 1
2139     if not self.op.nodes:
2140       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2141     else:
2142       self.needed_locks[locking.LEVEL_NODE] = \
2143         _GetWantedNodes(self, self.op.nodes)
2144
2145   def CheckPrereq(self):
2146     """Check prerequisites.
2147
2148     This checks that the fields required are valid output fields.
2149
2150     """
2151     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2152
2153   def Exec(self, feedback_fn):
2154     """Computes the list of nodes and their attributes.
2155
2156     """
2157     nodenames = self.nodes
2158     volumes = self.rpc.call_node_volumes(nodenames)
2159
2160     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2161              in self.cfg.GetInstanceList()]
2162
2163     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2164
2165     output = []
2166     for node in nodenames:
2167       if node not in volumes or volumes[node].failed or not volumes[node].data:
2168         continue
2169
2170       node_vols = volumes[node].data[:]
2171       node_vols.sort(key=lambda vol: vol['dev'])
2172
2173       for vol in node_vols:
2174         node_output = []
2175         for field in self.op.output_fields:
2176           if field == "node":
2177             val = node
2178           elif field == "phys":
2179             val = vol['dev']
2180           elif field == "vg":
2181             val = vol['vg']
2182           elif field == "name":
2183             val = vol['name']
2184           elif field == "size":
2185             val = int(float(vol['size']))
2186           elif field == "instance":
2187             for inst in ilist:
2188               if node not in lv_by_node[inst]:
2189                 continue
2190               if vol['name'] in lv_by_node[inst][node]:
2191                 val = inst.name
2192                 break
2193             else:
2194               val = '-'
2195           else:
2196             raise errors.ParameterError(field)
2197           node_output.append(str(val))
2198
2199         output.append(node_output)
2200
2201     return output
2202
2203
2204 class LUAddNode(LogicalUnit):
2205   """Logical unit for adding node to the cluster.
2206
2207   """
2208   HPATH = "node-add"
2209   HTYPE = constants.HTYPE_NODE
2210   _OP_REQP = ["node_name"]
2211
2212   def BuildHooksEnv(self):
2213     """Build hooks env.
2214
2215     This will run on all nodes before, and on all nodes + the new node after.
2216
2217     """
2218     env = {
2219       "OP_TARGET": self.op.node_name,
2220       "NODE_NAME": self.op.node_name,
2221       "NODE_PIP": self.op.primary_ip,
2222       "NODE_SIP": self.op.secondary_ip,
2223       }
2224     nodes_0 = self.cfg.GetNodeList()
2225     nodes_1 = nodes_0 + [self.op.node_name, ]
2226     return env, nodes_0, nodes_1
2227
2228   def CheckPrereq(self):
2229     """Check prerequisites.
2230
2231     This checks:
2232      - the new node is not already in the config
2233      - it is resolvable
2234      - its parameters (single/dual homed) matches the cluster
2235
2236     Any errors are signaled by raising errors.OpPrereqError.
2237
2238     """
2239     node_name = self.op.node_name
2240     cfg = self.cfg
2241
2242     dns_data = utils.HostInfo(node_name)
2243
2244     node = dns_data.name
2245     primary_ip = self.op.primary_ip = dns_data.ip
2246     secondary_ip = getattr(self.op, "secondary_ip", None)
2247     if secondary_ip is None:
2248       secondary_ip = primary_ip
2249     if not utils.IsValidIP(secondary_ip):
2250       raise errors.OpPrereqError("Invalid secondary IP given")
2251     self.op.secondary_ip = secondary_ip
2252
2253     node_list = cfg.GetNodeList()
2254     if not self.op.readd and node in node_list:
2255       raise errors.OpPrereqError("Node %s is already in the configuration" %
2256                                  node)
2257     elif self.op.readd and node not in node_list:
2258       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2259
2260     for existing_node_name in node_list:
2261       existing_node = cfg.GetNodeInfo(existing_node_name)
2262
2263       if self.op.readd and node == existing_node_name:
2264         if (existing_node.primary_ip != primary_ip or
2265             existing_node.secondary_ip != secondary_ip):
2266           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2267                                      " address configuration as before")
2268         continue
2269
2270       if (existing_node.primary_ip == primary_ip or
2271           existing_node.secondary_ip == primary_ip or
2272           existing_node.primary_ip == secondary_ip or
2273           existing_node.secondary_ip == secondary_ip):
2274         raise errors.OpPrereqError("New node ip address(es) conflict with"
2275                                    " existing node %s" % existing_node.name)
2276
2277     # check that the type of the node (single versus dual homed) is the
2278     # same as for the master
2279     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2280     master_singlehomed = myself.secondary_ip == myself.primary_ip
2281     newbie_singlehomed = secondary_ip == primary_ip
2282     if master_singlehomed != newbie_singlehomed:
2283       if master_singlehomed:
2284         raise errors.OpPrereqError("The master has no private ip but the"
2285                                    " new node has one")
2286       else:
2287         raise errors.OpPrereqError("The master has a private ip but the"
2288                                    " new node doesn't have one")
2289
2290     # checks reachability
2291     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2292       raise errors.OpPrereqError("Node not reachable by ping")
2293
2294     if not newbie_singlehomed:
2295       # check reachability from my secondary ip to newbie's secondary ip
2296       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2297                            source=myself.secondary_ip):
2298         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2299                                    " based ping to noded port")
2300
2301     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2302     if self.op.readd:
2303       exceptions = [node]
2304     else:
2305       exceptions = []
2306     mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
2307     # the new node will increase mc_max with one, so:
2308     mc_max = min(mc_max + 1, cp_size)
2309     self.master_candidate = mc_now < mc_max
2310
2311     if self.op.readd:
2312       self.new_node = self.cfg.GetNodeInfo(node)
2313       assert self.new_node is not None, "Can't retrieve locked node %s" % node
2314     else:
2315       self.new_node = objects.Node(name=node,
2316                                    primary_ip=primary_ip,
2317                                    secondary_ip=secondary_ip,
2318                                    master_candidate=self.master_candidate,
2319                                    offline=False, drained=False)
2320
2321   def Exec(self, feedback_fn):
2322     """Adds the new node to the cluster.
2323
2324     """
2325     new_node = self.new_node
2326     node = new_node.name
2327
2328     # for re-adds, reset the offline/drained/master-candidate flags;
2329     # we need to reset here, otherwise offline would prevent RPC calls
2330     # later in the procedure; this also means that if the re-add
2331     # fails, we are left with a non-offlined, broken node
2332     if self.op.readd:
2333       new_node.drained = new_node.offline = False
2334       self.LogInfo("Readding a node, the offline/drained flags were reset")
2335       # if we demote the node, we do cleanup later in the procedure
2336       new_node.master_candidate = self.master_candidate
2337
2338     # notify the user about any possible mc promotion
2339     if new_node.master_candidate:
2340       self.LogInfo("Node will be a master candidate")
2341
2342     # check connectivity
2343     result = self.rpc.call_version([node])[node]
2344     result.Raise()
2345     if result.data:
2346       if constants.PROTOCOL_VERSION == result.data:
2347         logging.info("Communication to node %s fine, sw version %s match",
2348                      node, result.data)
2349       else:
2350         raise errors.OpExecError("Version mismatch master version %s,"
2351                                  " node version %s" %
2352                                  (constants.PROTOCOL_VERSION, result.data))
2353     else:
2354       raise errors.OpExecError("Cannot get version from the new node")
2355
2356     # setup ssh on node
2357     logging.info("Copy ssh key to node %s", node)
2358     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2359     keyarray = []
2360     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2361                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2362                 priv_key, pub_key]
2363
2364     for i in keyfiles:
2365       f = open(i, 'r')
2366       try:
2367         keyarray.append(f.read())
2368       finally:
2369         f.close()
2370
2371     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2372                                     keyarray[2],
2373                                     keyarray[3], keyarray[4], keyarray[5])
2374
2375     msg = result.RemoteFailMsg()
2376     if msg:
2377       raise errors.OpExecError("Cannot transfer ssh keys to the"
2378                                " new node: %s" % msg)
2379
2380     # Add node to our /etc/hosts, and add key to known_hosts
2381     if self.cfg.GetClusterInfo().modify_etc_hosts:
2382       utils.AddHostToEtcHosts(new_node.name)
2383
2384     if new_node.secondary_ip != new_node.primary_ip:
2385       result = self.rpc.call_node_has_ip_address(new_node.name,
2386                                                  new_node.secondary_ip)
2387       if result.failed or not result.data:
2388         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2389                                  " you gave (%s). Please fix and re-run this"
2390                                  " command." % new_node.secondary_ip)
2391
2392     node_verify_list = [self.cfg.GetMasterNode()]
2393     node_verify_param = {
2394       'nodelist': [node],
2395       # TODO: do a node-net-test as well?
2396     }
2397
2398     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2399                                        self.cfg.GetClusterName())
2400     for verifier in node_verify_list:
2401       if result[verifier].failed or not result[verifier].data:
2402         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2403                                  " for remote verification" % verifier)
2404       if result[verifier].data['nodelist']:
2405         for failed in result[verifier].data['nodelist']:
2406           feedback_fn("ssh/hostname verification failed %s -> %s" %
2407                       (verifier, result[verifier].data['nodelist'][failed]))
2408         raise errors.OpExecError("ssh/hostname verification failed.")
2409
2410     # Distribute updated /etc/hosts and known_hosts to all nodes,
2411     # including the node just added
2412     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2413     dist_nodes = self.cfg.GetNodeList()
2414     if not self.op.readd:
2415       dist_nodes.append(node)
2416     if myself.name in dist_nodes:
2417       dist_nodes.remove(myself.name)
2418
2419     logging.debug("Copying hosts and known_hosts to all nodes")
2420     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2421       result = self.rpc.call_upload_file(dist_nodes, fname)
2422       for to_node, to_result in result.iteritems():
2423         if to_result.failed or not to_result.data:
2424           logging.error("Copy of file %s to node %s failed", fname, to_node)
2425
2426     to_copy = []
2427     enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2428     if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
2429       to_copy.append(constants.VNC_PASSWORD_FILE)
2430
2431     for fname in to_copy:
2432       result = self.rpc.call_upload_file([node], fname)
2433       if result[node].failed or not result[node]:
2434         logging.error("Could not copy file %s to node %s", fname, node)
2435
2436     if self.op.readd:
2437       self.context.ReaddNode(new_node)
2438       # make sure we redistribute the config
2439       self.cfg.Update(new_node)
2440       # and make sure the new node will not have old files around
2441       if not new_node.master_candidate:
2442         result = self.rpc.call_node_demote_from_mc(new_node.name)
2443         msg = result.RemoteFailMsg()
2444         if msg:
2445           self.LogWarning("Node failed to demote itself from master"
2446                           " candidate status: %s" % msg)
2447     else:
2448       self.context.AddNode(new_node)
2449
2450
2451 class LUSetNodeParams(LogicalUnit):
2452   """Modifies the parameters of a node.
2453
2454   """
2455   HPATH = "node-modify"
2456   HTYPE = constants.HTYPE_NODE
2457   _OP_REQP = ["node_name"]
2458   REQ_BGL = False
2459
2460   def CheckArguments(self):
2461     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2462     if node_name is None:
2463       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2464     self.op.node_name = node_name
2465     _CheckBooleanOpField(self.op, 'master_candidate')
2466     _CheckBooleanOpField(self.op, 'offline')
2467     _CheckBooleanOpField(self.op, 'drained')
2468     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2469     if all_mods.count(None) == 3:
2470       raise errors.OpPrereqError("Please pass at least one modification")
2471     if all_mods.count(True) > 1:
2472       raise errors.OpPrereqError("Can't set the node into more than one"
2473                                  " state at the same time")
2474
2475   def ExpandNames(self):
2476     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2477
2478   def BuildHooksEnv(self):
2479     """Build hooks env.
2480
2481     This runs on the master node.
2482
2483     """
2484     env = {
2485       "OP_TARGET": self.op.node_name,
2486       "MASTER_CANDIDATE": str(self.op.master_candidate),
2487       "OFFLINE": str(self.op.offline),
2488       "DRAINED": str(self.op.drained),
2489       }
2490     nl = [self.cfg.GetMasterNode(),
2491           self.op.node_name]
2492     return env, nl, nl
2493
2494   def CheckPrereq(self):
2495     """Check prerequisites.
2496
2497     This only checks the instance list against the existing names.
2498
2499     """
2500     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2501
2502     if ((self.op.master_candidate == False or self.op.offline == True or
2503          self.op.drained == True) and node.master_candidate):
2504       # we will demote the node from master_candidate
2505       if self.op.node_name == self.cfg.GetMasterNode():
2506         raise errors.OpPrereqError("The master node has to be a"
2507                                    " master candidate, online and not drained")
2508       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2509       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2510       if num_candidates <= cp_size:
2511         msg = ("Not enough master candidates (desired"
2512                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2513         if self.op.force:
2514           self.LogWarning(msg)
2515         else:
2516           raise errors.OpPrereqError(msg)
2517
2518     if (self.op.master_candidate == True and
2519         ((node.offline and not self.op.offline == False) or
2520          (node.drained and not self.op.drained == False))):
2521       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2522                                  " to master_candidate" % node.name)
2523
2524     return
2525
2526   def Exec(self, feedback_fn):
2527     """Modifies a node.
2528
2529     """
2530     node = self.node
2531
2532     result = []
2533     changed_mc = False
2534
2535     if self.op.offline is not None:
2536       node.offline = self.op.offline
2537       result.append(("offline", str(self.op.offline)))
2538       if self.op.offline == True:
2539         if node.master_candidate:
2540           node.master_candidate = False
2541           changed_mc = True
2542           result.append(("master_candidate", "auto-demotion due to offline"))
2543         if node.drained:
2544           node.drained = False
2545           result.append(("drained", "clear drained status due to offline"))
2546
2547     if self.op.master_candidate is not None:
2548       node.master_candidate = self.op.master_candidate
2549       changed_mc = True
2550       result.append(("master_candidate", str(self.op.master_candidate)))
2551       if self.op.master_candidate == False:
2552         rrc = self.rpc.call_node_demote_from_mc(node.name)
2553         msg = rrc.RemoteFailMsg()
2554         if msg:
2555           self.LogWarning("Node failed to demote itself: %s" % msg)
2556
2557     if self.op.drained is not None:
2558       node.drained = self.op.drained
2559       result.append(("drained", str(self.op.drained)))
2560       if self.op.drained == True:
2561         if node.master_candidate:
2562           node.master_candidate = False
2563           changed_mc = True
2564           result.append(("master_candidate", "auto-demotion due to drain"))
2565           rrc = self.rpc.call_node_demote_from_mc(node.name)
2566           msg = rrc.RemoteFailMsg()
2567           if msg:
2568             self.LogWarning("Node failed to demote itself: %s" % msg)
2569         if node.offline:
2570           node.offline = False
2571           result.append(("offline", "clear offline status due to drain"))
2572
2573     # this will trigger configuration file update, if needed
2574     self.cfg.Update(node)
2575     # this will trigger job queue propagation or cleanup
2576     if changed_mc:
2577       self.context.ReaddNode(node)
2578
2579     return result
2580
2581
2582 class LUQueryClusterInfo(NoHooksLU):
2583   """Query cluster configuration.
2584
2585   """
2586   _OP_REQP = []
2587   REQ_BGL = False
2588
2589   def ExpandNames(self):
2590     self.needed_locks = {}
2591
2592   def CheckPrereq(self):
2593     """No prerequsites needed for this LU.
2594
2595     """
2596     pass
2597
2598   def Exec(self, feedback_fn):
2599     """Return cluster config.
2600
2601     """
2602     cluster = self.cfg.GetClusterInfo()
2603     result = {
2604       "software_version": constants.RELEASE_VERSION,
2605       "protocol_version": constants.PROTOCOL_VERSION,
2606       "config_version": constants.CONFIG_VERSION,
2607       "os_api_version": constants.OS_API_VERSION,
2608       "export_version": constants.EXPORT_VERSION,
2609       "architecture": (platform.architecture()[0], platform.machine()),
2610       "name": cluster.cluster_name,
2611       "master": cluster.master_node,
2612       "default_hypervisor": cluster.default_hypervisor,
2613       "enabled_hypervisors": cluster.enabled_hypervisors,
2614       "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
2615                         for hypervisor_name in cluster.enabled_hypervisors]),
2616       "beparams": cluster.beparams,
2617       "candidate_pool_size": cluster.candidate_pool_size,
2618       "default_bridge": cluster.default_bridge,
2619       "master_netdev": cluster.master_netdev,
2620       "volume_group_name": cluster.volume_group_name,
2621       "file_storage_dir": cluster.file_storage_dir,
2622       }
2623
2624     return result
2625
2626
2627 class LUQueryConfigValues(NoHooksLU):
2628   """Return configuration values.
2629
2630   """
2631   _OP_REQP = []
2632   REQ_BGL = False
2633   _FIELDS_DYNAMIC = utils.FieldSet()
2634   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2635
2636   def ExpandNames(self):
2637     self.needed_locks = {}
2638
2639     _CheckOutputFields(static=self._FIELDS_STATIC,
2640                        dynamic=self._FIELDS_DYNAMIC,
2641                        selected=self.op.output_fields)
2642
2643   def CheckPrereq(self):
2644     """No prerequisites.
2645
2646     """
2647     pass
2648
2649   def Exec(self, feedback_fn):
2650     """Dump a representation of the cluster config to the standard output.
2651
2652     """
2653     values = []
2654     for field in self.op.output_fields:
2655       if field == "cluster_name":
2656         entry = self.cfg.GetClusterName()
2657       elif field == "master_node":
2658         entry = self.cfg.GetMasterNode()
2659       elif field == "drain_flag":
2660         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2661       else:
2662         raise errors.ParameterError(field)
2663       values.append(entry)
2664     return values
2665
2666
2667 class LUActivateInstanceDisks(NoHooksLU):
2668   """Bring up an instance's disks.
2669
2670   """
2671   _OP_REQP = ["instance_name"]
2672   REQ_BGL = False
2673
2674   def ExpandNames(self):
2675     self._ExpandAndLockInstance()
2676     self.needed_locks[locking.LEVEL_NODE] = []
2677     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2678
2679   def DeclareLocks(self, level):
2680     if level == locking.LEVEL_NODE:
2681       self._LockInstancesNodes()
2682
2683   def CheckPrereq(self):
2684     """Check prerequisites.
2685
2686     This checks that the instance is in the cluster.
2687
2688     """
2689     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2690     assert self.instance is not None, \
2691       "Cannot retrieve locked instance %s" % self.op.instance_name
2692     _CheckNodeOnline(self, self.instance.primary_node)
2693     if not hasattr(self.op, "ignore_size"):
2694       self.op.ignore_size = False
2695
2696   def Exec(self, feedback_fn):
2697     """Activate the disks.
2698
2699     """
2700     disks_ok, disks_info = \
2701               _AssembleInstanceDisks(self, self.instance,
2702                                      ignore_size=self.op.ignore_size)
2703     if not disks_ok:
2704       raise errors.OpExecError("Cannot activate block devices")
2705
2706     return disks_info
2707
2708
2709 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
2710                            ignore_size=False):
2711   """Prepare the block devices for an instance.
2712
2713   This sets up the block devices on all nodes.
2714
2715   @type lu: L{LogicalUnit}
2716   @param lu: the logical unit on whose behalf we execute
2717   @type instance: L{objects.Instance}
2718   @param instance: the instance for whose disks we assemble
2719   @type ignore_secondaries: boolean
2720   @param ignore_secondaries: if true, errors on secondary nodes
2721       won't result in an error return from the function
2722   @type ignore_size: boolean
2723   @param ignore_size: if true, the current known size of the disk
2724       will not be used during the disk activation, useful for cases
2725       when the size is wrong
2726   @return: False if the operation failed, otherwise a list of
2727       (host, instance_visible_name, node_visible_name)
2728       with the mapping from node devices to instance devices
2729
2730   """
2731   device_info = []
2732   disks_ok = True
2733   iname = instance.name
2734   # With the two passes mechanism we try to reduce the window of
2735   # opportunity for the race condition of switching DRBD to primary
2736   # before handshaking occured, but we do not eliminate it
2737
2738   # The proper fix would be to wait (with some limits) until the
2739   # connection has been made and drbd transitions from WFConnection
2740   # into any other network-connected state (Connected, SyncTarget,
2741   # SyncSource, etc.)
2742
2743   # 1st pass, assemble on all nodes in secondary mode
2744   for inst_disk in instance.disks:
2745     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2746       if ignore_size:
2747         node_disk = node_disk.Copy()
2748         node_disk.UnsetSize()
2749       lu.cfg.SetDiskID(node_disk, node)
2750       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2751       msg = result.RemoteFailMsg()
2752       if msg:
2753         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2754                            " (is_primary=False, pass=1): %s",
2755                            inst_disk.iv_name, node, msg)
2756         if not ignore_secondaries:
2757           disks_ok = False
2758
2759   # FIXME: race condition on drbd migration to primary
2760
2761   # 2nd pass, do only the primary node
2762   for inst_disk in instance.disks:
2763     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2764       if node != instance.primary_node:
2765         continue
2766       if ignore_size:
2767         node_disk = node_disk.Copy()
2768         node_disk.UnsetSize()
2769       lu.cfg.SetDiskID(node_disk, node)
2770       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2771       msg = result.RemoteFailMsg()
2772       if msg:
2773         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2774                            " (is_primary=True, pass=2): %s",
2775                            inst_disk.iv_name, node, msg)
2776         disks_ok = False
2777     device_info.append((instance.primary_node, inst_disk.iv_name,
2778                         result.payload))
2779
2780   # leave the disks configured for the primary node
2781   # this is a workaround that would be fixed better by
2782   # improving the logical/physical id handling
2783   for disk in instance.disks:
2784     lu.cfg.SetDiskID(disk, instance.primary_node)
2785
2786   return disks_ok, device_info
2787
2788
2789 def _StartInstanceDisks(lu, instance, force):
2790   """Start the disks of an instance.
2791
2792   """
2793   disks_ok, _ = _AssembleInstanceDisks(lu, instance,
2794                                            ignore_secondaries=force)
2795   if not disks_ok:
2796     _ShutdownInstanceDisks(lu, instance)
2797     if force is not None and not force:
2798       lu.proc.LogWarning("", hint="If the message above refers to a"
2799                          " secondary node,"
2800                          " you can retry the operation using '--force'.")
2801     raise errors.OpExecError("Disk consistency error")
2802
2803
2804 class LUDeactivateInstanceDisks(NoHooksLU):
2805   """Shutdown an instance's disks.
2806
2807   """
2808   _OP_REQP = ["instance_name"]
2809   REQ_BGL = False
2810
2811   def ExpandNames(self):
2812     self._ExpandAndLockInstance()
2813     self.needed_locks[locking.LEVEL_NODE] = []
2814     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2815
2816   def DeclareLocks(self, level):
2817     if level == locking.LEVEL_NODE:
2818       self._LockInstancesNodes()
2819
2820   def CheckPrereq(self):
2821     """Check prerequisites.
2822
2823     This checks that the instance is in the cluster.
2824
2825     """
2826     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2827     assert self.instance is not None, \
2828       "Cannot retrieve locked instance %s" % self.op.instance_name
2829
2830   def Exec(self, feedback_fn):
2831     """Deactivate the disks
2832
2833     """
2834     instance = self.instance
2835     _SafeShutdownInstanceDisks(self, instance)
2836
2837
2838 def _SafeShutdownInstanceDisks(lu, instance):
2839   """Shutdown block devices of an instance.
2840
2841   This function checks if an instance is running, before calling
2842   _ShutdownInstanceDisks.
2843
2844   """
2845   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2846                                       [instance.hypervisor])
2847   ins_l = ins_l[instance.primary_node]
2848   if ins_l.failed or not isinstance(ins_l.data, list):
2849     raise errors.OpExecError("Can't contact node '%s'" %
2850                              instance.primary_node)
2851
2852   if instance.name in ins_l.data:
2853     raise errors.OpExecError("Instance is running, can't shutdown"
2854                              " block devices.")
2855
2856   _ShutdownInstanceDisks(lu, instance)
2857
2858
2859 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2860   """Shutdown block devices of an instance.
2861
2862   This does the shutdown on all nodes of the instance.
2863
2864   If the ignore_primary is false, errors on the primary node are
2865   ignored.
2866
2867   """
2868   all_result = True
2869   for disk in instance.disks:
2870     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2871       lu.cfg.SetDiskID(top_disk, node)
2872       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2873       msg = result.RemoteFailMsg()
2874       if msg:
2875         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2876                       disk.iv_name, node, msg)
2877         if not ignore_primary or node != instance.primary_node:
2878           all_result = False
2879   return all_result
2880
2881
2882 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2883   """Checks if a node has enough free memory.
2884
2885   This function check if a given node has the needed amount of free
2886   memory. In case the node has less memory or we cannot get the
2887   information from the node, this function raise an OpPrereqError
2888   exception.
2889
2890   @type lu: C{LogicalUnit}
2891   @param lu: a logical unit from which we get configuration data
2892   @type node: C{str}
2893   @param node: the node to check
2894   @type reason: C{str}
2895   @param reason: string to use in the error message
2896   @type requested: C{int}
2897   @param requested: the amount of memory in MiB to check for
2898   @type hypervisor_name: C{str}
2899   @param hypervisor_name: the hypervisor to ask for memory stats
2900   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2901       we cannot check the node
2902
2903   """
2904   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2905   nodeinfo[node].Raise()
2906   free_mem = nodeinfo[node].data.get('memory_free')
2907   if not isinstance(free_mem, int):
2908     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2909                              " was '%s'" % (node, free_mem))
2910   if requested > free_mem:
2911     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2912                              " needed %s MiB, available %s MiB" %
2913                              (node, reason, requested, free_mem))
2914
2915
2916 class LUStartupInstance(LogicalUnit):
2917   """Starts an instance.
2918
2919   """
2920   HPATH = "instance-start"
2921   HTYPE = constants.HTYPE_INSTANCE
2922   _OP_REQP = ["instance_name", "force"]
2923   REQ_BGL = False
2924
2925   def ExpandNames(self):
2926     self._ExpandAndLockInstance()
2927
2928   def BuildHooksEnv(self):
2929     """Build hooks env.
2930
2931     This runs on master, primary and secondary nodes of the instance.
2932
2933     """
2934     env = {
2935       "FORCE": self.op.force,
2936       }
2937     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2938     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2939     return env, nl, nl
2940
2941   def CheckPrereq(self):
2942     """Check prerequisites.
2943
2944     This checks that the instance is in the cluster.
2945
2946     """
2947     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2948     assert self.instance is not None, \
2949       "Cannot retrieve locked instance %s" % self.op.instance_name
2950
2951     # extra beparams
2952     self.beparams = getattr(self.op, "beparams", {})
2953     if self.beparams:
2954       if not isinstance(self.beparams, dict):
2955         raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2956                                    " dict" % (type(self.beparams), ))
2957       # fill the beparams dict
2958       utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2959       self.op.beparams = self.beparams
2960
2961     # extra hvparams
2962     self.hvparams = getattr(self.op, "hvparams", {})
2963     if self.hvparams:
2964       if not isinstance(self.hvparams, dict):
2965         raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2966                                    " dict" % (type(self.hvparams), ))
2967
2968       # check hypervisor parameter syntax (locally)
2969       cluster = self.cfg.GetClusterInfo()
2970       utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2971       filled_hvp = cluster.FillDict(cluster.hvparams[instance.hypervisor],
2972                                     instance.hvparams)
2973       filled_hvp.update(self.hvparams)
2974       hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2975       hv_type.CheckParameterSyntax(filled_hvp)
2976       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2977       self.op.hvparams = self.hvparams
2978
2979     _CheckNodeOnline(self, instance.primary_node)
2980
2981     bep = self.cfg.GetClusterInfo().FillBE(instance)
2982     # check bridges existence
2983     _CheckInstanceBridgesExist(self, instance)
2984
2985     remote_info = self.rpc.call_instance_info(instance.primary_node,
2986                                               instance.name,
2987                                               instance.hypervisor)
2988     remote_info.Raise()
2989     if not remote_info.data:
2990       _CheckNodeFreeMemory(self, instance.primary_node,
2991                            "starting instance %s" % instance.name,
2992                            bep[constants.BE_MEMORY], instance.hypervisor)
2993
2994   def Exec(self, feedback_fn):
2995     """Start the instance.
2996
2997     """
2998     instance = self.instance
2999     force = self.op.force
3000
3001     self.cfg.MarkInstanceUp(instance.name)
3002
3003     node_current = instance.primary_node
3004
3005     _StartInstanceDisks(self, instance, force)
3006
3007     result = self.rpc.call_instance_start(node_current, instance,
3008                                           self.hvparams, self.beparams)
3009     msg = result.RemoteFailMsg()
3010     if msg:
3011       _ShutdownInstanceDisks(self, instance)
3012       raise errors.OpExecError("Could not start instance: %s" % msg)
3013
3014
3015 class LURebootInstance(LogicalUnit):
3016   """Reboot an instance.
3017
3018   """
3019   HPATH = "instance-reboot"
3020   HTYPE = constants.HTYPE_INSTANCE
3021   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
3022   REQ_BGL = False
3023
3024   def ExpandNames(self):
3025     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
3026                                    constants.INSTANCE_REBOOT_HARD,
3027                                    constants.INSTANCE_REBOOT_FULL]:
3028       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
3029                                   (constants.INSTANCE_REBOOT_SOFT,
3030                                    constants.INSTANCE_REBOOT_HARD,
3031                                    constants.INSTANCE_REBOOT_FULL))
3032     self._ExpandAndLockInstance()
3033
3034   def BuildHooksEnv(self):
3035     """Build hooks env.
3036
3037     This runs on master, primary and secondary nodes of the instance.
3038
3039     """
3040     env = {
3041       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
3042       "REBOOT_TYPE": self.op.reboot_type,
3043       }
3044     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3045     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3046     return env, nl, nl
3047
3048   def CheckPrereq(self):
3049     """Check prerequisites.
3050
3051     This checks that the instance is in the cluster.
3052
3053     """
3054     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3055     assert self.instance is not None, \
3056       "Cannot retrieve locked instance %s" % self.op.instance_name
3057
3058     _CheckNodeOnline(self, instance.primary_node)
3059
3060     # check bridges existence
3061     _CheckInstanceBridgesExist(self, instance)
3062
3063   def Exec(self, feedback_fn):
3064     """Reboot the instance.
3065
3066     """
3067     instance = self.instance
3068     ignore_secondaries = self.op.ignore_secondaries
3069     reboot_type = self.op.reboot_type
3070
3071     node_current = instance.primary_node
3072
3073     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3074                        constants.INSTANCE_REBOOT_HARD]:
3075       for disk in instance.disks:
3076         self.cfg.SetDiskID(disk, node_current)
3077       result = self.rpc.call_instance_reboot(node_current, instance,
3078                                              reboot_type)
3079       msg = result.RemoteFailMsg()
3080       if msg:
3081         raise errors.OpExecError("Could not reboot instance: %s" % msg)
3082     else:
3083       result = self.rpc.call_instance_shutdown(node_current, instance)
3084       msg = result.RemoteFailMsg()
3085       if msg:
3086         raise errors.OpExecError("Could not shutdown instance for"
3087                                  " full reboot: %s" % msg)
3088       _ShutdownInstanceDisks(self, instance)
3089       _StartInstanceDisks(self, instance, ignore_secondaries)
3090       result = self.rpc.call_instance_start(node_current, instance, None, None)
3091       msg = result.RemoteFailMsg()
3092       if msg:
3093         _ShutdownInstanceDisks(self, instance)
3094         raise errors.OpExecError("Could not start instance for"
3095                                  " full reboot: %s" % msg)
3096
3097     self.cfg.MarkInstanceUp(instance.name)
3098
3099
3100 class LUShutdownInstance(LogicalUnit):
3101   """Shutdown an instance.
3102
3103   """
3104   HPATH = "instance-stop"
3105   HTYPE = constants.HTYPE_INSTANCE
3106   _OP_REQP = ["instance_name"]
3107   REQ_BGL = False
3108
3109   def ExpandNames(self):
3110     self._ExpandAndLockInstance()
3111
3112   def BuildHooksEnv(self):
3113     """Build hooks env.
3114
3115     This runs on master, primary and secondary nodes of the instance.
3116
3117     """
3118     env = _BuildInstanceHookEnvByObject(self, self.instance)
3119     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3120     return env, nl, nl
3121
3122   def CheckPrereq(self):
3123     """Check prerequisites.
3124
3125     This checks that the instance is in the cluster.
3126
3127     """
3128     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3129     assert self.instance is not None, \
3130       "Cannot retrieve locked instance %s" % self.op.instance_name
3131     _CheckNodeOnline(self, self.instance.primary_node)
3132
3133   def Exec(self, feedback_fn):
3134     """Shutdown the instance.
3135
3136     """
3137     instance = self.instance
3138     node_current = instance.primary_node
3139     self.cfg.MarkInstanceDown(instance.name)
3140     result = self.rpc.call_instance_shutdown(node_current, instance)
3141     msg = result.RemoteFailMsg()
3142     if msg:
3143       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3144
3145     _ShutdownInstanceDisks(self, instance)
3146
3147
3148 class LUReinstallInstance(LogicalUnit):
3149   """Reinstall an instance.
3150
3151   """
3152   HPATH = "instance-reinstall"
3153   HTYPE = constants.HTYPE_INSTANCE
3154   _OP_REQP = ["instance_name"]
3155   REQ_BGL = False
3156
3157   def ExpandNames(self):
3158     self._ExpandAndLockInstance()
3159
3160   def BuildHooksEnv(self):
3161     """Build hooks env.
3162
3163     This runs on master, primary and secondary nodes of the instance.
3164
3165     """
3166     env = _BuildInstanceHookEnvByObject(self, self.instance)
3167     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3168     return env, nl, nl
3169
3170   def CheckPrereq(self):
3171     """Check prerequisites.
3172
3173     This checks that the instance is in the cluster and is not running.
3174
3175     """
3176     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3177     assert instance is not None, \
3178       "Cannot retrieve locked instance %s" % self.op.instance_name
3179     _CheckNodeOnline(self, instance.primary_node)
3180
3181     if instance.disk_template == constants.DT_DISKLESS:
3182       raise errors.OpPrereqError("Instance '%s' has no disks" %
3183                                  self.op.instance_name)
3184     if instance.admin_up:
3185       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3186                                  self.op.instance_name)
3187     remote_info = self.rpc.call_instance_info(instance.primary_node,
3188                                               instance.name,
3189                                               instance.hypervisor)
3190     remote_info.Raise()
3191     if remote_info.data:
3192       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3193                                  (self.op.instance_name,
3194                                   instance.primary_node))
3195
3196     self.op.os_type = getattr(self.op, "os_type", None)
3197     if self.op.os_type is not None:
3198       # OS verification
3199       pnode = self.cfg.GetNodeInfo(
3200         self.cfg.ExpandNodeName(instance.primary_node))
3201       if pnode is None:
3202         raise errors.OpPrereqError("Primary node '%s' is unknown" %
3203                                    self.op.pnode)
3204       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3205       result.Raise()
3206       if not isinstance(result.data, objects.OS):
3207         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3208                                    " primary node"  % self.op.os_type)
3209
3210     self.instance = instance
3211
3212   def Exec(self, feedback_fn):
3213     """Reinstall the instance.
3214
3215     """
3216     inst = self.instance
3217
3218     if self.op.os_type is not None:
3219       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3220       inst.os = self.op.os_type
3221       self.cfg.Update(inst)
3222
3223     _StartInstanceDisks(self, inst, None)
3224     try:
3225       feedback_fn("Running the instance OS create scripts...")
3226       result = self.rpc.call_instance_os_add(inst.primary_node, inst)
3227       msg = result.RemoteFailMsg()
3228       if msg:
3229         raise errors.OpExecError("Could not install OS for instance %s"
3230                                  " on node %s: %s" %
3231                                  (inst.name, inst.primary_node, msg))
3232     finally:
3233       _ShutdownInstanceDisks(self, inst)
3234
3235
3236 class LURenameInstance(LogicalUnit):
3237   """Rename an instance.
3238
3239   """
3240   HPATH = "instance-rename"
3241   HTYPE = constants.HTYPE_INSTANCE
3242   _OP_REQP = ["instance_name", "new_name"]
3243
3244   def BuildHooksEnv(self):
3245     """Build hooks env.
3246
3247     This runs on master, primary and secondary nodes of the instance.
3248
3249     """
3250     env = _BuildInstanceHookEnvByObject(self, self.instance)
3251     env["INSTANCE_NEW_NAME"] = self.op.new_name
3252     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3253     return env, nl, nl
3254
3255   def CheckPrereq(self):
3256     """Check prerequisites.
3257
3258     This checks that the instance is in the cluster and is not running.
3259
3260     """
3261     instance = self.cfg.GetInstanceInfo(
3262       self.cfg.ExpandInstanceName(self.op.instance_name))
3263     if instance is None:
3264       raise errors.OpPrereqError("Instance '%s' not known" %
3265                                  self.op.instance_name)
3266     _CheckNodeOnline(self, instance.primary_node)
3267
3268     if instance.admin_up:
3269       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3270                                  self.op.instance_name)
3271     remote_info = self.rpc.call_instance_info(instance.primary_node,
3272                                               instance.name,
3273                                               instance.hypervisor)
3274     remote_info.Raise()
3275     if remote_info.data:
3276       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3277                                  (self.op.instance_name,
3278                                   instance.primary_node))
3279     self.instance = instance
3280
3281     # new name verification
3282     name_info = utils.HostInfo(self.op.new_name)
3283
3284     self.op.new_name = new_name = name_info.name
3285     instance_list = self.cfg.GetInstanceList()
3286     if new_name in instance_list:
3287       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3288                                  new_name)
3289
3290     if not getattr(self.op, "ignore_ip", False):
3291       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3292         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3293                                    (name_info.ip, new_name))
3294
3295
3296   def Exec(self, feedback_fn):
3297     """Reinstall the instance.
3298
3299     """
3300     inst = self.instance
3301     old_name = inst.name
3302
3303     if inst.disk_template == constants.DT_FILE:
3304       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3305
3306     self.cfg.RenameInstance(inst.name, self.op.new_name)
3307     # Change the instance lock. This is definitely safe while we hold the BGL
3308     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3309     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3310
3311     # re-read the instance from the configuration after rename
3312     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3313
3314     if inst.disk_template == constants.DT_FILE:
3315       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3316       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3317                                                      old_file_storage_dir,
3318                                                      new_file_storage_dir)
3319       result.Raise()
3320       if not result.data:
3321         raise errors.OpExecError("Could not connect to node '%s' to rename"
3322                                  " directory '%s' to '%s' (but the instance"
3323                                  " has been renamed in Ganeti)" % (
3324                                  inst.primary_node, old_file_storage_dir,
3325                                  new_file_storage_dir))
3326
3327       if not result.data[0]:
3328         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3329                                  " (but the instance has been renamed in"
3330                                  " Ganeti)" % (old_file_storage_dir,
3331                                                new_file_storage_dir))
3332
3333     _StartInstanceDisks(self, inst, None)
3334     try:
3335       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3336                                                  old_name)
3337       msg = result.RemoteFailMsg()
3338       if msg:
3339         msg = ("Could not run OS rename script for instance %s on node %s"
3340                " (but the instance has been renamed in Ganeti): %s" %
3341                (inst.name, inst.primary_node, msg))
3342         self.proc.LogWarning(msg)
3343     finally:
3344       _ShutdownInstanceDisks(self, inst)
3345
3346
3347 class LURemoveInstance(LogicalUnit):
3348   """Remove an instance.
3349
3350   """
3351   HPATH = "instance-remove"
3352   HTYPE = constants.HTYPE_INSTANCE
3353   _OP_REQP = ["instance_name", "ignore_failures"]
3354   REQ_BGL = False
3355
3356   def ExpandNames(self):
3357     self._ExpandAndLockInstance()
3358     self.needed_locks[locking.LEVEL_NODE] = []
3359     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3360
3361   def DeclareLocks(self, level):
3362     if level == locking.LEVEL_NODE:
3363       self._LockInstancesNodes()
3364
3365   def BuildHooksEnv(self):
3366     """Build hooks env.
3367
3368     This runs on master, primary and secondary nodes of the instance.
3369
3370     """
3371     env = _BuildInstanceHookEnvByObject(self, self.instance)
3372     nl = [self.cfg.GetMasterNode()]
3373     return env, nl, nl
3374
3375   def CheckPrereq(self):
3376     """Check prerequisites.
3377
3378     This checks that the instance is in the cluster.
3379
3380     """
3381     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3382     assert self.instance is not None, \
3383       "Cannot retrieve locked instance %s" % self.op.instance_name
3384
3385   def Exec(self, feedback_fn):
3386     """Remove the instance.
3387
3388     """
3389     instance = self.instance
3390     logging.info("Shutting down instance %s on node %s",
3391                  instance.name, instance.primary_node)
3392
3393     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3394     msg = result.RemoteFailMsg()
3395     if msg:
3396       if self.op.ignore_failures:
3397         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3398       else:
3399         raise errors.OpExecError("Could not shutdown instance %s on"
3400                                  " node %s: %s" %
3401                                  (instance.name, instance.primary_node, msg))
3402
3403     logging.info("Removing block devices for instance %s", instance.name)
3404
3405     if not _RemoveDisks(self, instance):
3406       if self.op.ignore_failures:
3407         feedback_fn("Warning: can't remove instance's disks")
3408       else:
3409         raise errors.OpExecError("Can't remove instance's disks")
3410
3411     logging.info("Removing instance %s out of cluster config", instance.name)
3412
3413     self.cfg.RemoveInstance(instance.name)
3414     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3415
3416
3417 class LUQueryInstances(NoHooksLU):
3418   """Logical unit for querying instances.
3419
3420   """
3421   _OP_REQP = ["output_fields", "names", "use_locking"]
3422   REQ_BGL = False
3423   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3424                                     "admin_state",
3425                                     "disk_template", "ip", "mac", "bridge",
3426                                     "sda_size", "sdb_size", "vcpus", "tags",
3427                                     "network_port", "beparams",
3428                                     r"(disk)\.(size)/([0-9]+)",
3429                                     r"(disk)\.(sizes)", "disk_usage",
3430                                     r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3431                                     r"(nic)\.(macs|ips|bridges)",
3432                                     r"(disk|nic)\.(count)",
3433                                     "serial_no", "hypervisor", "hvparams",] +
3434                                   ["hv/%s" % name
3435                                    for name in constants.HVS_PARAMETERS] +
3436                                   ["be/%s" % name
3437                                    for name in constants.BES_PARAMETERS])
3438   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3439
3440
3441   def ExpandNames(self):
3442     _CheckOutputFields(static=self._FIELDS_STATIC,
3443                        dynamic=self._FIELDS_DYNAMIC,
3444                        selected=self.op.output_fields)
3445
3446     self.needed_locks = {}
3447     self.share_locks[locking.LEVEL_INSTANCE] = 1
3448     self.share_locks[locking.LEVEL_NODE] = 1
3449
3450     if self.op.names:
3451       self.wanted = _GetWantedInstances(self, self.op.names)
3452     else:
3453       self.wanted = locking.ALL_SET
3454
3455     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3456     self.do_locking = self.do_node_query and self.op.use_locking
3457     if self.do_locking:
3458       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3459       self.needed_locks[locking.LEVEL_NODE] = []
3460       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3461
3462   def DeclareLocks(self, level):
3463     if level == locking.LEVEL_NODE and self.do_locking:
3464       self._LockInstancesNodes()
3465
3466   def CheckPrereq(self):
3467     """Check prerequisites.
3468
3469     """
3470     pass
3471
3472   def Exec(self, feedback_fn):
3473     """Computes the list of nodes and their attributes.
3474
3475     """
3476     all_info = self.cfg.GetAllInstancesInfo()
3477     if self.wanted == locking.ALL_SET:
3478       # caller didn't specify instance names, so ordering is not important
3479       if self.do_locking:
3480         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3481       else:
3482         instance_names = all_info.keys()
3483       instance_names = utils.NiceSort(instance_names)
3484     else:
3485       # caller did specify names, so we must keep the ordering
3486       if self.do_locking:
3487         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3488       else:
3489         tgt_set = all_info.keys()
3490       missing = set(self.wanted).difference(tgt_set)
3491       if missing:
3492         raise errors.OpExecError("Some instances were removed before"
3493                                  " retrieving their data: %s" % missing)
3494       instance_names = self.wanted
3495
3496     instance_list = [all_info[iname] for iname in instance_names]
3497
3498     # begin data gathering
3499
3500     nodes = frozenset([inst.primary_node for inst in instance_list])
3501     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3502
3503     bad_nodes = []
3504     off_nodes = []
3505     if self.do_node_query:
3506       live_data = {}
3507       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3508       for name in nodes:
3509         result = node_data[name]
3510         if result.offline:
3511           # offline nodes will be in both lists
3512           off_nodes.append(name)
3513         if result.failed:
3514           bad_nodes.append(name)
3515         else:
3516           if result.data:
3517             live_data.update(result.data)
3518             # else no instance is alive
3519     else:
3520       live_data = dict([(name, {}) for name in instance_names])
3521
3522     # end data gathering
3523
3524     HVPREFIX = "hv/"
3525     BEPREFIX = "be/"
3526     output = []
3527     for instance in instance_list:
3528       iout = []
3529       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3530       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3531       for field in self.op.output_fields:
3532         st_match = self._FIELDS_STATIC.Matches(field)
3533         if field == "name":
3534           val = instance.name
3535         elif field == "os":
3536           val = instance.os
3537         elif field == "pnode":
3538           val = instance.primary_node
3539         elif field == "snodes":
3540           val = list(instance.secondary_nodes)
3541         elif field == "admin_state":
3542           val = instance.admin_up
3543         elif field == "oper_state":
3544           if instance.primary_node in bad_nodes:
3545             val = None
3546           else:
3547             val = bool(live_data.get(instance.name))
3548         elif field == "status":
3549           if instance.primary_node in off_nodes:
3550             val = "ERROR_nodeoffline"
3551           elif instance.primary_node in bad_nodes:
3552             val = "ERROR_nodedown"
3553           else:
3554             running = bool(live_data.get(instance.name))
3555             if running:
3556               if instance.admin_up:
3557                 val = "running"
3558               else:
3559                 val = "ERROR_up"
3560             else:
3561               if instance.admin_up:
3562                 val = "ERROR_down"
3563               else:
3564                 val = "ADMIN_down"
3565         elif field == "oper_ram":
3566           if instance.primary_node in bad_nodes:
3567             val = None
3568           elif instance.name in live_data:
3569             val = live_data[instance.name].get("memory", "?")
3570           else:
3571             val = "-"
3572         elif field == "vcpus":
3573           val = i_be[constants.BE_VCPUS]
3574         elif field == "disk_template":
3575           val = instance.disk_template
3576         elif field == "ip":
3577           if instance.nics:
3578             val = instance.nics[0].ip
3579           else:
3580             val = None
3581         elif field == "bridge":
3582           if instance.nics:
3583             val = instance.nics[0].bridge
3584           else:
3585             val = None
3586         elif field == "mac":
3587           if instance.nics:
3588             val = instance.nics[0].mac
3589           else:
3590             val = None
3591         elif field == "sda_size" or field == "sdb_size":
3592           idx = ord(field[2]) - ord('a')
3593           try:
3594             val = instance.FindDisk(idx).size
3595           except errors.OpPrereqError:
3596             val = None
3597         elif field == "disk_usage": # total disk usage per node
3598           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3599           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3600         elif field == "tags":
3601           val = list(instance.GetTags())
3602         elif field == "serial_no":
3603           val = instance.serial_no
3604         elif field == "network_port":
3605           val = instance.network_port
3606         elif field == "hypervisor":
3607           val = instance.hypervisor
3608         elif field == "hvparams":
3609           val = i_hv
3610         elif (field.startswith(HVPREFIX) and
3611               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3612           val = i_hv.get(field[len(HVPREFIX):], None)
3613         elif field == "beparams":
3614           val = i_be
3615         elif (field.startswith(BEPREFIX) and
3616               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3617           val = i_be.get(field[len(BEPREFIX):], None)
3618         elif st_match and st_match.groups():
3619           # matches a variable list
3620           st_groups = st_match.groups()
3621           if st_groups and st_groups[0] == "disk":
3622             if st_groups[1] == "count":
3623               val = len(instance.disks)
3624             elif st_groups[1] == "sizes":
3625               val = [disk.size for disk in instance.disks]
3626             elif st_groups[1] == "size":
3627               try:
3628                 val = instance.FindDisk(st_groups[2]).size
3629               except errors.OpPrereqError:
3630                 val = None
3631             else:
3632               assert False, "Unhandled disk parameter"
3633           elif st_groups[0] == "nic":
3634             if st_groups[1] == "count":
3635               val = len(instance.nics)
3636             elif st_groups[1] == "macs":
3637               val = [nic.mac for nic in instance.nics]
3638             elif st_groups[1] == "ips":
3639               val = [nic.ip for nic in instance.nics]
3640             elif st_groups[1] == "bridges":
3641               val = [nic.bridge for nic in instance.nics]
3642             else:
3643               # index-based item
3644               nic_idx = int(st_groups[2])
3645               if nic_idx >= len(instance.nics):
3646                 val = None
3647               else:
3648                 if st_groups[1] == "mac":
3649                   val = instance.nics[nic_idx].mac
3650                 elif st_groups[1] == "ip":
3651                   val = instance.nics[nic_idx].ip
3652                 elif st_groups[1] == "bridge":
3653                   val = instance.nics[nic_idx].bridge
3654                 else:
3655                   assert False, "Unhandled NIC parameter"
3656           else:
3657             assert False, ("Declared but unhandled variable parameter '%s'" %
3658                            field)
3659         else:
3660           assert False, "Declared but unhandled parameter '%s'" % field
3661         iout.append(val)
3662       output.append(iout)
3663
3664     return output
3665
3666
3667 class LUFailoverInstance(LogicalUnit):
3668   """Failover an instance.
3669
3670   """
3671   HPATH = "instance-failover"
3672   HTYPE = constants.HTYPE_INSTANCE
3673   _OP_REQP = ["instance_name", "ignore_consistency"]
3674   REQ_BGL = False
3675
3676   def ExpandNames(self):
3677     self._ExpandAndLockInstance()
3678     self.needed_locks[locking.LEVEL_NODE] = []
3679     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3680
3681   def DeclareLocks(self, level):
3682     if level == locking.LEVEL_NODE:
3683       self._LockInstancesNodes()
3684
3685   def BuildHooksEnv(self):
3686     """Build hooks env.
3687
3688     This runs on master, primary and secondary nodes of the instance.
3689
3690     """
3691     env = {
3692       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3693       }
3694     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3695     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3696     return env, nl, nl
3697
3698   def CheckPrereq(self):
3699     """Check prerequisites.
3700
3701     This checks that the instance is in the cluster.
3702
3703     """
3704     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3705     assert self.instance is not None, \
3706       "Cannot retrieve locked instance %s" % self.op.instance_name
3707
3708     bep = self.cfg.GetClusterInfo().FillBE(instance)
3709     if instance.disk_template not in constants.DTS_NET_MIRROR:
3710       raise errors.OpPrereqError("Instance's disk layout is not"
3711                                  " network mirrored, cannot failover.")
3712
3713     secondary_nodes = instance.secondary_nodes
3714     if not secondary_nodes:
3715       raise errors.ProgrammerError("no secondary node but using "
3716                                    "a mirrored disk template")
3717
3718     target_node = secondary_nodes[0]
3719     _CheckNodeOnline(self, target_node)
3720     _CheckNodeNotDrained(self, target_node)
3721
3722     if instance.admin_up:
3723       # check memory requirements on the secondary node
3724       _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3725                            instance.name, bep[constants.BE_MEMORY],
3726                            instance.hypervisor)
3727     else:
3728       self.LogInfo("Not checking memory on the secondary node as"
3729                    " instance will not be started")
3730
3731     # check bridge existence
3732     brlist = [nic.bridge for nic in instance.nics]
3733     result = self.rpc.call_bridges_exist(target_node, brlist)
3734     result.Raise()
3735     if not result.data:
3736       raise errors.OpPrereqError("One or more target bridges %s does not"
3737                                  " exist on destination node '%s'" %
3738                                  (brlist, target_node))
3739
3740   def Exec(self, feedback_fn):
3741     """Failover an instance.
3742
3743     The failover is done by shutting it down on its present node and
3744     starting it on the secondary.
3745
3746     """
3747     instance = self.instance
3748
3749     source_node = instance.primary_node
3750     target_node = instance.secondary_nodes[0]
3751
3752     feedback_fn("* checking disk consistency between source and target")
3753     for dev in instance.disks:
3754       # for drbd, these are drbd over lvm
3755       if not _CheckDiskConsistency(self, dev, target_node, False):
3756         if instance.admin_up and not self.op.ignore_consistency:
3757           raise errors.OpExecError("Disk %s is degraded on target node,"
3758                                    " aborting failover." % dev.iv_name)
3759
3760     feedback_fn("* shutting down instance on source node")
3761     logging.info("Shutting down instance %s on node %s",
3762                  instance.name, source_node)
3763
3764     result = self.rpc.call_instance_shutdown(source_node, instance)
3765     msg = result.RemoteFailMsg()
3766     if msg:
3767       if self.op.ignore_consistency:
3768         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3769                              " Proceeding anyway. Please make sure node"
3770                              " %s is down. Error details: %s",
3771                              instance.name, source_node, source_node, msg)
3772       else:
3773         raise errors.OpExecError("Could not shutdown instance %s on"
3774                                  " node %s: %s" %
3775                                  (instance.name, source_node, msg))
3776
3777     feedback_fn("* deactivating the instance's disks on source node")
3778     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3779       raise errors.OpExecError("Can't shut down the instance's disks.")
3780
3781     instance.primary_node = target_node
3782     # distribute new instance config to the other nodes
3783     self.cfg.Update(instance)
3784
3785     # Only start the instance if it's marked as up
3786     if instance.admin_up:
3787       feedback_fn("* activating the instance's disks on target node")
3788       logging.info("Starting instance %s on node %s",
3789                    instance.name, target_node)
3790
3791       disks_ok, _ = _AssembleInstanceDisks(self, instance,
3792                                                ignore_secondaries=True)
3793       if not disks_ok:
3794         _ShutdownInstanceDisks(self, instance)
3795         raise errors.OpExecError("Can't activate the instance's disks")
3796
3797       feedback_fn("* starting the instance on the target node")
3798       result = self.rpc.call_instance_start(target_node, instance, None, None)
3799       msg = result.RemoteFailMsg()
3800       if msg:
3801         _ShutdownInstanceDisks(self, instance)
3802         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3803                                  (instance.name, target_node, msg))
3804
3805
3806 class LUMigrateInstance(LogicalUnit):
3807   """Migrate an instance.
3808
3809   This is migration without shutting down, compared to the failover,
3810   which is done with shutdown.
3811
3812   """
3813   HPATH = "instance-migrate"
3814   HTYPE = constants.HTYPE_INSTANCE
3815   _OP_REQP = ["instance_name", "live", "cleanup"]
3816
3817   REQ_BGL = False
3818
3819   def ExpandNames(self):
3820     self._ExpandAndLockInstance()
3821     self.needed_locks[locking.LEVEL_NODE] = []
3822     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3823
3824   def DeclareLocks(self, level):
3825     if level == locking.LEVEL_NODE:
3826       self._LockInstancesNodes()
3827
3828   def BuildHooksEnv(self):
3829     """Build hooks env.
3830
3831     This runs on master, primary and secondary nodes of the instance.
3832
3833     """
3834     env = _BuildInstanceHookEnvByObject(self, self.instance)
3835     env["MIGRATE_LIVE"] = self.op.live
3836     env["MIGRATE_CLEANUP"] = self.op.cleanup
3837     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3838     return env, nl, nl
3839
3840   def CheckPrereq(self):
3841     """Check prerequisites.
3842
3843     This checks that the instance is in the cluster.
3844
3845     """
3846     instance = self.cfg.GetInstanceInfo(
3847       self.cfg.ExpandInstanceName(self.op.instance_name))
3848     if instance is None:
3849       raise errors.OpPrereqError("Instance '%s' not known" %
3850                                  self.op.instance_name)
3851
3852     if instance.disk_template != constants.DT_DRBD8:
3853       raise errors.OpPrereqError("Instance's disk layout is not"
3854                                  " drbd8, cannot migrate.")
3855
3856     secondary_nodes = instance.secondary_nodes
3857     if not secondary_nodes:
3858       raise errors.ConfigurationError("No secondary node but using"
3859                                       " drbd8 disk template")
3860
3861     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3862
3863     target_node = secondary_nodes[0]
3864     # check memory requirements on the secondary node
3865     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3866                          instance.name, i_be[constants.BE_MEMORY],
3867                          instance.hypervisor)
3868
3869     # check bridge existence
3870     brlist = [nic.bridge for nic in instance.nics]
3871     result = self.rpc.call_bridges_exist(target_node, brlist)
3872     if result.failed or not result.data:
3873       raise errors.OpPrereqError("One or more target bridges %s does not"
3874                                  " exist on destination node '%s'" %
3875                                  (brlist, target_node))
3876
3877     if not self.op.cleanup:
3878       _CheckNodeNotDrained(self, target_node)
3879       result = self.rpc.call_instance_migratable(instance.primary_node,
3880                                                  instance)
3881       msg = result.RemoteFailMsg()
3882       if msg:
3883         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3884                                    msg)
3885
3886     self.instance = instance
3887
3888   def _WaitUntilSync(self):
3889     """Poll with custom rpc for disk sync.
3890
3891     This uses our own step-based rpc call.
3892
3893     """
3894     self.feedback_fn("* wait until resync is done")
3895     all_done = False
3896     while not all_done:
3897       all_done = True
3898       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3899                                             self.nodes_ip,
3900                                             self.instance.disks)
3901       min_percent = 100
3902       for node, nres in result.items():
3903         msg = nres.RemoteFailMsg()
3904         if msg:
3905           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3906                                    (node, msg))
3907         node_done, node_percent = nres.payload
3908         all_done = all_done and node_done
3909         if node_percent is not None:
3910           min_percent = min(min_percent, node_percent)
3911       if not all_done:
3912         if min_percent < 100:
3913           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3914         time.sleep(2)
3915
3916   def _EnsureSecondary(self, node):
3917     """Demote a node to secondary.
3918
3919     """
3920     self.feedback_fn("* switching node %s to secondary mode" % node)
3921
3922     for dev in self.instance.disks:
3923       self.cfg.SetDiskID(dev, node)
3924
3925     result = self.rpc.call_blockdev_close(node, self.instance.name,
3926                                           self.instance.disks)
3927     msg = result.RemoteFailMsg()
3928     if msg:
3929       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3930                                " error %s" % (node, msg))
3931
3932   def _GoStandalone(self):
3933     """Disconnect from the network.
3934
3935     """
3936     self.feedback_fn("* changing into standalone mode")
3937     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3938                                                self.instance.disks)
3939     for node, nres in result.items():
3940       msg = nres.RemoteFailMsg()
3941       if msg:
3942         raise errors.OpExecError("Cannot disconnect disks node %s,"
3943                                  " error %s" % (node, msg))
3944
3945   def _GoReconnect(self, multimaster):
3946     """Reconnect to the network.
3947
3948     """
3949     if multimaster:
3950       msg = "dual-master"
3951     else:
3952       msg = "single-master"
3953     self.feedback_fn("* changing disks into %s mode" % msg)
3954     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3955                                            self.instance.disks,
3956                                            self.instance.name, multimaster)
3957     for node, nres in result.items():
3958       msg = nres.RemoteFailMsg()
3959       if msg:
3960         raise errors.OpExecError("Cannot change disks config on node %s,"
3961                                  " error: %s" % (node, msg))
3962
3963   def _ExecCleanup(self):
3964     """Try to cleanup after a failed migration.
3965
3966     The cleanup is done by:
3967       - check that the instance is running only on one node
3968         (and update the config if needed)
3969       - change disks on its secondary node to secondary
3970       - wait until disks are fully synchronized
3971       - disconnect from the network
3972       - change disks into single-master mode
3973       - wait again until disks are fully synchronized
3974
3975     """
3976     instance = self.instance
3977     target_node = self.target_node
3978     source_node = self.source_node
3979
3980     # check running on only one node
3981     self.feedback_fn("* checking where the instance actually runs"
3982                      " (if this hangs, the hypervisor might be in"
3983                      " a bad state)")
3984     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3985     for node, result in ins_l.items():
3986       result.Raise()
3987       if not isinstance(result.data, list):
3988         raise errors.OpExecError("Can't contact node '%s'" % node)
3989
3990     runningon_source = instance.name in ins_l[source_node].data
3991     runningon_target = instance.name in ins_l[target_node].data
3992
3993     if runningon_source and runningon_target:
3994       raise errors.OpExecError("Instance seems to be running on two nodes,"
3995                                " or the hypervisor is confused. You will have"
3996                                " to ensure manually that it runs only on one"
3997                                " and restart this operation.")
3998
3999     if not (runningon_source or runningon_target):
4000       raise errors.OpExecError("Instance does not seem to be running at all."
4001                                " In this case, it's safer to repair by"
4002                                " running 'gnt-instance stop' to ensure disk"
4003                                " shutdown, and then restarting it.")
4004
4005     if runningon_target:
4006       # the migration has actually succeeded, we need to update the config
4007       self.feedback_fn("* instance running on secondary node (%s),"
4008                        " updating config" % target_node)
4009       instance.primary_node = target_node
4010       self.cfg.Update(instance)
4011       demoted_node = source_node
4012     else:
4013       self.feedback_fn("* instance confirmed to be running on its"
4014                        " primary node (%s)" % source_node)
4015       demoted_node = target_node
4016
4017     self._EnsureSecondary(demoted_node)
4018     try:
4019       self._WaitUntilSync()
4020     except errors.OpExecError:
4021       # we ignore here errors, since if the device is standalone, it
4022       # won't be able to sync
4023       pass
4024     self._GoStandalone()
4025     self._GoReconnect(False)
4026     self._WaitUntilSync()
4027
4028     self.feedback_fn("* done")
4029
4030   def _RevertDiskStatus(self):
4031     """Try to revert the disk status after a failed migration.
4032
4033     """
4034     target_node = self.target_node
4035     try:
4036       self._EnsureSecondary(target_node)
4037       self._GoStandalone()
4038       self._GoReconnect(False)
4039       self._WaitUntilSync()
4040     except errors.OpExecError, err:
4041       self.LogWarning("Migration failed and I can't reconnect the"
4042                       " drives: error '%s'\n"
4043                       "Please look and recover the instance status" %
4044                       str(err))
4045
4046   def _AbortMigration(self):
4047     """Call the hypervisor code to abort a started migration.
4048
4049     """
4050     instance = self.instance
4051     target_node = self.target_node
4052     migration_info = self.migration_info
4053
4054     abort_result = self.rpc.call_finalize_migration(target_node,
4055                                                     instance,
4056                                                     migration_info,
4057                                                     False)
4058     abort_msg = abort_result.RemoteFailMsg()
4059     if abort_msg:
4060       logging.error("Aborting migration failed on target node %s: %s" %
4061                     (target_node, abort_msg))
4062       # Don't raise an exception here, as we stil have to try to revert the
4063       # disk status, even if this step failed.
4064
4065   def _ExecMigration(self):
4066     """Migrate an instance.
4067
4068     The migrate is done by:
4069       - change the disks into dual-master mode
4070       - wait until disks are fully synchronized again
4071       - migrate the instance
4072       - change disks on the new secondary node (the old primary) to secondary
4073       - wait until disks are fully synchronized
4074       - change disks into single-master mode
4075
4076     """
4077     instance = self.instance
4078     target_node = self.target_node
4079     source_node = self.source_node
4080
4081     self.feedback_fn("* checking disk consistency between source and target")
4082     for dev in instance.disks:
4083       if not _CheckDiskConsistency(self, dev, target_node, False):
4084         raise errors.OpExecError("Disk %s is degraded or not fully"
4085                                  " synchronized on target node,"
4086                                  " aborting migrate." % dev.iv_name)
4087
4088     # First get the migration information from the remote node
4089     result = self.rpc.call_migration_info(source_node, instance)
4090     msg = result.RemoteFailMsg()
4091     if msg:
4092       log_err = ("Failed fetching source migration information from %s: %s" %
4093                  (source_node, msg))
4094       logging.error(log_err)
4095       raise errors.OpExecError(log_err)
4096
4097     self.migration_info = migration_info = result.payload
4098
4099     # Then switch the disks to master/master mode
4100     self._EnsureSecondary(target_node)
4101     self._GoStandalone()
4102     self._GoReconnect(True)
4103     self._WaitUntilSync()
4104
4105     self.feedback_fn("* preparing %s to accept the instance" % target_node)
4106     result = self.rpc.call_accept_instance(target_node,
4107                                            instance,
4108                                            migration_info,
4109                                            self.nodes_ip[target_node])
4110
4111     msg = result.RemoteFailMsg()
4112     if msg:
4113       logging.error("Instance pre-migration failed, trying to revert"
4114                     " disk status: %s", msg)
4115       self._AbortMigration()
4116       self._RevertDiskStatus()
4117       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4118                                (instance.name, msg))
4119
4120     self.feedback_fn("* migrating instance to %s" % target_node)
4121     time.sleep(10)
4122     result = self.rpc.call_instance_migrate(source_node, instance,
4123                                             self.nodes_ip[target_node],
4124                                             self.op.live)
4125     msg = result.RemoteFailMsg()
4126     if msg:
4127       logging.error("Instance migration failed, trying to revert"
4128                     " disk status: %s", msg)
4129       self._AbortMigration()
4130       self._RevertDiskStatus()
4131       raise errors.OpExecError("Could not migrate instance %s: %s" %
4132                                (instance.name, msg))
4133     time.sleep(10)
4134
4135     instance.primary_node = target_node
4136     # distribute new instance config to the other nodes
4137     self.cfg.Update(instance)
4138
4139     result = self.rpc.call_finalize_migration(target_node,
4140                                               instance,
4141                                               migration_info,
4142                                               True)
4143     msg = result.RemoteFailMsg()
4144     if msg:
4145       logging.error("Instance migration succeeded, but finalization failed:"
4146                     " %s" % msg)
4147       raise errors.OpExecError("Could not finalize instance migration: %s" %
4148                                msg)
4149
4150     self._EnsureSecondary(source_node)
4151     self._WaitUntilSync()
4152     self._GoStandalone()
4153     self._GoReconnect(False)
4154     self._WaitUntilSync()
4155
4156     self.feedback_fn("* done")
4157
4158   def Exec(self, feedback_fn):
4159     """Perform the migration.
4160
4161     """
4162     self.feedback_fn = feedback_fn
4163
4164     self.source_node = self.instance.primary_node
4165     self.target_node = self.instance.secondary_nodes[0]
4166     self.all_nodes = [self.source_node, self.target_node]
4167     self.nodes_ip = {
4168       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4169       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4170       }
4171     if self.op.cleanup:
4172       return self._ExecCleanup()
4173     else:
4174       return self._ExecMigration()
4175
4176
4177 def _CreateBlockDev(lu, node, instance, device, force_create,
4178                     info, force_open):
4179   """Create a tree of block devices on a given node.
4180
4181   If this device type has to be created on secondaries, create it and
4182   all its children.
4183
4184   If not, just recurse to children keeping the same 'force' value.
4185
4186   @param lu: the lu on whose behalf we execute
4187   @param node: the node on which to create the device
4188   @type instance: L{objects.Instance}
4189   @param instance: the instance which owns the device
4190   @type device: L{objects.Disk}
4191   @param device: the device to create
4192   @type force_create: boolean
4193   @param force_create: whether to force creation of this device; this
4194       will be change to True whenever we find a device which has
4195       CreateOnSecondary() attribute
4196   @param info: the extra 'metadata' we should attach to the device
4197       (this will be represented as a LVM tag)
4198   @type force_open: boolean
4199   @param force_open: this parameter will be passes to the
4200       L{backend.BlockdevCreate} function where it specifies
4201       whether we run on primary or not, and it affects both
4202       the child assembly and the device own Open() execution
4203
4204   """
4205   if device.CreateOnSecondary():
4206     force_create = True
4207
4208   if device.children:
4209     for child in device.children:
4210       _CreateBlockDev(lu, node, instance, child, force_create,
4211                       info, force_open)
4212
4213   if not force_create:
4214     return
4215
4216   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4217
4218
4219 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4220   """Create a single block device on a given node.
4221
4222   This will not recurse over children of the device, so they must be
4223   created in advance.
4224
4225   @param lu: the lu on whose behalf we execute
4226   @param node: the node on which to create the device
4227   @type instance: L{objects.Instance}
4228   @param instance: the instance which owns the device
4229   @type device: L{objects.Disk}
4230   @param device: the device to create
4231   @param info: the extra 'metadata' we should attach to the device
4232       (this will be represented as a LVM tag)
4233   @type force_open: boolean
4234   @param force_open: this parameter will be passes to the
4235       L{backend.BlockdevCreate} function where it specifies
4236       whether we run on primary or not, and it affects both
4237       the child assembly and the device own Open() execution
4238
4239   """
4240   lu.cfg.SetDiskID(device, node)
4241   result = lu.rpc.call_blockdev_create(node, device, device.size,
4242                                        instance.name, force_open, info)
4243   msg = result.RemoteFailMsg()
4244   if msg:
4245     raise errors.OpExecError("Can't create block device %s on"
4246                              " node %s for instance %s: %s" %
4247                              (device, node, instance.name, msg))
4248   if device.physical_id is None:
4249     device.physical_id = result.payload
4250
4251
4252 def _GenerateUniqueNames(lu, exts):
4253   """Generate a suitable LV name.
4254
4255   This will generate a logical volume name for the given instance.
4256
4257   """
4258   results = []
4259   for val in exts:
4260     new_id = lu.cfg.GenerateUniqueID()
4261     results.append("%s%s" % (new_id, val))
4262   return results
4263
4264
4265 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4266                          p_minor, s_minor):
4267   """Generate a drbd8 device complete with its children.
4268
4269   """
4270   port = lu.cfg.AllocatePort()
4271   vgname = lu.cfg.GetVGName()
4272   shared_secret = lu.cfg.GenerateDRBDSecret()
4273   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4274                           logical_id=(vgname, names[0]))
4275   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4276                           logical_id=(vgname, names[1]))
4277   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4278                           logical_id=(primary, secondary, port,
4279                                       p_minor, s_minor,
4280                                       shared_secret),
4281                           children=[dev_data, dev_meta],
4282                           iv_name=iv_name)
4283   return drbd_dev
4284
4285
4286 def _GenerateDiskTemplate(lu, template_name,
4287                           instance_name, primary_node,
4288                           secondary_nodes, disk_info,
4289                           file_storage_dir, file_driver,
4290                           base_index):
4291   """Generate the entire disk layout for a given template type.
4292
4293   """
4294   #TODO: compute space requirements
4295
4296   vgname = lu.cfg.GetVGName()
4297   disk_count = len(disk_info)
4298   disks = []
4299   if template_name == constants.DT_DISKLESS:
4300     pass
4301   elif template_name == constants.DT_PLAIN:
4302     if len(secondary_nodes) != 0:
4303       raise errors.ProgrammerError("Wrong template configuration")
4304
4305     names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
4306                                       for i in range(disk_count)])
4307     for idx, disk in enumerate(disk_info):
4308       disk_index = idx + base_index
4309       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4310                               logical_id=(vgname, names[idx]),
4311                               iv_name="disk/%d" % disk_index,
4312                               mode=disk["mode"])
4313       disks.append(disk_dev)
4314   elif template_name == constants.DT_DRBD8:
4315     if len(secondary_nodes) != 1:
4316       raise errors.ProgrammerError("Wrong template configuration")
4317     remote_node = secondary_nodes[0]
4318     minors = lu.cfg.AllocateDRBDMinor(
4319       [primary_node, remote_node] * len(disk_info), instance_name)
4320
4321     names = []
4322     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
4323                                                for i in range(disk_count)]):
4324       names.append(lv_prefix + "_data")
4325       names.append(lv_prefix + "_meta")
4326     for idx, disk in enumerate(disk_info):
4327       disk_index = idx + base_index
4328       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4329                                       disk["size"], names[idx*2:idx*2+2],
4330                                       "disk/%d" % disk_index,
4331                                       minors[idx*2], minors[idx*2+1])
4332       disk_dev.mode = disk["mode"]
4333       disks.append(disk_dev)
4334   elif template_name == constants.DT_FILE:
4335     if len(secondary_nodes) != 0:
4336       raise errors.ProgrammerError("Wrong template configuration")
4337
4338     for idx, disk in enumerate(disk_info):
4339       disk_index = idx + base_index
4340       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4341                               iv_name="disk/%d" % disk_index,
4342                               logical_id=(file_driver,
4343                                           "%s/disk%d" % (file_storage_dir,
4344                                                          disk_index)),
4345                               mode=disk["mode"])
4346       disks.append(disk_dev)
4347   else:
4348     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4349   return disks
4350
4351
4352 def _GetInstanceInfoText(instance):
4353   """Compute that text that should be added to the disk's metadata.
4354
4355   """
4356   return "originstname+%s" % instance.name
4357
4358
4359 def _CreateDisks(lu, instance):
4360   """Create all disks for an instance.
4361
4362   This abstracts away some work from AddInstance.
4363
4364   @type lu: L{LogicalUnit}
4365   @param lu: the logical unit on whose behalf we execute
4366   @type instance: L{objects.Instance}
4367   @param instance: the instance whose disks we should create
4368   @rtype: boolean
4369   @return: the success of the creation
4370
4371   """
4372   info = _GetInstanceInfoText(instance)
4373   pnode = instance.primary_node
4374
4375   if instance.disk_template == constants.DT_FILE:
4376     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4377     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4378
4379     if result.failed or not result.data:
4380       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4381
4382     if not result.data[0]:
4383       raise errors.OpExecError("Failed to create directory '%s'" %
4384                                file_storage_dir)
4385
4386   # Note: this needs to be kept in sync with adding of disks in
4387   # LUSetInstanceParams
4388   for device in instance.disks:
4389     logging.info("Creating volume %s for instance %s",
4390                  device.iv_name, instance.name)
4391     #HARDCODE
4392     for node in instance.all_nodes:
4393       f_create = node == pnode
4394       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4395
4396
4397 def _RemoveDisks(lu, instance):
4398   """Remove all disks for an instance.
4399
4400   This abstracts away some work from `AddInstance()` and
4401   `RemoveInstance()`. Note that in case some of the devices couldn't
4402   be removed, the removal will continue with the other ones (compare
4403   with `_CreateDisks()`).
4404
4405   @type lu: L{LogicalUnit}
4406   @param lu: the logical unit on whose behalf we execute
4407   @type instance: L{objects.Instance}
4408   @param instance: the instance whose disks we should remove
4409   @rtype: boolean
4410   @return: the success of the removal
4411
4412   """
4413   logging.info("Removing block devices for instance %s", instance.name)
4414
4415   all_result = True
4416   for device in instance.disks:
4417     for node, disk in device.ComputeNodeTree(instance.primary_node):
4418       lu.cfg.SetDiskID(disk, node)
4419       msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4420       if msg:
4421         lu.LogWarning("Could not remove block device %s on node %s,"
4422                       " continuing anyway: %s", device.iv_name, node, msg)
4423         all_result = False
4424
4425   if instance.disk_template == constants.DT_FILE:
4426     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4427     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4428                                                  file_storage_dir)
4429     if result.failed or not result.data:
4430       logging.error("Could not remove directory '%s'", file_storage_dir)
4431       all_result = False
4432
4433   return all_result
4434
4435
4436 def _ComputeDiskSize(disk_template, disks):
4437   """Compute disk size requirements in the volume group
4438
4439   """
4440   # Required free disk space as a function of disk and swap space
4441   req_size_dict = {
4442     constants.DT_DISKLESS: None,
4443     constants.DT_PLAIN: sum(d["size"] for d in disks),
4444     # 128 MB are added for drbd metadata for each disk
4445     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4446     constants.DT_FILE: None,
4447   }
4448
4449   if disk_template not in req_size_dict:
4450     raise errors.ProgrammerError("Disk template '%s' size requirement"
4451                                  " is unknown" %  disk_template)
4452
4453   return req_size_dict[disk_template]
4454
4455
4456 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4457   """Hypervisor parameter validation.
4458
4459   This function abstract the hypervisor parameter validation to be
4460   used in both instance create and instance modify.
4461
4462   @type lu: L{LogicalUnit}
4463   @param lu: the logical unit for which we check
4464   @type nodenames: list
4465   @param nodenames: the list of nodes on which we should check
4466   @type hvname: string
4467   @param hvname: the name of the hypervisor we should use
4468   @type hvparams: dict
4469   @param hvparams: the parameters which we need to check
4470   @raise errors.OpPrereqError: if the parameters are not valid
4471
4472   """
4473   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4474                                                   hvname,
4475                                                   hvparams)
4476   for node in nodenames:
4477     info = hvinfo[node]
4478     if info.offline:
4479       continue
4480     msg = info.RemoteFailMsg()
4481     if msg:
4482       raise errors.OpPrereqError("Hypervisor parameter validation"
4483                                  " failed on node %s: %s" % (node, msg))
4484
4485
4486 class LUCreateInstance(LogicalUnit):
4487   """Create an instance.
4488
4489   """
4490   HPATH = "instance-add"
4491   HTYPE = constants.HTYPE_INSTANCE
4492   _OP_REQP = ["instance_name", "disks", "disk_template",
4493               "mode", "start",
4494               "wait_for_sync", "ip_check", "nics",
4495               "hvparams", "beparams"]
4496   REQ_BGL = False
4497
4498   def _ExpandNode(self, node):
4499     """Expands and checks one node name.
4500
4501     """
4502     node_full = self.cfg.ExpandNodeName(node)
4503     if node_full is None:
4504       raise errors.OpPrereqError("Unknown node %s" % node)
4505     return node_full
4506
4507   def ExpandNames(self):
4508     """ExpandNames for CreateInstance.
4509
4510     Figure out the right locks for instance creation.
4511
4512     """
4513     self.needed_locks = {}
4514
4515     # set optional parameters to none if they don't exist
4516     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4517       if not hasattr(self.op, attr):
4518         setattr(self.op, attr, None)
4519
4520     # cheap checks, mostly valid constants given
4521
4522     # verify creation mode
4523     if self.op.mode not in (constants.INSTANCE_CREATE,
4524                             constants.INSTANCE_IMPORT):
4525       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4526                                  self.op.mode)
4527
4528     # disk template and mirror node verification
4529     if self.op.disk_template not in constants.DISK_TEMPLATES:
4530       raise errors.OpPrereqError("Invalid disk template name")
4531
4532     if self.op.hypervisor is None:
4533       self.op.hypervisor = self.cfg.GetHypervisorType()
4534
4535     cluster = self.cfg.GetClusterInfo()
4536     enabled_hvs = cluster.enabled_hypervisors
4537     if self.op.hypervisor not in enabled_hvs:
4538       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4539                                  " cluster (%s)" % (self.op.hypervisor,
4540                                   ",".join(enabled_hvs)))
4541
4542     # check hypervisor parameter syntax (locally)
4543     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4544     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4545                                   self.op.hvparams)
4546     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4547     hv_type.CheckParameterSyntax(filled_hvp)
4548     self.hv_full = filled_hvp
4549
4550     # fill and remember the beparams dict
4551     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4552     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4553                                     self.op.beparams)
4554
4555     #### instance parameters check
4556
4557     # instance name verification
4558     hostname1 = utils.HostInfo(self.op.instance_name)
4559     self.op.instance_name = instance_name = hostname1.name
4560
4561     # this is just a preventive check, but someone might still add this
4562     # instance in the meantime, and creation will fail at lock-add time
4563     if instance_name in self.cfg.GetInstanceList():
4564       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4565                                  instance_name)
4566
4567     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4568
4569     # NIC buildup
4570     self.nics = []
4571     for nic in self.op.nics:
4572       # ip validity checks
4573       ip = nic.get("ip", None)
4574       if ip is None or ip.lower() == "none":
4575         nic_ip = None
4576       elif ip.lower() == constants.VALUE_AUTO:
4577         nic_ip = hostname1.ip
4578       else:
4579         if not utils.IsValidIP(ip):
4580           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4581                                      " like a valid IP" % ip)
4582         nic_ip = ip
4583
4584       # MAC address verification
4585       mac = nic.get("mac", constants.VALUE_AUTO)
4586       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4587         if not utils.IsValidMac(mac.lower()):
4588           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4589                                      mac)
4590       # bridge verification
4591       bridge = nic.get("bridge", None)
4592       if bridge is None:
4593         bridge = self.cfg.GetDefBridge()
4594       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4595
4596     # disk checks/pre-build
4597     self.disks = []
4598     for disk in self.op.disks:
4599       mode = disk.get("mode", constants.DISK_RDWR)
4600       if mode not in constants.DISK_ACCESS_SET:
4601         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4602                                    mode)
4603       size = disk.get("size", None)
4604       if size is None:
4605         raise errors.OpPrereqError("Missing disk size")
4606       try:
4607         size = int(size)
4608       except ValueError:
4609         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4610       self.disks.append({"size": size, "mode": mode})
4611
4612     # used in CheckPrereq for ip ping check
4613     self.check_ip = hostname1.ip
4614
4615     # file storage checks
4616     if (self.op.file_driver and
4617         not self.op.file_driver in constants.FILE_DRIVER):
4618       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4619                                  self.op.file_driver)
4620
4621     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4622       raise errors.OpPrereqError("File storage directory path not absolute")
4623
4624     ### Node/iallocator related checks
4625     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4626       raise errors.OpPrereqError("One and only one of iallocator and primary"
4627                                  " node must be given")
4628
4629     if self.op.iallocator:
4630       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4631     else:
4632       self.op.pnode = self._ExpandNode(self.op.pnode)
4633       nodelist = [self.op.pnode]
4634       if self.op.snode is not None:
4635         self.op.snode = self._ExpandNode(self.op.snode)
4636         nodelist.append(self.op.snode)
4637       self.needed_locks[locking.LEVEL_NODE] = nodelist
4638
4639     # in case of import lock the source node too
4640     if self.op.mode == constants.INSTANCE_IMPORT:
4641       src_node = getattr(self.op, "src_node", None)
4642       src_path = getattr(self.op, "src_path", None)
4643
4644       if src_path is None:
4645         self.op.src_path = src_path = self.op.instance_name
4646
4647       if src_node is None:
4648         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4649         self.op.src_node = None
4650         if os.path.isabs(src_path):
4651           raise errors.OpPrereqError("Importing an instance from an absolute"
4652                                      " path requires a source node option.")
4653       else:
4654         self.op.src_node = src_node = self._ExpandNode(src_node)
4655         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4656           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4657         if not os.path.isabs(src_path):
4658           self.op.src_path = src_path = \
4659             os.path.join(constants.EXPORT_DIR, src_path)
4660
4661     else: # INSTANCE_CREATE
4662       if getattr(self.op, "os_type", None) is None:
4663         raise errors.OpPrereqError("No guest OS specified")
4664
4665   def _RunAllocator(self):
4666     """Run the allocator based on input opcode.
4667
4668     """
4669     nics = [n.ToDict() for n in self.nics]
4670     ial = IAllocator(self,
4671                      mode=constants.IALLOCATOR_MODE_ALLOC,
4672                      name=self.op.instance_name,
4673                      disk_template=self.op.disk_template,
4674                      tags=[],
4675                      os=self.op.os_type,
4676                      vcpus=self.be_full[constants.BE_VCPUS],
4677                      mem_size=self.be_full[constants.BE_MEMORY],
4678                      disks=self.disks,
4679                      nics=nics,
4680                      hypervisor=self.op.hypervisor,
4681                      )
4682
4683     ial.Run(self.op.iallocator)
4684
4685     if not ial.success:
4686       raise errors.OpPrereqError("Can't compute nodes using"
4687                                  " iallocator '%s': %s" % (self.op.iallocator,
4688                                                            ial.info))
4689     if len(ial.nodes) != ial.required_nodes:
4690       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4691                                  " of nodes (%s), required %s" %
4692                                  (self.op.iallocator, len(ial.nodes),
4693                                   ial.required_nodes))
4694     self.op.pnode = ial.nodes[0]
4695     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4696                  self.op.instance_name, self.op.iallocator,
4697                  ", ".join(ial.nodes))
4698     if ial.required_nodes == 2:
4699       self.op.snode = ial.nodes[1]
4700
4701   def BuildHooksEnv(self):
4702     """Build hooks env.
4703
4704     This runs on master, primary and secondary nodes of the instance.
4705
4706     """
4707     env = {
4708       "ADD_MODE": self.op.mode,
4709       }
4710     if self.op.mode == constants.INSTANCE_IMPORT:
4711       env["SRC_NODE"] = self.op.src_node
4712       env["SRC_PATH"] = self.op.src_path
4713       env["SRC_IMAGES"] = self.src_images
4714
4715     env.update(_BuildInstanceHookEnv(
4716       name=self.op.instance_name,
4717       primary_node=self.op.pnode,
4718       secondary_nodes=self.secondaries,
4719       status=self.op.start,
4720       os_type=self.op.os_type,
4721       memory=self.be_full[constants.BE_MEMORY],
4722       vcpus=self.be_full[constants.BE_VCPUS],
4723       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4724       disk_template=self.op.disk_template,
4725       disks=[(d["size"], d["mode"]) for d in self.disks],
4726       bep=self.be_full,
4727       hvp=self.hv_full,
4728       hypervisor_name=self.op.hypervisor,
4729     ))
4730
4731     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4732           self.secondaries)
4733     return env, nl, nl
4734
4735
4736   def CheckPrereq(self):
4737     """Check prerequisites.
4738
4739     """
4740     if (not self.cfg.GetVGName() and
4741         self.op.disk_template not in constants.DTS_NOT_LVM):
4742       raise errors.OpPrereqError("Cluster does not support lvm-based"
4743                                  " instances")
4744
4745     if self.op.mode == constants.INSTANCE_IMPORT:
4746       src_node = self.op.src_node
4747       src_path = self.op.src_path
4748
4749       if src_node is None:
4750         exp_list = self.rpc.call_export_list(
4751           self.acquired_locks[locking.LEVEL_NODE])
4752         found = False
4753         for node in exp_list:
4754           if not exp_list[node].failed and src_path in exp_list[node].data:
4755             found = True
4756             self.op.src_node = src_node = node
4757             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4758                                                        src_path)
4759             break
4760         if not found:
4761           raise errors.OpPrereqError("No export found for relative path %s" %
4762                                       src_path)
4763
4764       _CheckNodeOnline(self, src_node)
4765       result = self.rpc.call_export_info(src_node, src_path)
4766       result.Raise()
4767       if not result.data:
4768         raise errors.OpPrereqError("No export found in dir %s" % src_path)
4769
4770       export_info = result.data
4771       if not export_info.has_section(constants.INISECT_EXP):
4772         raise errors.ProgrammerError("Corrupted export config")
4773
4774       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4775       if (int(ei_version) != constants.EXPORT_VERSION):
4776         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4777                                    (ei_version, constants.EXPORT_VERSION))
4778
4779       # Check that the new instance doesn't have less disks than the export
4780       instance_disks = len(self.disks)
4781       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4782       if instance_disks < export_disks:
4783         raise errors.OpPrereqError("Not enough disks to import."
4784                                    " (instance: %d, export: %d)" %
4785                                    (instance_disks, export_disks))
4786
4787       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4788       disk_images = []
4789       for idx in range(export_disks):
4790         option = 'disk%d_dump' % idx
4791         if export_info.has_option(constants.INISECT_INS, option):
4792           # FIXME: are the old os-es, disk sizes, etc. useful?
4793           export_name = export_info.get(constants.INISECT_INS, option)
4794           image = os.path.join(src_path, export_name)
4795           disk_images.append(image)
4796         else:
4797           disk_images.append(False)
4798
4799       self.src_images = disk_images
4800
4801       old_name = export_info.get(constants.INISECT_INS, 'name')
4802       # FIXME: int() here could throw a ValueError on broken exports
4803       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4804       if self.op.instance_name == old_name:
4805         for idx, nic in enumerate(self.nics):
4806           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4807             nic_mac_ini = 'nic%d_mac' % idx
4808             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4809
4810     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4811     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4812     if self.op.start and not self.op.ip_check:
4813       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4814                                  " adding an instance in start mode")
4815
4816     if self.op.ip_check:
4817       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4818         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4819                                    (self.check_ip, self.op.instance_name))
4820
4821     #### mac address generation
4822     # By generating here the mac address both the allocator and the hooks get
4823     # the real final mac address rather than the 'auto' or 'generate' value.
4824     # There is a race condition between the generation and the instance object
4825     # creation, which means that we know the mac is valid now, but we're not
4826     # sure it will be when we actually add the instance. If things go bad
4827     # adding the instance will abort because of a duplicate mac, and the
4828     # creation job will fail.
4829     for nic in self.nics:
4830       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4831         nic.mac = self.cfg.GenerateMAC()
4832
4833     #### allocator run
4834
4835     if self.op.iallocator is not None:
4836       self._RunAllocator()
4837
4838     #### node related checks
4839
4840     # check primary node
4841     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4842     assert self.pnode is not None, \
4843       "Cannot retrieve locked node %s" % self.op.pnode
4844     if pnode.offline:
4845       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4846                                  pnode.name)
4847     if pnode.drained:
4848       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4849                                  pnode.name)
4850
4851     self.secondaries = []
4852
4853     # mirror node verification
4854     if self.op.disk_template in constants.DTS_NET_MIRROR:
4855       if self.op.snode is None:
4856         raise errors.OpPrereqError("The networked disk templates need"
4857                                    " a mirror node")
4858       if self.op.snode == pnode.name:
4859         raise errors.OpPrereqError("The secondary node cannot be"
4860                                    " the primary node.")
4861       _CheckNodeOnline(self, self.op.snode)
4862       _CheckNodeNotDrained(self, self.op.snode)
4863       self.secondaries.append(self.op.snode)
4864
4865     nodenames = [pnode.name] + self.secondaries
4866
4867     req_size = _ComputeDiskSize(self.op.disk_template,
4868                                 self.disks)
4869
4870     # Check lv size requirements
4871     if req_size is not None:
4872       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4873                                          self.op.hypervisor)
4874       for node in nodenames:
4875         info = nodeinfo[node]
4876         info.Raise()
4877         info = info.data
4878         if not info:
4879           raise errors.OpPrereqError("Cannot get current information"
4880                                      " from node '%s'" % node)
4881         vg_free = info.get('vg_free', None)
4882         if not isinstance(vg_free, int):
4883           raise errors.OpPrereqError("Can't compute free disk space on"
4884                                      " node %s" % node)
4885         if req_size > info['vg_free']:
4886           raise errors.OpPrereqError("Not enough disk space on target node %s."
4887                                      " %d MB available, %d MB required" %
4888                                      (node, info['vg_free'], req_size))
4889
4890     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4891
4892     # os verification
4893     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4894     result.Raise()
4895     if not isinstance(result.data, objects.OS) or not result.data:
4896       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4897                                  " primary node"  % self.op.os_type)
4898
4899     # bridge check on primary node
4900     bridges = [n.bridge for n in self.nics]
4901     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4902     result.Raise()
4903     if not result.data:
4904       raise errors.OpPrereqError("One of the target bridges '%s' does not"
4905                                  " exist on destination node '%s'" %
4906                                  (",".join(bridges), pnode.name))
4907
4908     # memory check on primary node
4909     if self.op.start:
4910       _CheckNodeFreeMemory(self, self.pnode.name,
4911                            "creating instance %s" % self.op.instance_name,
4912                            self.be_full[constants.BE_MEMORY],
4913                            self.op.hypervisor)
4914
4915   def Exec(self, feedback_fn):
4916     """Create and add the instance to the cluster.
4917
4918     """
4919     instance = self.op.instance_name
4920     pnode_name = self.pnode.name
4921
4922     ht_kind = self.op.hypervisor
4923     if ht_kind in constants.HTS_REQ_PORT:
4924       network_port = self.cfg.AllocatePort()
4925     else:
4926       network_port = None
4927
4928     ##if self.op.vnc_bind_address is None:
4929     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4930
4931     # this is needed because os.path.join does not accept None arguments
4932     if self.op.file_storage_dir is None:
4933       string_file_storage_dir = ""
4934     else:
4935       string_file_storage_dir = self.op.file_storage_dir
4936
4937     # build the full file storage dir path
4938     file_storage_dir = os.path.normpath(os.path.join(
4939                                         self.cfg.GetFileStorageDir(),
4940                                         string_file_storage_dir, instance))
4941
4942
4943     disks = _GenerateDiskTemplate(self,
4944                                   self.op.disk_template,
4945                                   instance, pnode_name,
4946                                   self.secondaries,
4947                                   self.disks,
4948                                   file_storage_dir,
4949                                   self.op.file_driver,
4950                                   0)
4951
4952     iobj = objects.Instance(name=instance, os=self.op.os_type,
4953                             primary_node=pnode_name,
4954                             nics=self.nics, disks=disks,
4955                             disk_template=self.op.disk_template,
4956                             admin_up=False,
4957                             network_port=network_port,
4958                             beparams=self.op.beparams,
4959                             hvparams=self.op.hvparams,
4960                             hypervisor=self.op.hypervisor,
4961                             )
4962
4963     feedback_fn("* creating instance disks...")
4964     try:
4965       _CreateDisks(self, iobj)
4966     except errors.OpExecError:
4967       self.LogWarning("Device creation failed, reverting...")
4968       try:
4969         _RemoveDisks(self, iobj)
4970       finally:
4971         self.cfg.ReleaseDRBDMinors(instance)
4972         raise
4973
4974     feedback_fn("adding instance %s to cluster config" % instance)
4975
4976     self.cfg.AddInstance(iobj)
4977     # Declare that we don't want to remove the instance lock anymore, as we've
4978     # added the instance to the config
4979     del self.remove_locks[locking.LEVEL_INSTANCE]
4980     # Unlock all the nodes
4981     if self.op.mode == constants.INSTANCE_IMPORT:
4982       nodes_keep = [self.op.src_node]
4983       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4984                        if node != self.op.src_node]
4985       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4986       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4987     else:
4988       self.context.glm.release(locking.LEVEL_NODE)
4989       del self.acquired_locks[locking.LEVEL_NODE]
4990
4991     if self.op.wait_for_sync:
4992       disk_abort = not _WaitForSync(self, iobj)
4993     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4994       # make sure the disks are not degraded (still sync-ing is ok)
4995       time.sleep(15)
4996       feedback_fn("* checking mirrors status")
4997       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4998     else:
4999       disk_abort = False
5000
5001     if disk_abort:
5002       _RemoveDisks(self, iobj)
5003       self.cfg.RemoveInstance(iobj.name)
5004       # Make sure the instance lock gets removed
5005       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
5006       raise errors.OpExecError("There are some degraded disks for"
5007                                " this instance")
5008
5009     feedback_fn("creating os for instance %s on node %s" %
5010                 (instance, pnode_name))
5011
5012     if iobj.disk_template != constants.DT_DISKLESS:
5013       if self.op.mode == constants.INSTANCE_CREATE:
5014         feedback_fn("* running the instance OS create scripts...")
5015         result = self.rpc.call_instance_os_add(pnode_name, iobj)
5016         msg = result.RemoteFailMsg()
5017         if msg:
5018           raise errors.OpExecError("Could not add os for instance %s"
5019                                    " on node %s: %s" %
5020                                    (instance, pnode_name, msg))
5021
5022       elif self.op.mode == constants.INSTANCE_IMPORT:
5023         feedback_fn("* running the instance OS import scripts...")
5024         src_node = self.op.src_node
5025         src_images = self.src_images
5026         cluster_name = self.cfg.GetClusterName()
5027         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
5028                                                          src_node, src_images,
5029                                                          cluster_name)
5030         import_result.Raise()
5031         for idx, result in enumerate(import_result.data):
5032           if not result:
5033             self.LogWarning("Could not import the image %s for instance"
5034                             " %s, disk %d, on node %s" %
5035                             (src_images[idx], instance, idx, pnode_name))
5036       else:
5037         # also checked in the prereq part
5038         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
5039                                      % self.op.mode)
5040
5041     if self.op.start:
5042       iobj.admin_up = True
5043       self.cfg.Update(iobj)
5044       logging.info("Starting instance %s on node %s", instance, pnode_name)
5045       feedback_fn("* starting instance...")
5046       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
5047       msg = result.RemoteFailMsg()
5048       if msg:
5049         raise errors.OpExecError("Could not start instance: %s" % msg)
5050
5051
5052 class LUConnectConsole(NoHooksLU):
5053   """Connect to an instance's console.
5054
5055   This is somewhat special in that it returns the command line that
5056   you need to run on the master node in order to connect to the
5057   console.
5058
5059   """
5060   _OP_REQP = ["instance_name"]
5061   REQ_BGL = False
5062
5063   def ExpandNames(self):
5064     self._ExpandAndLockInstance()
5065
5066   def CheckPrereq(self):
5067     """Check prerequisites.
5068
5069     This checks that the instance is in the cluster.
5070
5071     """
5072     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5073     assert self.instance is not None, \
5074       "Cannot retrieve locked instance %s" % self.op.instance_name
5075     _CheckNodeOnline(self, self.instance.primary_node)
5076
5077   def Exec(self, feedback_fn):
5078     """Connect to the console of an instance
5079
5080     """
5081     instance = self.instance
5082     node = instance.primary_node
5083
5084     node_insts = self.rpc.call_instance_list([node],
5085                                              [instance.hypervisor])[node]
5086     node_insts.Raise()
5087
5088     if instance.name not in node_insts.data:
5089       raise errors.OpExecError("Instance %s is not running." % instance.name)
5090
5091     logging.debug("Connecting to console of %s on %s", instance.name, node)
5092
5093     hyper = hypervisor.GetHypervisor(instance.hypervisor)
5094     cluster = self.cfg.GetClusterInfo()
5095     # beparams and hvparams are passed separately, to avoid editing the
5096     # instance and then saving the defaults in the instance itself.
5097     hvparams = cluster.FillHV(instance)
5098     beparams = cluster.FillBE(instance)
5099     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5100
5101     # build ssh cmdline
5102     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
5103
5104
5105 class LUReplaceDisks(LogicalUnit):
5106   """Replace the disks of an instance.
5107
5108   """
5109   HPATH = "mirrors-replace"
5110   HTYPE = constants.HTYPE_INSTANCE
5111   _OP_REQP = ["instance_name", "mode", "disks"]
5112   REQ_BGL = False
5113
5114   def CheckArguments(self):
5115     if not hasattr(self.op, "remote_node"):
5116       self.op.remote_node = None
5117     if not hasattr(self.op, "iallocator"):
5118       self.op.iallocator = None
5119
5120     # check for valid parameter combination
5121     cnt = [self.op.remote_node, self.op.iallocator].count(None)
5122     if self.op.mode == constants.REPLACE_DISK_CHG:
5123       if cnt == 2:
5124         raise errors.OpPrereqError("When changing the secondary either an"
5125                                    " iallocator script must be used or the"
5126                                    " new node given")
5127       elif cnt == 0:
5128         raise errors.OpPrereqError("Give either the iallocator or the new"
5129                                    " secondary, not both")
5130     else: # not replacing the secondary
5131       if cnt != 2:
5132         raise errors.OpPrereqError("The iallocator and new node options can"
5133                                    " be used only when changing the"
5134                                    " secondary node")
5135
5136   def ExpandNames(self):
5137     self._ExpandAndLockInstance()
5138
5139     if self.op.iallocator is not None:
5140       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5141     elif self.op.remote_node is not None:
5142       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5143       if remote_node is None:
5144         raise errors.OpPrereqError("Node '%s' not known" %
5145                                    self.op.remote_node)
5146       self.op.remote_node = remote_node
5147       # Warning: do not remove the locking of the new secondary here
5148       # unless DRBD8.AddChildren is changed to work in parallel;
5149       # currently it doesn't since parallel invocations of
5150       # FindUnusedMinor will conflict
5151       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5152       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5153     else:
5154       self.needed_locks[locking.LEVEL_NODE] = []
5155       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5156
5157   def DeclareLocks(self, level):
5158     # If we're not already locking all nodes in the set we have to declare the
5159     # instance's primary/secondary nodes.
5160     if (level == locking.LEVEL_NODE and
5161         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5162       self._LockInstancesNodes()
5163
5164   def _RunAllocator(self):
5165     """Compute a new secondary node using an IAllocator.
5166
5167     """
5168     ial = IAllocator(self,
5169                      mode=constants.IALLOCATOR_MODE_RELOC,
5170                      name=self.op.instance_name,
5171                      relocate_from=[self.sec_node])
5172
5173     ial.Run(self.op.iallocator)
5174
5175     if not ial.success:
5176       raise errors.OpPrereqError("Can't compute nodes using"
5177                                  " iallocator '%s': %s" % (self.op.iallocator,
5178                                                            ial.info))
5179     if len(ial.nodes) != ial.required_nodes:
5180       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5181                                  " of nodes (%s), required %s" %
5182                                  (len(ial.nodes), ial.required_nodes))
5183     self.op.remote_node = ial.nodes[0]
5184     self.LogInfo("Selected new secondary for the instance: %s",
5185                  self.op.remote_node)
5186
5187   def BuildHooksEnv(self):
5188     """Build hooks env.
5189
5190     This runs on the master, the primary and all the secondaries.
5191
5192     """
5193     env = {
5194       "MODE": self.op.mode,
5195       "NEW_SECONDARY": self.op.remote_node,
5196       "OLD_SECONDARY": self.instance.secondary_nodes[0],
5197       }
5198     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5199     nl = [
5200       self.cfg.GetMasterNode(),
5201       self.instance.primary_node,
5202       ]
5203     if self.op.remote_node is not None:
5204       nl.append(self.op.remote_node)
5205     return env, nl, nl
5206
5207   def CheckPrereq(self):
5208     """Check prerequisites.
5209
5210     This checks that the instance is in the cluster.
5211
5212     """
5213     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5214     assert instance is not None, \
5215       "Cannot retrieve locked instance %s" % self.op.instance_name
5216     self.instance = instance
5217
5218     if instance.disk_template != constants.DT_DRBD8:
5219       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5220                                  " instances")
5221
5222     if len(instance.secondary_nodes) != 1:
5223       raise errors.OpPrereqError("The instance has a strange layout,"
5224                                  " expected one secondary but found %d" %
5225                                  len(instance.secondary_nodes))
5226
5227     self.sec_node = instance.secondary_nodes[0]
5228
5229     if self.op.iallocator is not None:
5230       self._RunAllocator()
5231
5232     remote_node = self.op.remote_node
5233     if remote_node is not None:
5234       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5235       assert self.remote_node_info is not None, \
5236         "Cannot retrieve locked node %s" % remote_node
5237     else:
5238       self.remote_node_info = None
5239     if remote_node == instance.primary_node:
5240       raise errors.OpPrereqError("The specified node is the primary node of"
5241                                  " the instance.")
5242     elif remote_node == self.sec_node:
5243       raise errors.OpPrereqError("The specified node is already the"
5244                                  " secondary node of the instance.")
5245
5246     if self.op.mode == constants.REPLACE_DISK_PRI:
5247       n1 = self.tgt_node = instance.primary_node
5248       n2 = self.oth_node = self.sec_node
5249     elif self.op.mode == constants.REPLACE_DISK_SEC:
5250       n1 = self.tgt_node = self.sec_node
5251       n2 = self.oth_node = instance.primary_node
5252     elif self.op.mode == constants.REPLACE_DISK_CHG:
5253       n1 = self.new_node = remote_node
5254       n2 = self.oth_node = instance.primary_node
5255       self.tgt_node = self.sec_node
5256       _CheckNodeNotDrained(self, remote_node)
5257     else:
5258       raise errors.ProgrammerError("Unhandled disk replace mode")
5259
5260     _CheckNodeOnline(self, n1)
5261     _CheckNodeOnline(self, n2)
5262
5263     if not self.op.disks:
5264       self.op.disks = range(len(instance.disks))
5265
5266     for disk_idx in self.op.disks:
5267       instance.FindDisk(disk_idx)
5268
5269   def _ExecD8DiskOnly(self, feedback_fn):
5270     """Replace a disk on the primary or secondary for dbrd8.
5271
5272     The algorithm for replace is quite complicated:
5273
5274       1. for each disk to be replaced:
5275
5276         1. create new LVs on the target node with unique names
5277         1. detach old LVs from the drbd device
5278         1. rename old LVs to name_replaced.<time_t>
5279         1. rename new LVs to old LVs
5280         1. attach the new LVs (with the old names now) to the drbd device
5281
5282       1. wait for sync across all devices
5283
5284       1. for each modified disk:
5285
5286         1. remove old LVs (which have the name name_replaces.<time_t>)
5287
5288     Failures are not very well handled.
5289
5290     """
5291     steps_total = 6
5292     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5293     instance = self.instance
5294     iv_names = {}
5295     vgname = self.cfg.GetVGName()
5296     # start of work
5297     cfg = self.cfg
5298     tgt_node = self.tgt_node
5299     oth_node = self.oth_node
5300
5301     # Step: check device activation
5302     self.proc.LogStep(1, steps_total, "check device existence")
5303     info("checking volume groups")
5304     my_vg = cfg.GetVGName()
5305     results = self.rpc.call_vg_list([oth_node, tgt_node])
5306     if not results:
5307       raise errors.OpExecError("Can't list volume groups on the nodes")
5308     for node in oth_node, tgt_node:
5309       res = results[node]
5310       if res.failed or not res.data or my_vg not in res.data:
5311         raise errors.OpExecError("Volume group '%s' not found on %s" %
5312                                  (my_vg, node))
5313     for idx, dev in enumerate(instance.disks):
5314       if idx not in self.op.disks:
5315         continue
5316       for node in tgt_node, oth_node:
5317         info("checking disk/%d on %s" % (idx, node))
5318         cfg.SetDiskID(dev, node)
5319         result = self.rpc.call_blockdev_find(node, dev)
5320         msg = result.RemoteFailMsg()
5321         if not msg and not result.payload:
5322           msg = "disk not found"
5323         if msg:
5324           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5325                                    (idx, node, msg))
5326
5327     # Step: check other node consistency
5328     self.proc.LogStep(2, steps_total, "check peer consistency")
5329     for idx, dev in enumerate(instance.disks):
5330       if idx not in self.op.disks:
5331         continue
5332       info("checking disk/%d consistency on %s" % (idx, oth_node))
5333       if not _CheckDiskConsistency(self, dev, oth_node,
5334                                    oth_node==instance.primary_node):
5335         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5336                                  " to replace disks on this node (%s)" %
5337                                  (oth_node, tgt_node))
5338
5339     # Step: create new storage
5340     self.proc.LogStep(3, steps_total, "allocate new storage")
5341     for idx, dev in enumerate(instance.disks):
5342       if idx not in self.op.disks:
5343         continue
5344       size = dev.size
5345       cfg.SetDiskID(dev, tgt_node)
5346       lv_names = [".disk%d_%s" % (idx, suf)
5347                   for suf in ["data", "meta"]]
5348       names = _GenerateUniqueNames(self, lv_names)
5349       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5350                              logical_id=(vgname, names[0]))
5351       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5352                              logical_id=(vgname, names[1]))
5353       new_lvs = [lv_data, lv_meta]
5354       old_lvs = dev.children
5355       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5356       info("creating new local storage on %s for %s" %
5357            (tgt_node, dev.iv_name))
5358       # we pass force_create=True to force the LVM creation
5359       for new_lv in new_lvs:
5360         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5361                         _GetInstanceInfoText(instance), False)
5362
5363     # Step: for each lv, detach+rename*2+attach
5364     self.proc.LogStep(4, steps_total, "change drbd configuration")
5365     for dev, old_lvs, new_lvs in iv_names.itervalues():
5366       info("detaching %s drbd from local storage" % dev.iv_name)
5367       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5368       result.Raise()
5369       if not result.data:
5370         raise errors.OpExecError("Can't detach drbd from local storage on node"
5371                                  " %s for device %s" % (tgt_node, dev.iv_name))
5372       #dev.children = []
5373       #cfg.Update(instance)
5374
5375       # ok, we created the new LVs, so now we know we have the needed
5376       # storage; as such, we proceed on the target node to rename
5377       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5378       # using the assumption that logical_id == physical_id (which in
5379       # turn is the unique_id on that node)
5380
5381       # FIXME(iustin): use a better name for the replaced LVs
5382       temp_suffix = int(time.time())
5383       ren_fn = lambda d, suff: (d.physical_id[0],
5384                                 d.physical_id[1] + "_replaced-%s" % suff)
5385       # build the rename list based on what LVs exist on the node
5386       rlist = []
5387       for to_ren in old_lvs:
5388         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5389         if not result.RemoteFailMsg() and result.payload:
5390           # device exists
5391           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5392
5393       info("renaming the old LVs on the target node")
5394       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5395       result.Raise()
5396       if not result.data:
5397         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
5398       # now we rename the new LVs to the old LVs
5399       info("renaming the new LVs on the target node")
5400       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5401       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5402       result.Raise()
5403       if not result.data:
5404         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
5405
5406       for old, new in zip(old_lvs, new_lvs):
5407         new.logical_id = old.logical_id
5408         cfg.SetDiskID(new, tgt_node)
5409
5410       for disk in old_lvs:
5411         disk.logical_id = ren_fn(disk, temp_suffix)
5412         cfg.SetDiskID(disk, tgt_node)
5413
5414       # now that the new lvs have the old name, we can add them to the device
5415       info("adding new mirror component on %s" % tgt_node)
5416       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5417       if result.failed or not result.data:
5418         for new_lv in new_lvs:
5419           msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5420           if msg:
5421             warning("Can't rollback device %s: %s", dev, msg,
5422                     hint="cleanup manually the unused logical volumes")
5423         raise errors.OpExecError("Can't add local storage to drbd")
5424
5425       dev.children = new_lvs
5426       cfg.Update(instance)
5427
5428     # Step: wait for sync
5429
5430     # this can fail as the old devices are degraded and _WaitForSync
5431     # does a combined result over all disks, so we don't check its
5432     # return value
5433     self.proc.LogStep(5, steps_total, "sync devices")
5434     _WaitForSync(self, instance, unlock=True)
5435
5436     # so check manually all the devices
5437     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5438       cfg.SetDiskID(dev, instance.primary_node)
5439       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5440       msg = result.RemoteFailMsg()
5441       if not msg and not result.payload:
5442         msg = "disk not found"
5443       if msg:
5444         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5445                                  (name, msg))
5446       if result.payload[5]:
5447         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5448
5449     # Step: remove old storage
5450     self.proc.LogStep(6, steps_total, "removing old storage")
5451     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5452       info("remove logical volumes for %s" % name)
5453       for lv in old_lvs:
5454         cfg.SetDiskID(lv, tgt_node)
5455         msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5456         if msg:
5457           warning("Can't remove old LV: %s" % msg,
5458                   hint="manually remove unused LVs")
5459           continue
5460
5461   def _ExecD8Secondary(self, feedback_fn):
5462     """Replace the secondary node for drbd8.
5463
5464     The algorithm for replace is quite complicated:
5465       - for all disks of the instance:
5466         - create new LVs on the new node with same names
5467         - shutdown the drbd device on the old secondary
5468         - disconnect the drbd network on the primary
5469         - create the drbd device on the new secondary
5470         - network attach the drbd on the primary, using an artifice:
5471           the drbd code for Attach() will connect to the network if it
5472           finds a device which is connected to the good local disks but
5473           not network enabled
5474       - wait for sync across all devices
5475       - remove all disks from the old secondary
5476
5477     Failures are not very well handled.
5478
5479     """
5480     steps_total = 6
5481     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5482     instance = self.instance
5483     iv_names = {}
5484     # start of work
5485     cfg = self.cfg
5486     old_node = self.tgt_node
5487     new_node = self.new_node
5488     pri_node = instance.primary_node
5489     nodes_ip = {
5490       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5491       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5492       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5493       }
5494
5495     # Step: check device activation
5496     self.proc.LogStep(1, steps_total, "check device existence")
5497     info("checking volume groups")
5498     my_vg = cfg.GetVGName()
5499     results = self.rpc.call_vg_list([pri_node, new_node])
5500     for node in pri_node, new_node:
5501       res = results[node]
5502       if res.failed or not res.data or my_vg not in res.data:
5503         raise errors.OpExecError("Volume group '%s' not found on %s" %
5504                                  (my_vg, node))
5505     for idx, dev in enumerate(instance.disks):
5506       if idx not in self.op.disks:
5507         continue
5508       info("checking disk/%d on %s" % (idx, pri_node))
5509       cfg.SetDiskID(dev, pri_node)
5510       result = self.rpc.call_blockdev_find(pri_node, dev)
5511       msg = result.RemoteFailMsg()
5512       if not msg and not result.payload:
5513         msg = "disk not found"
5514       if msg:
5515         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5516                                  (idx, pri_node, msg))
5517
5518     # Step: check other node consistency
5519     self.proc.LogStep(2, steps_total, "check peer consistency")
5520     for idx, dev in enumerate(instance.disks):
5521       if idx not in self.op.disks:
5522         continue
5523       info("checking disk/%d consistency on %s" % (idx, pri_node))
5524       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5525         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5526                                  " unsafe to replace the secondary" %
5527                                  pri_node)
5528
5529     # Step: create new storage
5530     self.proc.LogStep(3, steps_total, "allocate new storage")
5531     for idx, dev in enumerate(instance.disks):
5532       info("adding new local storage on %s for disk/%d" %
5533            (new_node, idx))
5534       # we pass force_create=True to force LVM creation
5535       for new_lv in dev.children:
5536         _CreateBlockDev(self, new_node, instance, new_lv, True,
5537                         _GetInstanceInfoText(instance), False)
5538
5539     # Step 4: dbrd minors and drbd setups changes
5540     # after this, we must manually remove the drbd minors on both the
5541     # error and the success paths
5542     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5543                                    instance.name)
5544     logging.debug("Allocated minors %s" % (minors,))
5545     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5546     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5547       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5548       # create new devices on new_node; note that we create two IDs:
5549       # one without port, so the drbd will be activated without
5550       # networking information on the new node at this stage, and one
5551       # with network, for the latter activation in step 4
5552       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5553       if pri_node == o_node1:
5554         p_minor = o_minor1
5555       else:
5556         p_minor = o_minor2
5557
5558       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5559       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5560
5561       iv_names[idx] = (dev, dev.children, new_net_id)
5562       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5563                     new_net_id)
5564       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5565                               logical_id=new_alone_id,
5566                               children=dev.children,
5567                               size=dev.size)
5568       try:
5569         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5570                               _GetInstanceInfoText(instance), False)
5571       except errors.GenericError:
5572         self.cfg.ReleaseDRBDMinors(instance.name)
5573         raise
5574
5575     for idx, dev in enumerate(instance.disks):
5576       # we have new devices, shutdown the drbd on the old secondary
5577       info("shutting down drbd for disk/%d on old node" % idx)
5578       cfg.SetDiskID(dev, old_node)
5579       msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5580       if msg:
5581         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5582                 (idx, msg),
5583                 hint="Please cleanup this device manually as soon as possible")
5584
5585     info("detaching primary drbds from the network (=> standalone)")
5586     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5587                                                instance.disks)[pri_node]
5588
5589     msg = result.RemoteFailMsg()
5590     if msg:
5591       # detaches didn't succeed (unlikely)
5592       self.cfg.ReleaseDRBDMinors(instance.name)
5593       raise errors.OpExecError("Can't detach the disks from the network on"
5594                                " old node: %s" % (msg,))
5595
5596     # if we managed to detach at least one, we update all the disks of
5597     # the instance to point to the new secondary
5598     info("updating instance configuration")
5599     for dev, _, new_logical_id in iv_names.itervalues():
5600       dev.logical_id = new_logical_id
5601       cfg.SetDiskID(dev, pri_node)
5602     cfg.Update(instance)
5603
5604     # and now perform the drbd attach
5605     info("attaching primary drbds to new secondary (standalone => connected)")
5606     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5607                                            instance.disks, instance.name,
5608                                            False)
5609     for to_node, to_result in result.items():
5610       msg = to_result.RemoteFailMsg()
5611       if msg:
5612         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5613                 hint="please do a gnt-instance info to see the"
5614                 " status of disks")
5615
5616     # this can fail as the old devices are degraded and _WaitForSync
5617     # does a combined result over all disks, so we don't check its
5618     # return value
5619     self.proc.LogStep(5, steps_total, "sync devices")
5620     _WaitForSync(self, instance, unlock=True)
5621
5622     # so check manually all the devices
5623     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5624       cfg.SetDiskID(dev, pri_node)
5625       result = self.rpc.call_blockdev_find(pri_node, dev)
5626       msg = result.RemoteFailMsg()
5627       if not msg and not result.payload:
5628         msg = "disk not found"
5629       if msg:
5630         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5631                                  (idx, msg))
5632       if result.payload[5]:
5633         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5634
5635     self.proc.LogStep(6, steps_total, "removing old storage")
5636     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5637       info("remove logical volumes for disk/%d" % idx)
5638       for lv in old_lvs:
5639         cfg.SetDiskID(lv, old_node)
5640         msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5641         if msg:
5642           warning("Can't remove LV on old secondary: %s", msg,
5643                   hint="Cleanup stale volumes by hand")
5644
5645   def Exec(self, feedback_fn):
5646     """Execute disk replacement.
5647
5648     This dispatches the disk replacement to the appropriate handler.
5649
5650     """
5651     instance = self.instance
5652
5653     # Activate the instance disks if we're replacing them on a down instance
5654     if not instance.admin_up:
5655       _StartInstanceDisks(self, instance, True)
5656
5657     if self.op.mode == constants.REPLACE_DISK_CHG:
5658       fn = self._ExecD8Secondary
5659     else:
5660       fn = self._ExecD8DiskOnly
5661
5662     ret = fn(feedback_fn)
5663
5664     # Deactivate the instance disks if we're replacing them on a down instance
5665     if not instance.admin_up:
5666       _SafeShutdownInstanceDisks(self, instance)
5667
5668     return ret
5669
5670
5671 class LUGrowDisk(LogicalUnit):
5672   """Grow a disk of an instance.
5673
5674   """
5675   HPATH = "disk-grow"
5676   HTYPE = constants.HTYPE_INSTANCE
5677   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5678   REQ_BGL = False
5679
5680   def ExpandNames(self):
5681     self._ExpandAndLockInstance()
5682     self.needed_locks[locking.LEVEL_NODE] = []
5683     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5684
5685   def DeclareLocks(self, level):
5686     if level == locking.LEVEL_NODE:
5687       self._LockInstancesNodes()
5688
5689   def BuildHooksEnv(self):
5690     """Build hooks env.
5691
5692     This runs on the master, the primary and all the secondaries.
5693
5694     """
5695     env = {
5696       "DISK": self.op.disk,
5697       "AMOUNT": self.op.amount,
5698       }
5699     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5700     nl = [
5701       self.cfg.GetMasterNode(),
5702       self.instance.primary_node,
5703       ]
5704     return env, nl, nl
5705
5706   def CheckPrereq(self):
5707     """Check prerequisites.
5708
5709     This checks that the instance is in the cluster.
5710
5711     """
5712     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5713     assert instance is not None, \
5714       "Cannot retrieve locked instance %s" % self.op.instance_name
5715     nodenames = list(instance.all_nodes)
5716     for node in nodenames:
5717       _CheckNodeOnline(self, node)
5718
5719
5720     self.instance = instance
5721
5722     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5723       raise errors.OpPrereqError("Instance's disk layout does not support"
5724                                  " growing.")
5725
5726     self.disk = instance.FindDisk(self.op.disk)
5727
5728     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5729                                        instance.hypervisor)
5730     for node in nodenames:
5731       info = nodeinfo[node]
5732       if info.failed or not info.data:
5733         raise errors.OpPrereqError("Cannot get current information"
5734                                    " from node '%s'" % node)
5735       vg_free = info.data.get('vg_free', None)
5736       if not isinstance(vg_free, int):
5737         raise errors.OpPrereqError("Can't compute free disk space on"
5738                                    " node %s" % node)
5739       if self.op.amount > vg_free:
5740         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5741                                    " %d MiB available, %d MiB required" %
5742                                    (node, vg_free, self.op.amount))
5743
5744   def Exec(self, feedback_fn):
5745     """Execute disk grow.
5746
5747     """
5748     instance = self.instance
5749     disk = self.disk
5750     for node in instance.all_nodes:
5751       self.cfg.SetDiskID(disk, node)
5752       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5753       msg = result.RemoteFailMsg()
5754       if msg:
5755         raise errors.OpExecError("Grow request failed to node %s: %s" %
5756                                  (node, msg))
5757     disk.RecordGrow(self.op.amount)
5758     self.cfg.Update(instance)
5759     if self.op.wait_for_sync:
5760       disk_abort = not _WaitForSync(self, instance)
5761       if disk_abort:
5762         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5763                              " status.\nPlease check the instance.")
5764
5765
5766 class LUQueryInstanceData(NoHooksLU):
5767   """Query runtime instance data.
5768
5769   """
5770   _OP_REQP = ["instances", "static"]
5771   REQ_BGL = False
5772
5773   def ExpandNames(self):
5774     self.needed_locks = {}
5775     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5776
5777     if not isinstance(self.op.instances, list):
5778       raise errors.OpPrereqError("Invalid argument type 'instances'")
5779
5780     if self.op.instances:
5781       self.wanted_names = []
5782       for name in self.op.instances:
5783         full_name = self.cfg.ExpandInstanceName(name)
5784         if full_name is None:
5785           raise errors.OpPrereqError("Instance '%s' not known" % name)
5786         self.wanted_names.append(full_name)
5787       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5788     else:
5789       self.wanted_names = None
5790       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5791
5792     self.needed_locks[locking.LEVEL_NODE] = []
5793     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5794
5795   def DeclareLocks(self, level):
5796     if level == locking.LEVEL_NODE:
5797       self._LockInstancesNodes()
5798
5799   def CheckPrereq(self):
5800     """Check prerequisites.
5801
5802     This only checks the optional instance list against the existing names.
5803
5804     """
5805     if self.wanted_names is None:
5806       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5807
5808     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5809                              in self.wanted_names]
5810     return
5811
5812   def _ComputeDiskStatus(self, instance, snode, dev):
5813     """Compute block device status.
5814
5815     """
5816     static = self.op.static
5817     if not static:
5818       self.cfg.SetDiskID(dev, instance.primary_node)
5819       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5820       if dev_pstatus.offline:
5821         dev_pstatus = None
5822       else:
5823         msg = dev_pstatus.RemoteFailMsg()
5824         if msg:
5825           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5826                                    (instance.name, msg))
5827         dev_pstatus = dev_pstatus.payload
5828     else:
5829       dev_pstatus = None
5830
5831     if dev.dev_type in constants.LDS_DRBD:
5832       # we change the snode then (otherwise we use the one passed in)
5833       if dev.logical_id[0] == instance.primary_node:
5834         snode = dev.logical_id[1]
5835       else:
5836         snode = dev.logical_id[0]
5837
5838     if snode and not static:
5839       self.cfg.SetDiskID(dev, snode)
5840       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5841       if dev_sstatus.offline:
5842         dev_sstatus = None
5843       else:
5844         msg = dev_sstatus.RemoteFailMsg()
5845         if msg:
5846           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5847                                    (instance.name, msg))
5848         dev_sstatus = dev_sstatus.payload
5849     else:
5850       dev_sstatus = None
5851
5852     if dev.children:
5853       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5854                       for child in dev.children]
5855     else:
5856       dev_children = []
5857
5858     data = {
5859       "iv_name": dev.iv_name,
5860       "dev_type": dev.dev_type,
5861       "logical_id": dev.logical_id,
5862       "physical_id": dev.physical_id,
5863       "pstatus": dev_pstatus,
5864       "sstatus": dev_sstatus,
5865       "children": dev_children,
5866       "mode": dev.mode,
5867       "size": dev.size,
5868       }
5869
5870     return data
5871
5872   def Exec(self, feedback_fn):
5873     """Gather and return data"""
5874     result = {}
5875
5876     cluster = self.cfg.GetClusterInfo()
5877
5878     for instance in self.wanted_instances:
5879       if not self.op.static:
5880         remote_info = self.rpc.call_instance_info(instance.primary_node,
5881                                                   instance.name,
5882                                                   instance.hypervisor)
5883         remote_info.Raise()
5884         remote_info = remote_info.data
5885         if remote_info and "state" in remote_info:
5886           remote_state = "up"
5887         else:
5888           remote_state = "down"
5889       else:
5890         remote_state = None
5891       if instance.admin_up:
5892         config_state = "up"
5893       else:
5894         config_state = "down"
5895
5896       disks = [self._ComputeDiskStatus(instance, None, device)
5897                for device in instance.disks]
5898
5899       idict = {
5900         "name": instance.name,
5901         "config_state": config_state,
5902         "run_state": remote_state,
5903         "pnode": instance.primary_node,
5904         "snodes": instance.secondary_nodes,
5905         "os": instance.os,
5906         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5907         "disks": disks,
5908         "hypervisor": instance.hypervisor,
5909         "network_port": instance.network_port,
5910         "hv_instance": instance.hvparams,
5911         "hv_actual": cluster.FillHV(instance),
5912         "be_instance": instance.beparams,
5913         "be_actual": cluster.FillBE(instance),
5914         }
5915
5916       result[instance.name] = idict
5917
5918     return result
5919
5920
5921 class LUSetInstanceParams(LogicalUnit):
5922   """Modifies an instances's parameters.
5923
5924   """
5925   HPATH = "instance-modify"
5926   HTYPE = constants.HTYPE_INSTANCE
5927   _OP_REQP = ["instance_name"]
5928   REQ_BGL = False
5929
5930   def CheckArguments(self):
5931     if not hasattr(self.op, 'nics'):
5932       self.op.nics = []
5933     if not hasattr(self.op, 'disks'):
5934       self.op.disks = []
5935     if not hasattr(self.op, 'beparams'):
5936       self.op.beparams = {}
5937     if not hasattr(self.op, 'hvparams'):
5938       self.op.hvparams = {}
5939     self.op.force = getattr(self.op, "force", False)
5940     if not (self.op.nics or self.op.disks or
5941             self.op.hvparams or self.op.beparams):
5942       raise errors.OpPrereqError("No changes submitted")
5943
5944     # Disk validation
5945     disk_addremove = 0
5946     for disk_op, disk_dict in self.op.disks:
5947       if disk_op == constants.DDM_REMOVE:
5948         disk_addremove += 1
5949         continue
5950       elif disk_op == constants.DDM_ADD:
5951         disk_addremove += 1
5952       else:
5953         if not isinstance(disk_op, int):
5954           raise errors.OpPrereqError("Invalid disk index")
5955       if disk_op == constants.DDM_ADD:
5956         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5957         if mode not in constants.DISK_ACCESS_SET:
5958           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5959         size = disk_dict.get('size', None)
5960         if size is None:
5961           raise errors.OpPrereqError("Required disk parameter size missing")
5962         try:
5963           size = int(size)
5964         except ValueError, err:
5965           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5966                                      str(err))
5967         disk_dict['size'] = size
5968       else:
5969         # modification of disk
5970         if 'size' in disk_dict:
5971           raise errors.OpPrereqError("Disk size change not possible, use"
5972                                      " grow-disk")
5973
5974     if disk_addremove > 1:
5975       raise errors.OpPrereqError("Only one disk add or remove operation"
5976                                  " supported at a time")
5977
5978     # NIC validation
5979     nic_addremove = 0
5980     for nic_op, nic_dict in self.op.nics:
5981       if nic_op == constants.DDM_REMOVE:
5982         nic_addremove += 1
5983         continue
5984       elif nic_op == constants.DDM_ADD:
5985         nic_addremove += 1
5986       else:
5987         if not isinstance(nic_op, int):
5988           raise errors.OpPrereqError("Invalid nic index")
5989
5990       # nic_dict should be a dict
5991       nic_ip = nic_dict.get('ip', None)
5992       if nic_ip is not None:
5993         if nic_ip.lower() == constants.VALUE_NONE:
5994           nic_dict['ip'] = None
5995         else:
5996           if not utils.IsValidIP(nic_ip):
5997             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5998
5999       if nic_op == constants.DDM_ADD:
6000         nic_bridge = nic_dict.get('bridge', None)
6001         if nic_bridge is None:
6002           nic_dict['bridge'] = self.cfg.GetDefBridge()
6003         nic_mac = nic_dict.get('mac', None)
6004         if nic_mac is None:
6005           nic_dict['mac'] = constants.VALUE_AUTO
6006
6007       if 'mac' in nic_dict:
6008         nic_mac = nic_dict['mac']
6009         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6010           if not utils.IsValidMac(nic_mac):
6011             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
6012         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
6013           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
6014                                      " modifying an existing nic")
6015
6016     if nic_addremove > 1:
6017       raise errors.OpPrereqError("Only one NIC add or remove operation"
6018                                  " supported at a time")
6019
6020   def ExpandNames(self):
6021     self._ExpandAndLockInstance()
6022     self.needed_locks[locking.LEVEL_NODE] = []
6023     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6024
6025   def DeclareLocks(self, level):
6026     if level == locking.LEVEL_NODE:
6027       self._LockInstancesNodes()
6028
6029   def BuildHooksEnv(self):
6030     """Build hooks env.
6031
6032     This runs on the master, primary and secondaries.
6033
6034     """
6035     args = dict()
6036     if constants.BE_MEMORY in self.be_new:
6037       args['memory'] = self.be_new[constants.BE_MEMORY]
6038     if constants.BE_VCPUS in self.be_new:
6039       args['vcpus'] = self.be_new[constants.BE_VCPUS]
6040     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
6041     # information at all.
6042     if self.op.nics:
6043       args['nics'] = []
6044       nic_override = dict(self.op.nics)
6045       for idx, nic in enumerate(self.instance.nics):
6046         if idx in nic_override:
6047           this_nic_override = nic_override[idx]
6048         else:
6049           this_nic_override = {}
6050         if 'ip' in this_nic_override:
6051           ip = this_nic_override['ip']
6052         else:
6053           ip = nic.ip
6054         if 'bridge' in this_nic_override:
6055           bridge = this_nic_override['bridge']
6056         else:
6057           bridge = nic.bridge
6058         if 'mac' in this_nic_override:
6059           mac = this_nic_override['mac']
6060         else:
6061           mac = nic.mac
6062         args['nics'].append((ip, bridge, mac))
6063       if constants.DDM_ADD in nic_override:
6064         ip = nic_override[constants.DDM_ADD].get('ip', None)
6065         bridge = nic_override[constants.DDM_ADD]['bridge']
6066         mac = nic_override[constants.DDM_ADD]['mac']
6067         args['nics'].append((ip, bridge, mac))
6068       elif constants.DDM_REMOVE in nic_override:
6069         del args['nics'][-1]
6070
6071     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
6072     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
6073     return env, nl, nl
6074
6075   def CheckPrereq(self):
6076     """Check prerequisites.
6077
6078     This only checks the instance list against the existing names.
6079
6080     """
6081     self.force = self.op.force
6082
6083     # checking the new params on the primary/secondary nodes
6084
6085     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6086     assert self.instance is not None, \
6087       "Cannot retrieve locked instance %s" % self.op.instance_name
6088     pnode = instance.primary_node
6089     nodelist = list(instance.all_nodes)
6090
6091     # hvparams processing
6092     if self.op.hvparams:
6093       i_hvdict = copy.deepcopy(instance.hvparams)
6094       for key, val in self.op.hvparams.iteritems():
6095         if val == constants.VALUE_DEFAULT:
6096           try:
6097             del i_hvdict[key]
6098           except KeyError:
6099             pass
6100         else:
6101           i_hvdict[key] = val
6102       cluster = self.cfg.GetClusterInfo()
6103       utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
6104       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
6105                                 i_hvdict)
6106       # local check
6107       hypervisor.GetHypervisor(
6108         instance.hypervisor).CheckParameterSyntax(hv_new)
6109       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6110       self.hv_new = hv_new # the new actual values
6111       self.hv_inst = i_hvdict # the new dict (without defaults)
6112     else:
6113       self.hv_new = self.hv_inst = {}
6114
6115     # beparams processing
6116     if self.op.beparams:
6117       i_bedict = copy.deepcopy(instance.beparams)
6118       for key, val in self.op.beparams.iteritems():
6119         if val == constants.VALUE_DEFAULT:
6120           try:
6121             del i_bedict[key]
6122           except KeyError:
6123             pass
6124         else:
6125           i_bedict[key] = val
6126       cluster = self.cfg.GetClusterInfo()
6127       utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
6128       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
6129                                 i_bedict)
6130       self.be_new = be_new # the new actual values
6131       self.be_inst = i_bedict # the new dict (without defaults)
6132     else:
6133       self.be_new = self.be_inst = {}
6134
6135     self.warn = []
6136
6137     if constants.BE_MEMORY in self.op.beparams and not self.force:
6138       mem_check_list = [pnode]
6139       if be_new[constants.BE_AUTO_BALANCE]:
6140         # either we changed auto_balance to yes or it was from before
6141         mem_check_list.extend(instance.secondary_nodes)
6142       instance_info = self.rpc.call_instance_info(pnode, instance.name,
6143                                                   instance.hypervisor)
6144       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6145                                          instance.hypervisor)
6146       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
6147         # Assume the primary node is unreachable and go ahead
6148         self.warn.append("Can't get info from primary node %s" % pnode)
6149       else:
6150         if not instance_info.failed and instance_info.data:
6151           current_mem = int(instance_info.data['memory'])
6152         else:
6153           # Assume instance not running
6154           # (there is a slight race condition here, but it's not very probable,
6155           # and we have no other way to check)
6156           current_mem = 0
6157         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6158                     nodeinfo[pnode].data['memory_free'])
6159         if miss_mem > 0:
6160           raise errors.OpPrereqError("This change will prevent the instance"
6161                                      " from starting, due to %d MB of memory"
6162                                      " missing on its primary node" % miss_mem)
6163
6164       if be_new[constants.BE_AUTO_BALANCE]:
6165         for node, nres in nodeinfo.iteritems():
6166           if node not in instance.secondary_nodes:
6167             continue
6168           if nres.failed or not isinstance(nres.data, dict):
6169             self.warn.append("Can't get info from secondary node %s" % node)
6170           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
6171             self.warn.append("Not enough memory to failover instance to"
6172                              " secondary node %s" % node)
6173
6174     # NIC processing
6175     for nic_op, nic_dict in self.op.nics:
6176       if nic_op == constants.DDM_REMOVE:
6177         if not instance.nics:
6178           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6179         continue
6180       if nic_op != constants.DDM_ADD:
6181         # an existing nic
6182         if nic_op < 0 or nic_op >= len(instance.nics):
6183           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6184                                      " are 0 to %d" %
6185                                      (nic_op, len(instance.nics)))
6186       if 'bridge' in nic_dict:
6187         nic_bridge = nic_dict['bridge']
6188         if nic_bridge is None:
6189           raise errors.OpPrereqError('Cannot set the nic bridge to None')
6190         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
6191           msg = ("Bridge '%s' doesn't exist on one of"
6192                  " the instance nodes" % nic_bridge)
6193           if self.force:
6194             self.warn.append(msg)
6195           else:
6196             raise errors.OpPrereqError(msg)
6197       if 'mac' in nic_dict:
6198         nic_mac = nic_dict['mac']
6199         if nic_mac is None:
6200           raise errors.OpPrereqError('Cannot set the nic mac to None')
6201         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6202           # otherwise generate the mac
6203           nic_dict['mac'] = self.cfg.GenerateMAC()
6204         else:
6205           # or validate/reserve the current one
6206           if self.cfg.IsMacInUse(nic_mac):
6207             raise errors.OpPrereqError("MAC address %s already in use"
6208                                        " in cluster" % nic_mac)
6209
6210     # DISK processing
6211     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6212       raise errors.OpPrereqError("Disk operations not supported for"
6213                                  " diskless instances")
6214     for disk_op, disk_dict in self.op.disks:
6215       if disk_op == constants.DDM_REMOVE:
6216         if len(instance.disks) == 1:
6217           raise errors.OpPrereqError("Cannot remove the last disk of"
6218                                      " an instance")
6219         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6220         ins_l = ins_l[pnode]
6221         if ins_l.failed or not isinstance(ins_l.data, list):
6222           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
6223         if instance.name in ins_l.data:
6224           raise errors.OpPrereqError("Instance is running, can't remove"
6225                                      " disks.")
6226
6227       if (disk_op == constants.DDM_ADD and
6228           len(instance.nics) >= constants.MAX_DISKS):
6229         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6230                                    " add more" % constants.MAX_DISKS)
6231       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6232         # an existing disk
6233         if disk_op < 0 or disk_op >= len(instance.disks):
6234           raise errors.OpPrereqError("Invalid disk index %s, valid values"
6235                                      " are 0 to %d" %
6236                                      (disk_op, len(instance.disks)))
6237
6238     return
6239
6240   def Exec(self, feedback_fn):
6241     """Modifies an instance.
6242
6243     All parameters take effect only at the next restart of the instance.
6244
6245     """
6246     # Process here the warnings from CheckPrereq, as we don't have a
6247     # feedback_fn there.
6248     for warn in self.warn:
6249       feedback_fn("WARNING: %s" % warn)
6250
6251     result = []
6252     instance = self.instance
6253     # disk changes
6254     for disk_op, disk_dict in self.op.disks:
6255       if disk_op == constants.DDM_REMOVE:
6256         # remove the last disk
6257         device = instance.disks.pop()
6258         device_idx = len(instance.disks)
6259         for node, disk in device.ComputeNodeTree(instance.primary_node):
6260           self.cfg.SetDiskID(disk, node)
6261           msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
6262           if msg:
6263             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6264                             " continuing anyway", device_idx, node, msg)
6265         result.append(("disk/%d" % device_idx, "remove"))
6266       elif disk_op == constants.DDM_ADD:
6267         # add a new disk
6268         if instance.disk_template == constants.DT_FILE:
6269           file_driver, file_path = instance.disks[0].logical_id
6270           file_path = os.path.dirname(file_path)
6271         else:
6272           file_driver = file_path = None
6273         disk_idx_base = len(instance.disks)
6274         new_disk = _GenerateDiskTemplate(self,
6275                                          instance.disk_template,
6276                                          instance.name, instance.primary_node,
6277                                          instance.secondary_nodes,
6278                                          [disk_dict],
6279                                          file_path,
6280                                          file_driver,
6281                                          disk_idx_base)[0]
6282         instance.disks.append(new_disk)
6283         info = _GetInstanceInfoText(instance)
6284
6285         logging.info("Creating volume %s for instance %s",
6286                      new_disk.iv_name, instance.name)
6287         # Note: this needs to be kept in sync with _CreateDisks
6288         #HARDCODE
6289         for node in instance.all_nodes:
6290           f_create = node == instance.primary_node
6291           try:
6292             _CreateBlockDev(self, node, instance, new_disk,
6293                             f_create, info, f_create)
6294           except errors.OpExecError, err:
6295             self.LogWarning("Failed to create volume %s (%s) on"
6296                             " node %s: %s",
6297                             new_disk.iv_name, new_disk, node, err)
6298         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6299                        (new_disk.size, new_disk.mode)))
6300       else:
6301         # change a given disk
6302         instance.disks[disk_op].mode = disk_dict['mode']
6303         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6304     # NIC changes
6305     for nic_op, nic_dict in self.op.nics:
6306       if nic_op == constants.DDM_REMOVE:
6307         # remove the last nic
6308         del instance.nics[-1]
6309         result.append(("nic.%d" % len(instance.nics), "remove"))
6310       elif nic_op == constants.DDM_ADD:
6311         # mac and bridge should be set, by now
6312         mac = nic_dict['mac']
6313         bridge = nic_dict['bridge']
6314         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
6315                               bridge=bridge)
6316         instance.nics.append(new_nic)
6317         result.append(("nic.%d" % (len(instance.nics) - 1),
6318                        "add:mac=%s,ip=%s,bridge=%s" %
6319                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
6320       else:
6321         # change a given nic
6322         for key in 'mac', 'ip', 'bridge':
6323           if key in nic_dict:
6324             setattr(instance.nics[nic_op], key, nic_dict[key])
6325             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
6326
6327     # hvparams changes
6328     if self.op.hvparams:
6329       instance.hvparams = self.hv_inst
6330       for key, val in self.op.hvparams.iteritems():
6331         result.append(("hv/%s" % key, val))
6332
6333     # beparams changes
6334     if self.op.beparams:
6335       instance.beparams = self.be_inst
6336       for key, val in self.op.beparams.iteritems():
6337         result.append(("be/%s" % key, val))
6338
6339     self.cfg.Update(instance)
6340
6341     return result
6342
6343
6344 class LUQueryExports(NoHooksLU):
6345   """Query the exports list
6346
6347   """
6348   _OP_REQP = ['nodes']
6349   REQ_BGL = False
6350
6351   def ExpandNames(self):
6352     self.needed_locks = {}
6353     self.share_locks[locking.LEVEL_NODE] = 1
6354     if not self.op.nodes:
6355       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6356     else:
6357       self.needed_locks[locking.LEVEL_NODE] = \
6358         _GetWantedNodes(self, self.op.nodes)
6359
6360   def CheckPrereq(self):
6361     """Check prerequisites.
6362
6363     """
6364     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6365
6366   def Exec(self, feedback_fn):
6367     """Compute the list of all the exported system images.
6368
6369     @rtype: dict
6370     @return: a dictionary with the structure node->(export-list)
6371         where export-list is a list of the instances exported on
6372         that node.
6373
6374     """
6375     rpcresult = self.rpc.call_export_list(self.nodes)
6376     result = {}
6377     for node in rpcresult:
6378       if rpcresult[node].failed:
6379         result[node] = False
6380       else:
6381         result[node] = rpcresult[node].data
6382
6383     return result
6384
6385
6386 class LUExportInstance(LogicalUnit):
6387   """Export an instance to an image in the cluster.
6388
6389   """
6390   HPATH = "instance-export"
6391   HTYPE = constants.HTYPE_INSTANCE
6392   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6393   REQ_BGL = False
6394
6395   def ExpandNames(self):
6396     self._ExpandAndLockInstance()
6397     # FIXME: lock only instance primary and destination node
6398     #
6399     # Sad but true, for now we have do lock all nodes, as we don't know where
6400     # the previous export might be, and and in this LU we search for it and
6401     # remove it from its current node. In the future we could fix this by:
6402     #  - making a tasklet to search (share-lock all), then create the new one,
6403     #    then one to remove, after
6404     #  - removing the removal operation altogether
6405     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6406
6407   def DeclareLocks(self, level):
6408     """Last minute lock declaration."""
6409     # All nodes are locked anyway, so nothing to do here.
6410
6411   def BuildHooksEnv(self):
6412     """Build hooks env.
6413
6414     This will run on the master, primary node and target node.
6415
6416     """
6417     env = {
6418       "EXPORT_NODE": self.op.target_node,
6419       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6420       }
6421     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6422     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6423           self.op.target_node]
6424     return env, nl, nl
6425
6426   def CheckPrereq(self):
6427     """Check prerequisites.
6428
6429     This checks that the instance and node names are valid.
6430
6431     """
6432     instance_name = self.op.instance_name
6433     self.instance = self.cfg.GetInstanceInfo(instance_name)
6434     assert self.instance is not None, \
6435           "Cannot retrieve locked instance %s" % self.op.instance_name
6436     _CheckNodeOnline(self, self.instance.primary_node)
6437
6438     self.dst_node = self.cfg.GetNodeInfo(
6439       self.cfg.ExpandNodeName(self.op.target_node))
6440
6441     if self.dst_node is None:
6442       # This is wrong node name, not a non-locked node
6443       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6444     _CheckNodeOnline(self, self.dst_node.name)
6445     _CheckNodeNotDrained(self, self.dst_node.name)
6446
6447     # instance disk type verification
6448     for disk in self.instance.disks:
6449       if disk.dev_type == constants.LD_FILE:
6450         raise errors.OpPrereqError("Export not supported for instances with"
6451                                    " file-based disks")
6452
6453   def Exec(self, feedback_fn):
6454     """Export an instance to an image in the cluster.
6455
6456     """
6457     instance = self.instance
6458     dst_node = self.dst_node
6459     src_node = instance.primary_node
6460     if self.op.shutdown:
6461       # shutdown the instance, but not the disks
6462       result = self.rpc.call_instance_shutdown(src_node, instance)
6463       msg = result.RemoteFailMsg()
6464       if msg:
6465         raise errors.OpExecError("Could not shutdown instance %s on"
6466                                  " node %s: %s" %
6467                                  (instance.name, src_node, msg))
6468
6469     vgname = self.cfg.GetVGName()
6470
6471     snap_disks = []
6472
6473     # set the disks ID correctly since call_instance_start needs the
6474     # correct drbd minor to create the symlinks
6475     for disk in instance.disks:
6476       self.cfg.SetDiskID(disk, src_node)
6477
6478     # per-disk results
6479     dresults = []
6480     try:
6481       for idx, disk in enumerate(instance.disks):
6482         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6483         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6484         if new_dev_name.failed or not new_dev_name.data:
6485           self.LogWarning("Could not snapshot disk/%d on node %s",
6486                           idx, src_node)
6487           snap_disks.append(False)
6488         else:
6489           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6490                                  logical_id=(vgname, new_dev_name.data),
6491                                  physical_id=(vgname, new_dev_name.data),
6492                                  iv_name=disk.iv_name)
6493           snap_disks.append(new_dev)
6494
6495     finally:
6496       if self.op.shutdown and instance.admin_up:
6497         result = self.rpc.call_instance_start(src_node, instance, None, None)
6498         msg = result.RemoteFailMsg()
6499         if msg:
6500           _ShutdownInstanceDisks(self, instance)
6501           raise errors.OpExecError("Could not start instance: %s" % msg)
6502
6503     # TODO: check for size
6504
6505     cluster_name = self.cfg.GetClusterName()
6506     for idx, dev in enumerate(snap_disks):
6507       if dev:
6508         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6509                                                instance, cluster_name, idx)
6510         if result.failed or not result.data:
6511           self.LogWarning("Could not export disk/%d from node %s to"
6512                           " node %s", idx, src_node, dst_node.name)
6513           dresults.append(False)
6514         else:
6515           dresults.append(True)
6516         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6517         if msg:
6518           self.LogWarning("Could not remove snapshot for disk/%d from node"
6519                           " %s: %s", idx, src_node, msg)
6520       else:
6521         dresults.append(False)
6522
6523     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6524     fin_resu = True
6525     if result.failed or not result.data:
6526       self.LogWarning("Could not finalize export for instance %s on node %s",
6527                       instance.name, dst_node.name)
6528       fin_resu = False
6529
6530     nodelist = self.cfg.GetNodeList()
6531     nodelist.remove(dst_node.name)
6532
6533     # on one-node clusters nodelist will be empty after the removal
6534     # if we proceed the backup would be removed because OpQueryExports
6535     # substitutes an empty list with the full cluster node list.
6536     if nodelist:
6537       exportlist = self.rpc.call_export_list(nodelist)
6538       for node in exportlist:
6539         if exportlist[node].failed:
6540           continue
6541         if instance.name in exportlist[node].data:
6542           if not self.rpc.call_export_remove(node, instance.name):
6543             self.LogWarning("Could not remove older export for instance %s"
6544                             " on node %s", instance.name, node)
6545     return fin_resu, dresults
6546
6547
6548 class LURemoveExport(NoHooksLU):
6549   """Remove exports related to the named instance.
6550
6551   """
6552   _OP_REQP = ["instance_name"]
6553   REQ_BGL = False
6554
6555   def ExpandNames(self):
6556     self.needed_locks = {}
6557     # We need all nodes to be locked in order for RemoveExport to work, but we
6558     # don't need to lock the instance itself, as nothing will happen to it (and
6559     # we can remove exports also for a removed instance)
6560     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6561
6562   def CheckPrereq(self):
6563     """Check prerequisites.
6564     """
6565     pass
6566
6567   def Exec(self, feedback_fn):
6568     """Remove any export.
6569
6570     """
6571     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6572     # If the instance was not found we'll try with the name that was passed in.
6573     # This will only work if it was an FQDN, though.
6574     fqdn_warn = False
6575     if not instance_name:
6576       fqdn_warn = True
6577       instance_name = self.op.instance_name
6578
6579     exportlist = self.rpc.call_export_list(self.acquired_locks[
6580       locking.LEVEL_NODE])
6581     found = False
6582     for node in exportlist:
6583       if exportlist[node].failed:
6584         self.LogWarning("Failed to query node %s, continuing" % node)
6585         continue
6586       if instance_name in exportlist[node].data:
6587         found = True
6588         result = self.rpc.call_export_remove(node, instance_name)
6589         if result.failed or not result.data:
6590           logging.error("Could not remove export for instance %s"
6591                         " on node %s", instance_name, node)
6592
6593     if fqdn_warn and not found:
6594       feedback_fn("Export not found. If trying to remove an export belonging"
6595                   " to a deleted instance please use its Fully Qualified"
6596                   " Domain Name.")
6597
6598
6599 class TagsLU(NoHooksLU):
6600   """Generic tags LU.
6601
6602   This is an abstract class which is the parent of all the other tags LUs.
6603
6604   """
6605
6606   def ExpandNames(self):
6607     self.needed_locks = {}
6608     if self.op.kind == constants.TAG_NODE:
6609       name = self.cfg.ExpandNodeName(self.op.name)
6610       if name is None:
6611         raise errors.OpPrereqError("Invalid node name (%s)" %
6612                                    (self.op.name,))
6613       self.op.name = name
6614       self.needed_locks[locking.LEVEL_NODE] = name
6615     elif self.op.kind == constants.TAG_INSTANCE:
6616       name = self.cfg.ExpandInstanceName(self.op.name)
6617       if name is None:
6618         raise errors.OpPrereqError("Invalid instance name (%s)" %
6619                                    (self.op.name,))
6620       self.op.name = name
6621       self.needed_locks[locking.LEVEL_INSTANCE] = name
6622
6623   def CheckPrereq(self):
6624     """Check prerequisites.
6625
6626     """
6627     if self.op.kind == constants.TAG_CLUSTER:
6628       self.target = self.cfg.GetClusterInfo()
6629     elif self.op.kind == constants.TAG_NODE:
6630       self.target = self.cfg.GetNodeInfo(self.op.name)
6631     elif self.op.kind == constants.TAG_INSTANCE:
6632       self.target = self.cfg.GetInstanceInfo(self.op.name)
6633     else:
6634       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6635                                  str(self.op.kind))
6636
6637
6638 class LUGetTags(TagsLU):
6639   """Returns the tags of a given object.
6640
6641   """
6642   _OP_REQP = ["kind", "name"]
6643   REQ_BGL = False
6644
6645   def Exec(self, feedback_fn):
6646     """Returns the tag list.
6647
6648     """
6649     return list(self.target.GetTags())
6650
6651
6652 class LUSearchTags(NoHooksLU):
6653   """Searches the tags for a given pattern.
6654
6655   """
6656   _OP_REQP = ["pattern"]
6657   REQ_BGL = False
6658
6659   def ExpandNames(self):
6660     self.needed_locks = {}
6661
6662   def CheckPrereq(self):
6663     """Check prerequisites.
6664
6665     This checks the pattern passed for validity by compiling it.
6666
6667     """
6668     try:
6669       self.re = re.compile(self.op.pattern)
6670     except re.error, err:
6671       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6672                                  (self.op.pattern, err))
6673
6674   def Exec(self, feedback_fn):
6675     """Returns the tag list.
6676
6677     """
6678     cfg = self.cfg
6679     tgts = [("/cluster", cfg.GetClusterInfo())]
6680     ilist = cfg.GetAllInstancesInfo().values()
6681     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6682     nlist = cfg.GetAllNodesInfo().values()
6683     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6684     results = []
6685     for path, target in tgts:
6686       for tag in target.GetTags():
6687         if self.re.search(tag):
6688           results.append((path, tag))
6689     return results
6690
6691
6692 class LUAddTags(TagsLU):
6693   """Sets a tag on a given object.
6694
6695   """
6696   _OP_REQP = ["kind", "name", "tags"]
6697   REQ_BGL = False
6698
6699   def CheckPrereq(self):
6700     """Check prerequisites.
6701
6702     This checks the type and length of the tag name and value.
6703
6704     """
6705     TagsLU.CheckPrereq(self)
6706     for tag in self.op.tags:
6707       objects.TaggableObject.ValidateTag(tag)
6708
6709   def Exec(self, feedback_fn):
6710     """Sets the tag.
6711
6712     """
6713     try:
6714       for tag in self.op.tags:
6715         self.target.AddTag(tag)
6716     except errors.TagError, err:
6717       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6718     try:
6719       self.cfg.Update(self.target)
6720     except errors.ConfigurationError:
6721       raise errors.OpRetryError("There has been a modification to the"
6722                                 " config file and the operation has been"
6723                                 " aborted. Please retry.")
6724
6725
6726 class LUDelTags(TagsLU):
6727   """Delete a list of tags from a given object.
6728
6729   """
6730   _OP_REQP = ["kind", "name", "tags"]
6731   REQ_BGL = False
6732
6733   def CheckPrereq(self):
6734     """Check prerequisites.
6735
6736     This checks that we have the given tag.
6737
6738     """
6739     TagsLU.CheckPrereq(self)
6740     for tag in self.op.tags:
6741       objects.TaggableObject.ValidateTag(tag)
6742     del_tags = frozenset(self.op.tags)
6743     cur_tags = self.target.GetTags()
6744     if not del_tags <= cur_tags:
6745       diff_tags = del_tags - cur_tags
6746       diff_names = ["'%s'" % tag for tag in diff_tags]
6747       diff_names.sort()
6748       raise errors.OpPrereqError("Tag(s) %s not found" %
6749                                  (",".join(diff_names)))
6750
6751   def Exec(self, feedback_fn):
6752     """Remove the tag from the object.
6753
6754     """
6755     for tag in self.op.tags:
6756       self.target.RemoveTag(tag)
6757     try:
6758       self.cfg.Update(self.target)
6759     except errors.ConfigurationError:
6760       raise errors.OpRetryError("There has been a modification to the"
6761                                 " config file and the operation has been"
6762                                 " aborted. Please retry.")
6763
6764
6765 class LUTestDelay(NoHooksLU):
6766   """Sleep for a specified amount of time.
6767
6768   This LU sleeps on the master and/or nodes for a specified amount of
6769   time.
6770
6771   """
6772   _OP_REQP = ["duration", "on_master", "on_nodes"]
6773   REQ_BGL = False
6774
6775   def ExpandNames(self):
6776     """Expand names and set required locks.
6777
6778     This expands the node list, if any.
6779
6780     """
6781     self.needed_locks = {}
6782     if self.op.on_nodes:
6783       # _GetWantedNodes can be used here, but is not always appropriate to use
6784       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6785       # more information.
6786       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6787       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6788
6789   def CheckPrereq(self):
6790     """Check prerequisites.
6791
6792     """
6793
6794   def Exec(self, feedback_fn):
6795     """Do the actual sleep.
6796
6797     """
6798     if self.op.on_master:
6799       if not utils.TestDelay(self.op.duration):
6800         raise errors.OpExecError("Error during master delay test")
6801     if self.op.on_nodes:
6802       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6803       if not result:
6804         raise errors.OpExecError("Complete failure from rpc call")
6805       for node, node_result in result.items():
6806         node_result.Raise()
6807         if not node_result.data:
6808           raise errors.OpExecError("Failure during rpc call to node %s,"
6809                                    " result: %s" % (node, node_result.data))
6810
6811
6812 class IAllocator(object):
6813   """IAllocator framework.
6814
6815   An IAllocator instance has three sets of attributes:
6816     - cfg that is needed to query the cluster
6817     - input data (all members of the _KEYS class attribute are required)
6818     - four buffer attributes (in|out_data|text), that represent the
6819       input (to the external script) in text and data structure format,
6820       and the output from it, again in two formats
6821     - the result variables from the script (success, info, nodes) for
6822       easy usage
6823
6824   """
6825   _ALLO_KEYS = [
6826     "mem_size", "disks", "disk_template",
6827     "os", "tags", "nics", "vcpus", "hypervisor",
6828     ]
6829   _RELO_KEYS = [
6830     "relocate_from",
6831     ]
6832
6833   def __init__(self, lu, mode, name, **kwargs):
6834     self.lu = lu
6835     # init buffer variables
6836     self.in_text = self.out_text = self.in_data = self.out_data = None
6837     # init all input fields so that pylint is happy
6838     self.mode = mode
6839     self.name = name
6840     self.mem_size = self.disks = self.disk_template = None
6841     self.os = self.tags = self.nics = self.vcpus = None
6842     self.hypervisor = None
6843     self.relocate_from = None
6844     # computed fields
6845     self.required_nodes = None
6846     # init result fields
6847     self.success = self.info = self.nodes = None
6848     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6849       keyset = self._ALLO_KEYS
6850     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6851       keyset = self._RELO_KEYS
6852     else:
6853       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6854                                    " IAllocator" % self.mode)
6855     for key in kwargs:
6856       if key not in keyset:
6857         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6858                                      " IAllocator" % key)
6859       setattr(self, key, kwargs[key])
6860     for key in keyset:
6861       if key not in kwargs:
6862         raise errors.ProgrammerError("Missing input parameter '%s' to"
6863                                      " IAllocator" % key)
6864     self._BuildInputData()
6865
6866   def _ComputeClusterData(self):
6867     """Compute the generic allocator input data.
6868
6869     This is the data that is independent of the actual operation.
6870
6871     """
6872     cfg = self.lu.cfg
6873     cluster_info = cfg.GetClusterInfo()
6874     # cluster data
6875     data = {
6876       "version": constants.IALLOCATOR_VERSION,
6877       "cluster_name": cfg.GetClusterName(),
6878       "cluster_tags": list(cluster_info.GetTags()),
6879       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6880       # we don't have job IDs
6881       }
6882     iinfo = cfg.GetAllInstancesInfo().values()
6883     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6884
6885     # node data
6886     node_results = {}
6887     node_list = cfg.GetNodeList()
6888
6889     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6890       hypervisor_name = self.hypervisor
6891     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6892       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6893
6894     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6895                                            hypervisor_name)
6896     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6897                        cluster_info.enabled_hypervisors)
6898     for nname, nresult in node_data.items():
6899       # first fill in static (config-based) values
6900       ninfo = cfg.GetNodeInfo(nname)
6901       pnr = {
6902         "tags": list(ninfo.GetTags()),
6903         "primary_ip": ninfo.primary_ip,
6904         "secondary_ip": ninfo.secondary_ip,
6905         "offline": ninfo.offline,
6906         "drained": ninfo.drained,
6907         "master_candidate": ninfo.master_candidate,
6908         }
6909
6910       if not ninfo.offline:
6911         nresult.Raise()
6912         if not isinstance(nresult.data, dict):
6913           raise errors.OpExecError("Can't get data for node %s" % nname)
6914         remote_info = nresult.data
6915         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6916                      'vg_size', 'vg_free', 'cpu_total']:
6917           if attr not in remote_info:
6918             raise errors.OpExecError("Node '%s' didn't return attribute"
6919                                      " '%s'" % (nname, attr))
6920           try:
6921             remote_info[attr] = int(remote_info[attr])
6922           except ValueError, err:
6923             raise errors.OpExecError("Node '%s' returned invalid value"
6924                                      " for '%s': %s" % (nname, attr, err))
6925         # compute memory used by primary instances
6926         i_p_mem = i_p_up_mem = 0
6927         for iinfo, beinfo in i_list:
6928           if iinfo.primary_node == nname:
6929             i_p_mem += beinfo[constants.BE_MEMORY]
6930             if iinfo.name not in node_iinfo[nname].data:
6931               i_used_mem = 0
6932             else:
6933               i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6934             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6935             remote_info['memory_free'] -= max(0, i_mem_diff)
6936
6937             if iinfo.admin_up:
6938               i_p_up_mem += beinfo[constants.BE_MEMORY]
6939
6940         # compute memory used by instances
6941         pnr_dyn = {
6942           "total_memory": remote_info['memory_total'],
6943           "reserved_memory": remote_info['memory_dom0'],
6944           "free_memory": remote_info['memory_free'],
6945           "total_disk": remote_info['vg_size'],
6946           "free_disk": remote_info['vg_free'],
6947           "total_cpus": remote_info['cpu_total'],
6948           "i_pri_memory": i_p_mem,
6949           "i_pri_up_memory": i_p_up_mem,
6950           }
6951         pnr.update(pnr_dyn)
6952
6953       node_results[nname] = pnr
6954     data["nodes"] = node_results
6955
6956     # instance data
6957     instance_data = {}
6958     for iinfo, beinfo in i_list:
6959       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6960                   for n in iinfo.nics]
6961       pir = {
6962         "tags": list(iinfo.GetTags()),
6963         "admin_up": iinfo.admin_up,
6964         "vcpus": beinfo[constants.BE_VCPUS],
6965         "memory": beinfo[constants.BE_MEMORY],
6966         "os": iinfo.os,
6967         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6968         "nics": nic_data,
6969         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6970         "disk_template": iinfo.disk_template,
6971         "hypervisor": iinfo.hypervisor,
6972         }
6973       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
6974                                                  pir["disks"])
6975       instance_data[iinfo.name] = pir
6976
6977     data["instances"] = instance_data
6978
6979     self.in_data = data
6980
6981   def _AddNewInstance(self):
6982     """Add new instance data to allocator structure.
6983
6984     This in combination with _AllocatorGetClusterData will create the
6985     correct structure needed as input for the allocator.
6986
6987     The checks for the completeness of the opcode must have already been
6988     done.
6989
6990     """
6991     data = self.in_data
6992
6993     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6994
6995     if self.disk_template in constants.DTS_NET_MIRROR:
6996       self.required_nodes = 2
6997     else:
6998       self.required_nodes = 1
6999     request = {
7000       "type": "allocate",
7001       "name": self.name,
7002       "disk_template": self.disk_template,
7003       "tags": self.tags,
7004       "os": self.os,
7005       "vcpus": self.vcpus,
7006       "memory": self.mem_size,
7007       "disks": self.disks,
7008       "disk_space_total": disk_space,
7009       "nics": self.nics,
7010       "required_nodes": self.required_nodes,
7011       }
7012     data["request"] = request
7013
7014   def _AddRelocateInstance(self):
7015     """Add relocate instance data to allocator structure.
7016
7017     This in combination with _IAllocatorGetClusterData will create the
7018     correct structure needed as input for the allocator.
7019
7020     The checks for the completeness of the opcode must have already been
7021     done.
7022
7023     """
7024     instance = self.lu.cfg.GetInstanceInfo(self.name)
7025     if instance is None:
7026       raise errors.ProgrammerError("Unknown instance '%s' passed to"
7027                                    " IAllocator" % self.name)
7028
7029     if instance.disk_template not in constants.DTS_NET_MIRROR:
7030       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
7031
7032     if len(instance.secondary_nodes) != 1:
7033       raise errors.OpPrereqError("Instance has not exactly one secondary node")
7034
7035     self.required_nodes = 1
7036     disk_sizes = [{'size': disk.size} for disk in instance.disks]
7037     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
7038
7039     request = {
7040       "type": "relocate",
7041       "name": self.name,
7042       "disk_space_total": disk_space,
7043       "required_nodes": self.required_nodes,
7044       "relocate_from": self.relocate_from,
7045       }
7046     self.in_data["request"] = request
7047
7048   def _BuildInputData(self):
7049     """Build input data structures.
7050
7051     """
7052     self._ComputeClusterData()
7053
7054     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7055       self._AddNewInstance()
7056     else:
7057       self._AddRelocateInstance()
7058
7059     self.in_text = serializer.Dump(self.in_data)
7060
7061   def Run(self, name, validate=True, call_fn=None):
7062     """Run an instance allocator and return the results.
7063
7064     """
7065     if call_fn is None:
7066       call_fn = self.lu.rpc.call_iallocator_runner
7067
7068     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
7069     result.Raise()
7070
7071     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
7072       raise errors.OpExecError("Invalid result from master iallocator runner")
7073
7074     rcode, stdout, stderr, fail = result.data
7075
7076     if rcode == constants.IARUN_NOTFOUND:
7077       raise errors.OpExecError("Can't find allocator '%s'" % name)
7078     elif rcode == constants.IARUN_FAILURE:
7079       raise errors.OpExecError("Instance allocator call failed: %s,"
7080                                " output: %s" % (fail, stdout+stderr))
7081     self.out_text = stdout
7082     if validate:
7083       self._ValidateResult()
7084
7085   def _ValidateResult(self):
7086     """Process the allocator results.
7087
7088     This will process and if successful save the result in
7089     self.out_data and the other parameters.
7090
7091     """
7092     try:
7093       rdict = serializer.Load(self.out_text)
7094     except Exception, err:
7095       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
7096
7097     if not isinstance(rdict, dict):
7098       raise errors.OpExecError("Can't parse iallocator results: not a dict")
7099
7100     for key in "success", "info", "nodes":
7101       if key not in rdict:
7102         raise errors.OpExecError("Can't parse iallocator results:"
7103                                  " missing key '%s'" % key)
7104       setattr(self, key, rdict[key])
7105
7106     if not isinstance(rdict["nodes"], list):
7107       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
7108                                " is not a list")
7109     self.out_data = rdict
7110
7111
7112 class LUTestAllocator(NoHooksLU):
7113   """Run allocator tests.
7114
7115   This LU runs the allocator tests
7116
7117   """
7118   _OP_REQP = ["direction", "mode", "name"]
7119
7120   def CheckPrereq(self):
7121     """Check prerequisites.
7122
7123     This checks the opcode parameters depending on the director and mode test.
7124
7125     """
7126     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7127       for attr in ["name", "mem_size", "disks", "disk_template",
7128                    "os", "tags", "nics", "vcpus"]:
7129         if not hasattr(self.op, attr):
7130           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7131                                      attr)
7132       iname = self.cfg.ExpandInstanceName(self.op.name)
7133       if iname is not None:
7134         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7135                                    iname)
7136       if not isinstance(self.op.nics, list):
7137         raise errors.OpPrereqError("Invalid parameter 'nics'")
7138       for row in self.op.nics:
7139         if (not isinstance(row, dict) or
7140             "mac" not in row or
7141             "ip" not in row or
7142             "bridge" not in row):
7143           raise errors.OpPrereqError("Invalid contents of the"
7144                                      " 'nics' parameter")
7145       if not isinstance(self.op.disks, list):
7146         raise errors.OpPrereqError("Invalid parameter 'disks'")
7147       for row in self.op.disks:
7148         if (not isinstance(row, dict) or
7149             "size" not in row or
7150             not isinstance(row["size"], int) or
7151             "mode" not in row or
7152             row["mode"] not in ['r', 'w']):
7153           raise errors.OpPrereqError("Invalid contents of the"
7154                                      " 'disks' parameter")
7155       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7156         self.op.hypervisor = self.cfg.GetHypervisorType()
7157     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7158       if not hasattr(self.op, "name"):
7159         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7160       fname = self.cfg.ExpandInstanceName(self.op.name)
7161       if fname is None:
7162         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7163                                    self.op.name)
7164       self.op.name = fname
7165       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7166     else:
7167       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7168                                  self.op.mode)
7169
7170     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7171       if not hasattr(self.op, "allocator") or self.op.allocator is None:
7172         raise errors.OpPrereqError("Missing allocator name")
7173     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7174       raise errors.OpPrereqError("Wrong allocator test '%s'" %
7175                                  self.op.direction)
7176
7177   def Exec(self, feedback_fn):
7178     """Run the allocator test.
7179
7180     """
7181     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7182       ial = IAllocator(self,
7183                        mode=self.op.mode,
7184                        name=self.op.name,
7185                        mem_size=self.op.mem_size,
7186                        disks=self.op.disks,
7187                        disk_template=self.op.disk_template,
7188                        os=self.op.os,
7189                        tags=self.op.tags,
7190                        nics=self.op.nics,
7191                        vcpus=self.op.vcpus,
7192                        hypervisor=self.op.hypervisor,
7193                        )
7194     else:
7195       ial = IAllocator(self,
7196                        mode=self.op.mode,
7197                        name=self.op.name,
7198                        relocate_from=list(self.relocate_from),
7199                        )
7200
7201     if self.op.direction == constants.IALLOCATOR_DIR_IN:
7202       result = ial.in_text
7203     else:
7204       ial.Run(self.op.allocator, validate=False)
7205       result = ial.out_text
7206     return result