Merge branch 'master' into next
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import re
30 import platform
31 import logging
32 import copy
33
34 from ganeti import ssh
35 from ganeti import utils
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import serializer
42 from ganeti import ssconf
43
44
45 class LogicalUnit(object):
46   """Logical Unit base class.
47
48   Subclasses must follow these rules:
49     - implement ExpandNames
50     - implement CheckPrereq
51     - implement Exec
52     - implement BuildHooksEnv
53     - redefine HPATH and HTYPE
54     - optionally redefine their run requirements:
55         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
56
57   Note that all commands require root permissions.
58
59   """
60   HPATH = None
61   HTYPE = None
62   _OP_REQP = []
63   REQ_BGL = True
64
65   def __init__(self, processor, op, context, rpc):
66     """Constructor for LogicalUnit.
67
68     This needs to be overridden in derived classes in order to check op
69     validity.
70
71     """
72     self.proc = processor
73     self.op = op
74     self.cfg = context.cfg
75     self.context = context
76     self.rpc = rpc
77     # Dicts used to declare locking needs to mcpu
78     self.needed_locks = None
79     self.acquired_locks = {}
80     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
81     self.add_locks = {}
82     self.remove_locks = {}
83     # Used to force good behavior when calling helper functions
84     self.recalculate_locks = {}
85     self.__ssh = None
86     # logging
87     self.LogWarning = processor.LogWarning
88     self.LogInfo = processor.LogInfo
89
90     for attr_name in self._OP_REQP:
91       attr_val = getattr(op, attr_name, None)
92       if attr_val is None:
93         raise errors.OpPrereqError("Required parameter '%s' missing" %
94                                    attr_name)
95     self.CheckArguments()
96
97   def __GetSSH(self):
98     """Returns the SshRunner object
99
100     """
101     if not self.__ssh:
102       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
103     return self.__ssh
104
105   ssh = property(fget=__GetSSH)
106
107   def CheckArguments(self):
108     """Check syntactic validity for the opcode arguments.
109
110     This method is for doing a simple syntactic check and ensure
111     validity of opcode parameters, without any cluster-related
112     checks. While the same can be accomplished in ExpandNames and/or
113     CheckPrereq, doing these separate is better because:
114
115       - ExpandNames is left as as purely a lock-related function
116       - CheckPrereq is run after we have acquired locks (and possible
117         waited for them)
118
119     The function is allowed to change the self.op attribute so that
120     later methods can no longer worry about missing parameters.
121
122     """
123     pass
124
125   def ExpandNames(self):
126     """Expand names for this LU.
127
128     This method is called before starting to execute the opcode, and it should
129     update all the parameters of the opcode to their canonical form (e.g. a
130     short node name must be fully expanded after this method has successfully
131     completed). This way locking, hooks, logging, ecc. can work correctly.
132
133     LUs which implement this method must also populate the self.needed_locks
134     member, as a dict with lock levels as keys, and a list of needed lock names
135     as values. Rules:
136
137       - use an empty dict if you don't need any lock
138       - if you don't need any lock at a particular level omit that level
139       - don't put anything for the BGL level
140       - if you want all locks at a level use locking.ALL_SET as a value
141
142     If you need to share locks (rather than acquire them exclusively) at one
143     level you can modify self.share_locks, setting a true value (usually 1) for
144     that level. By default locks are not shared.
145
146     Examples::
147
148       # Acquire all nodes and one instance
149       self.needed_locks = {
150         locking.LEVEL_NODE: locking.ALL_SET,
151         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
152       }
153       # Acquire just two nodes
154       self.needed_locks = {
155         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
156       }
157       # Acquire no locks
158       self.needed_locks = {} # No, you can't leave it to the default value None
159
160     """
161     # The implementation of this method is mandatory only if the new LU is
162     # concurrent, so that old LUs don't need to be changed all at the same
163     # time.
164     if self.REQ_BGL:
165       self.needed_locks = {} # Exclusive LUs don't need locks.
166     else:
167       raise NotImplementedError
168
169   def DeclareLocks(self, level):
170     """Declare LU locking needs for a level
171
172     While most LUs can just declare their locking needs at ExpandNames time,
173     sometimes there's the need to calculate some locks after having acquired
174     the ones before. This function is called just before acquiring locks at a
175     particular level, but after acquiring the ones at lower levels, and permits
176     such calculations. It can be used to modify self.needed_locks, and by
177     default it does nothing.
178
179     This function is only called if you have something already set in
180     self.needed_locks for the level.
181
182     @param level: Locking level which is going to be locked
183     @type level: member of ganeti.locking.LEVELS
184
185     """
186
187   def CheckPrereq(self):
188     """Check prerequisites for this LU.
189
190     This method should check that the prerequisites for the execution
191     of this LU are fulfilled. It can do internode communication, but
192     it should be idempotent - no cluster or system changes are
193     allowed.
194
195     The method should raise errors.OpPrereqError in case something is
196     not fulfilled. Its return value is ignored.
197
198     This method should also update all the parameters of the opcode to
199     their canonical form if it hasn't been done by ExpandNames before.
200
201     """
202     raise NotImplementedError
203
204   def Exec(self, feedback_fn):
205     """Execute the LU.
206
207     This method should implement the actual work. It should raise
208     errors.OpExecError for failures that are somewhat dealt with in
209     code, or expected.
210
211     """
212     raise NotImplementedError
213
214   def BuildHooksEnv(self):
215     """Build hooks environment for this LU.
216
217     This method should return a three-node tuple consisting of: a dict
218     containing the environment that will be used for running the
219     specific hook for this LU, a list of node names on which the hook
220     should run before the execution, and a list of node names on which
221     the hook should run after the execution.
222
223     The keys of the dict must not have 'GANETI_' prefixed as this will
224     be handled in the hooks runner. Also note additional keys will be
225     added by the hooks runner. If the LU doesn't define any
226     environment, an empty dict (and not None) should be returned.
227
228     No nodes should be returned as an empty list (and not None).
229
230     Note that if the HPATH for a LU class is None, this function will
231     not be called.
232
233     """
234     raise NotImplementedError
235
236   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
237     """Notify the LU about the results of its hooks.
238
239     This method is called every time a hooks phase is executed, and notifies
240     the Logical Unit about the hooks' result. The LU can then use it to alter
241     its result based on the hooks.  By default the method does nothing and the
242     previous result is passed back unchanged but any LU can define it if it
243     wants to use the local cluster hook-scripts somehow.
244
245     @param phase: one of L{constants.HOOKS_PHASE_POST} or
246         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
247     @param hook_results: the results of the multi-node hooks rpc call
248     @param feedback_fn: function used send feedback back to the caller
249     @param lu_result: the previous Exec result this LU had, or None
250         in the PRE phase
251     @return: the new Exec result, based on the previous result
252         and hook results
253
254     """
255     return lu_result
256
257   def _ExpandAndLockInstance(self):
258     """Helper function to expand and lock an instance.
259
260     Many LUs that work on an instance take its name in self.op.instance_name
261     and need to expand it and then declare the expanded name for locking. This
262     function does it, and then updates self.op.instance_name to the expanded
263     name. It also initializes needed_locks as a dict, if this hasn't been done
264     before.
265
266     """
267     if self.needed_locks is None:
268       self.needed_locks = {}
269     else:
270       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
271         "_ExpandAndLockInstance called with instance-level locks set"
272     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
273     if expanded_name is None:
274       raise errors.OpPrereqError("Instance '%s' not known" %
275                                   self.op.instance_name)
276     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
277     self.op.instance_name = expanded_name
278
279   def _LockInstancesNodes(self, primary_only=False):
280     """Helper function to declare instances' nodes for locking.
281
282     This function should be called after locking one or more instances to lock
283     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
284     with all primary or secondary nodes for instances already locked and
285     present in self.needed_locks[locking.LEVEL_INSTANCE].
286
287     It should be called from DeclareLocks, and for safety only works if
288     self.recalculate_locks[locking.LEVEL_NODE] is set.
289
290     In the future it may grow parameters to just lock some instance's nodes, or
291     to just lock primaries or secondary nodes, if needed.
292
293     If should be called in DeclareLocks in a way similar to::
294
295       if level == locking.LEVEL_NODE:
296         self._LockInstancesNodes()
297
298     @type primary_only: boolean
299     @param primary_only: only lock primary nodes of locked instances
300
301     """
302     assert locking.LEVEL_NODE in self.recalculate_locks, \
303       "_LockInstancesNodes helper function called with no nodes to recalculate"
304
305     # TODO: check if we're really been called with the instance locks held
306
307     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
308     # future we might want to have different behaviors depending on the value
309     # of self.recalculate_locks[locking.LEVEL_NODE]
310     wanted_nodes = []
311     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
312       instance = self.context.cfg.GetInstanceInfo(instance_name)
313       wanted_nodes.append(instance.primary_node)
314       if not primary_only:
315         wanted_nodes.extend(instance.secondary_nodes)
316
317     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
318       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
319     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
320       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
321
322     del self.recalculate_locks[locking.LEVEL_NODE]
323
324
325 class NoHooksLU(LogicalUnit):
326   """Simple LU which runs no hooks.
327
328   This LU is intended as a parent for other LogicalUnits which will
329   run no hooks, in order to reduce duplicate code.
330
331   """
332   HPATH = None
333   HTYPE = None
334
335
336 def _GetWantedNodes(lu, nodes):
337   """Returns list of checked and expanded node names.
338
339   @type lu: L{LogicalUnit}
340   @param lu: the logical unit on whose behalf we execute
341   @type nodes: list
342   @param nodes: list of node names or None for all nodes
343   @rtype: list
344   @return: the list of nodes, sorted
345   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
346
347   """
348   if not isinstance(nodes, list):
349     raise errors.OpPrereqError("Invalid argument type 'nodes'")
350
351   if not nodes:
352     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
353       " non-empty list of nodes whose name is to be expanded.")
354
355   wanted = []
356   for name in nodes:
357     node = lu.cfg.ExpandNodeName(name)
358     if node is None:
359       raise errors.OpPrereqError("No such node name '%s'" % name)
360     wanted.append(node)
361
362   return utils.NiceSort(wanted)
363
364
365 def _GetWantedInstances(lu, instances):
366   """Returns list of checked and expanded instance names.
367
368   @type lu: L{LogicalUnit}
369   @param lu: the logical unit on whose behalf we execute
370   @type instances: list
371   @param instances: list of instance names or None for all instances
372   @rtype: list
373   @return: the list of instances, sorted
374   @raise errors.OpPrereqError: if the instances parameter is wrong type
375   @raise errors.OpPrereqError: if any of the passed instances is not found
376
377   """
378   if not isinstance(instances, list):
379     raise errors.OpPrereqError("Invalid argument type 'instances'")
380
381   if instances:
382     wanted = []
383
384     for name in instances:
385       instance = lu.cfg.ExpandInstanceName(name)
386       if instance is None:
387         raise errors.OpPrereqError("No such instance name '%s'" % name)
388       wanted.append(instance)
389
390   else:
391     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
392   return wanted
393
394
395 def _CheckOutputFields(static, dynamic, selected):
396   """Checks whether all selected fields are valid.
397
398   @type static: L{utils.FieldSet}
399   @param static: static fields set
400   @type dynamic: L{utils.FieldSet}
401   @param dynamic: dynamic fields set
402
403   """
404   f = utils.FieldSet()
405   f.Extend(static)
406   f.Extend(dynamic)
407
408   delta = f.NonMatching(selected)
409   if delta:
410     raise errors.OpPrereqError("Unknown output fields selected: %s"
411                                % ",".join(delta))
412
413
414 def _CheckBooleanOpField(op, name):
415   """Validates boolean opcode parameters.
416
417   This will ensure that an opcode parameter is either a boolean value,
418   or None (but that it always exists).
419
420   """
421   val = getattr(op, name, None)
422   if not (val is None or isinstance(val, bool)):
423     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
424                                (name, str(val)))
425   setattr(op, name, val)
426
427
428 def _CheckNodeOnline(lu, node):
429   """Ensure that a given node is online.
430
431   @param lu: the LU on behalf of which we make the check
432   @param node: the node to check
433   @raise errors.OpPrereqError: if the node is offline
434
435   """
436   if lu.cfg.GetNodeInfo(node).offline:
437     raise errors.OpPrereqError("Can't use offline node %s" % node)
438
439
440 def _CheckNodeNotDrained(lu, node):
441   """Ensure that a given node is not drained.
442
443   @param lu: the LU on behalf of which we make the check
444   @param node: the node to check
445   @raise errors.OpPrereqError: if the node is drained
446
447   """
448   if lu.cfg.GetNodeInfo(node).drained:
449     raise errors.OpPrereqError("Can't use drained node %s" % node)
450
451
452 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
453                           memory, vcpus, nics, disk_template, disks,
454                           bep, hvp, hypervisor_name):
455   """Builds instance related env variables for hooks
456
457   This builds the hook environment from individual variables.
458
459   @type name: string
460   @param name: the name of the instance
461   @type primary_node: string
462   @param primary_node: the name of the instance's primary node
463   @type secondary_nodes: list
464   @param secondary_nodes: list of secondary nodes as strings
465   @type os_type: string
466   @param os_type: the name of the instance's OS
467   @type status: boolean
468   @param status: the should_run status of the instance
469   @type memory: string
470   @param memory: the memory size of the instance
471   @type vcpus: string
472   @param vcpus: the count of VCPUs the instance has
473   @type nics: list
474   @param nics: list of tuples (ip, bridge, mac) representing
475       the NICs the instance  has
476   @type disk_template: string
477   @param disk_template: the disk template of the instance
478   @type disks: list
479   @param disks: the list of (size, mode) pairs
480   @type bep: dict
481   @param bep: the backend parameters for the instance
482   @type hvp: dict
483   @param hvp: the hypervisor parameters for the instance
484   @type hypervisor_name: string
485   @param hypervisor_name: the hypervisor for the instance
486   @rtype: dict
487   @return: the hook environment for this instance
488
489   """
490   if status:
491     str_status = "up"
492   else:
493     str_status = "down"
494   env = {
495     "OP_TARGET": name,
496     "INSTANCE_NAME": name,
497     "INSTANCE_PRIMARY": primary_node,
498     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
499     "INSTANCE_OS_TYPE": os_type,
500     "INSTANCE_STATUS": str_status,
501     "INSTANCE_MEMORY": memory,
502     "INSTANCE_VCPUS": vcpus,
503     "INSTANCE_DISK_TEMPLATE": disk_template,
504     "INSTANCE_HYPERVISOR": hypervisor_name,
505   }
506
507   if nics:
508     nic_count = len(nics)
509     for idx, (ip, bridge, mac) in enumerate(nics):
510       if ip is None:
511         ip = ""
512       env["INSTANCE_NIC%d_IP" % idx] = ip
513       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
514       env["INSTANCE_NIC%d_MAC" % idx] = mac
515   else:
516     nic_count = 0
517
518   env["INSTANCE_NIC_COUNT"] = nic_count
519
520   if disks:
521     disk_count = len(disks)
522     for idx, (size, mode) in enumerate(disks):
523       env["INSTANCE_DISK%d_SIZE" % idx] = size
524       env["INSTANCE_DISK%d_MODE" % idx] = mode
525   else:
526     disk_count = 0
527
528   env["INSTANCE_DISK_COUNT"] = disk_count
529
530   for source, kind in [(bep, "BE"), (hvp, "HV")]:
531     for key, value in source.items():
532       env["INSTANCE_%s_%s" % (kind, key)] = value
533
534   return env
535
536
537 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
538   """Builds instance related env variables for hooks from an object.
539
540   @type lu: L{LogicalUnit}
541   @param lu: the logical unit on whose behalf we execute
542   @type instance: L{objects.Instance}
543   @param instance: the instance for which we should build the
544       environment
545   @type override: dict
546   @param override: dictionary with key/values that will override
547       our values
548   @rtype: dict
549   @return: the hook environment dictionary
550
551   """
552   cluster = lu.cfg.GetClusterInfo()
553   bep = cluster.FillBE(instance)
554   hvp = cluster.FillHV(instance)
555   args = {
556     'name': instance.name,
557     'primary_node': instance.primary_node,
558     'secondary_nodes': instance.secondary_nodes,
559     'os_type': instance.os,
560     'status': instance.admin_up,
561     'memory': bep[constants.BE_MEMORY],
562     'vcpus': bep[constants.BE_VCPUS],
563     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
564     'disk_template': instance.disk_template,
565     'disks': [(disk.size, disk.mode) for disk in instance.disks],
566     'bep': bep,
567     'hvp': hvp,
568     'hypervisor_name': instance.hypervisor,
569   }
570   if override:
571     args.update(override)
572   return _BuildInstanceHookEnv(**args)
573
574
575 def _AdjustCandidatePool(lu):
576   """Adjust the candidate pool after node operations.
577
578   """
579   mod_list = lu.cfg.MaintainCandidatePool()
580   if mod_list:
581     lu.LogInfo("Promoted nodes to master candidate role: %s",
582                ", ".join(node.name for node in mod_list))
583     for name in mod_list:
584       lu.context.ReaddNode(name)
585   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
586   if mc_now > mc_max:
587     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
588                (mc_now, mc_max))
589
590
591 def _CheckInstanceBridgesExist(lu, instance):
592   """Check that the bridges needed by an instance exist.
593
594   """
595   # check bridges existence
596   brlist = [nic.bridge for nic in instance.nics]
597   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
598   result.Raise()
599   if not result.data:
600     raise errors.OpPrereqError("One or more target bridges %s does not"
601                                " exist on destination node '%s'" %
602                                (brlist, instance.primary_node))
603
604
605 class LUDestroyCluster(NoHooksLU):
606   """Logical unit for destroying the cluster.
607
608   """
609   _OP_REQP = []
610
611   def CheckPrereq(self):
612     """Check prerequisites.
613
614     This checks whether the cluster is empty.
615
616     Any errors are signaled by raising errors.OpPrereqError.
617
618     """
619     master = self.cfg.GetMasterNode()
620
621     nodelist = self.cfg.GetNodeList()
622     if len(nodelist) != 1 or nodelist[0] != master:
623       raise errors.OpPrereqError("There are still %d node(s) in"
624                                  " this cluster." % (len(nodelist) - 1))
625     instancelist = self.cfg.GetInstanceList()
626     if instancelist:
627       raise errors.OpPrereqError("There are still %d instance(s) in"
628                                  " this cluster." % len(instancelist))
629
630   def Exec(self, feedback_fn):
631     """Destroys the cluster.
632
633     """
634     master = self.cfg.GetMasterNode()
635     result = self.rpc.call_node_stop_master(master, False)
636     result.Raise()
637     if not result.data:
638       raise errors.OpExecError("Could not disable the master role")
639     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
640     utils.CreateBackup(priv_key)
641     utils.CreateBackup(pub_key)
642     return master
643
644
645 class LUVerifyCluster(LogicalUnit):
646   """Verifies the cluster status.
647
648   """
649   HPATH = "cluster-verify"
650   HTYPE = constants.HTYPE_CLUSTER
651   _OP_REQP = ["skip_checks"]
652   REQ_BGL = False
653
654   def ExpandNames(self):
655     self.needed_locks = {
656       locking.LEVEL_NODE: locking.ALL_SET,
657       locking.LEVEL_INSTANCE: locking.ALL_SET,
658     }
659     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
660
661   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
662                   node_result, feedback_fn, master_files,
663                   drbd_map, vg_name):
664     """Run multiple tests against a node.
665
666     Test list:
667
668       - compares ganeti version
669       - checks vg existence and size > 20G
670       - checks config file checksum
671       - checks ssh to other nodes
672
673     @type nodeinfo: L{objects.Node}
674     @param nodeinfo: the node to check
675     @param file_list: required list of files
676     @param local_cksum: dictionary of local files and their checksums
677     @param node_result: the results from the node
678     @param feedback_fn: function used to accumulate results
679     @param master_files: list of files that only masters should have
680     @param drbd_map: the useddrbd minors for this node, in
681         form of minor: (instance, must_exist) which correspond to instances
682         and their running status
683     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
684
685     """
686     node = nodeinfo.name
687
688     # main result, node_result should be a non-empty dict
689     if not node_result or not isinstance(node_result, dict):
690       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
691       return True
692
693     # compares ganeti version
694     local_version = constants.PROTOCOL_VERSION
695     remote_version = node_result.get('version', None)
696     if not (remote_version and isinstance(remote_version, (list, tuple)) and
697             len(remote_version) == 2):
698       feedback_fn("  - ERROR: connection to %s failed" % (node))
699       return True
700
701     if local_version != remote_version[0]:
702       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
703                   " node %s %s" % (local_version, node, remote_version[0]))
704       return True
705
706     # node seems compatible, we can actually try to look into its results
707
708     bad = False
709
710     # full package version
711     if constants.RELEASE_VERSION != remote_version[1]:
712       feedback_fn("  - WARNING: software version mismatch: master %s,"
713                   " node %s %s" %
714                   (constants.RELEASE_VERSION, node, remote_version[1]))
715
716     # checks vg existence and size > 20G
717     if vg_name is not None:
718       vglist = node_result.get(constants.NV_VGLIST, None)
719       if not vglist:
720         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
721                         (node,))
722         bad = True
723       else:
724         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
725                                               constants.MIN_VG_SIZE)
726         if vgstatus:
727           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
728           bad = True
729
730     # checks config file checksum
731
732     remote_cksum = node_result.get(constants.NV_FILELIST, None)
733     if not isinstance(remote_cksum, dict):
734       bad = True
735       feedback_fn("  - ERROR: node hasn't returned file checksum data")
736     else:
737       for file_name in file_list:
738         node_is_mc = nodeinfo.master_candidate
739         must_have_file = file_name not in master_files
740         if file_name not in remote_cksum:
741           if node_is_mc or must_have_file:
742             bad = True
743             feedback_fn("  - ERROR: file '%s' missing" % file_name)
744         elif remote_cksum[file_name] != local_cksum[file_name]:
745           if node_is_mc or must_have_file:
746             bad = True
747             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
748           else:
749             # not candidate and this is not a must-have file
750             bad = True
751             feedback_fn("  - ERROR: file '%s' should not exist on non master"
752                         " candidates (and the file is outdated)" % file_name)
753         else:
754           # all good, except non-master/non-must have combination
755           if not node_is_mc and not must_have_file:
756             feedback_fn("  - ERROR: file '%s' should not exist on non master"
757                         " candidates" % file_name)
758
759     # checks ssh to any
760
761     if constants.NV_NODELIST not in node_result:
762       bad = True
763       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
764     else:
765       if node_result[constants.NV_NODELIST]:
766         bad = True
767         for node in node_result[constants.NV_NODELIST]:
768           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
769                           (node, node_result[constants.NV_NODELIST][node]))
770
771     if constants.NV_NODENETTEST not in node_result:
772       bad = True
773       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
774     else:
775       if node_result[constants.NV_NODENETTEST]:
776         bad = True
777         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
778         for node in nlist:
779           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
780                           (node, node_result[constants.NV_NODENETTEST][node]))
781
782     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
783     if isinstance(hyp_result, dict):
784       for hv_name, hv_result in hyp_result.iteritems():
785         if hv_result is not None:
786           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
787                       (hv_name, hv_result))
788
789     # check used drbd list
790     if vg_name is not None:
791       used_minors = node_result.get(constants.NV_DRBDLIST, [])
792       if not isinstance(used_minors, (tuple, list)):
793         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
794                     str(used_minors))
795       else:
796         for minor, (iname, must_exist) in drbd_map.items():
797           if minor not in used_minors and must_exist:
798             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
799                         " not active" % (minor, iname))
800             bad = True
801         for minor in used_minors:
802           if minor not in drbd_map:
803             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
804                         minor)
805             bad = True
806
807     return bad
808
809   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
810                       node_instance, feedback_fn, n_offline):
811     """Verify an instance.
812
813     This function checks to see if the required block devices are
814     available on the instance's node.
815
816     """
817     bad = False
818
819     node_current = instanceconfig.primary_node
820
821     node_vol_should = {}
822     instanceconfig.MapLVsByNode(node_vol_should)
823
824     for node in node_vol_should:
825       if node in n_offline:
826         # ignore missing volumes on offline nodes
827         continue
828       for volume in node_vol_should[node]:
829         if node not in node_vol_is or volume not in node_vol_is[node]:
830           feedback_fn("  - ERROR: volume %s missing on node %s" %
831                           (volume, node))
832           bad = True
833
834     if instanceconfig.admin_up:
835       if ((node_current not in node_instance or
836           not instance in node_instance[node_current]) and
837           node_current not in n_offline):
838         feedback_fn("  - ERROR: instance %s not running on node %s" %
839                         (instance, node_current))
840         bad = True
841
842     for node in node_instance:
843       if (not node == node_current):
844         if instance in node_instance[node]:
845           feedback_fn("  - ERROR: instance %s should not run on node %s" %
846                           (instance, node))
847           bad = True
848
849     return bad
850
851   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
852     """Verify if there are any unknown volumes in the cluster.
853
854     The .os, .swap and backup volumes are ignored. All other volumes are
855     reported as unknown.
856
857     """
858     bad = False
859
860     for node in node_vol_is:
861       for volume in node_vol_is[node]:
862         if node not in node_vol_should or volume not in node_vol_should[node]:
863           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
864                       (volume, node))
865           bad = True
866     return bad
867
868   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
869     """Verify the list of running instances.
870
871     This checks what instances are running but unknown to the cluster.
872
873     """
874     bad = False
875     for node in node_instance:
876       for runninginstance in node_instance[node]:
877         if runninginstance not in instancelist:
878           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
879                           (runninginstance, node))
880           bad = True
881     return bad
882
883   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
884     """Verify N+1 Memory Resilience.
885
886     Check that if one single node dies we can still start all the instances it
887     was primary for.
888
889     """
890     bad = False
891
892     for node, nodeinfo in node_info.iteritems():
893       # This code checks that every node which is now listed as secondary has
894       # enough memory to host all instances it is supposed to should a single
895       # other node in the cluster fail.
896       # FIXME: not ready for failover to an arbitrary node
897       # FIXME: does not support file-backed instances
898       # WARNING: we currently take into account down instances as well as up
899       # ones, considering that even if they're down someone might want to start
900       # them even in the event of a node failure.
901       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
902         needed_mem = 0
903         for instance in instances:
904           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
905           if bep[constants.BE_AUTO_BALANCE]:
906             needed_mem += bep[constants.BE_MEMORY]
907         if nodeinfo['mfree'] < needed_mem:
908           feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
909                       " failovers should node %s fail" % (node, prinode))
910           bad = True
911     return bad
912
913   def CheckPrereq(self):
914     """Check prerequisites.
915
916     Transform the list of checks we're going to skip into a set and check that
917     all its members are valid.
918
919     """
920     self.skip_set = frozenset(self.op.skip_checks)
921     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
922       raise errors.OpPrereqError("Invalid checks to be skipped specified")
923
924   def BuildHooksEnv(self):
925     """Build hooks env.
926
927     Cluster-Verify hooks just ran in the post phase and their failure makes
928     the output be logged in the verify output and the verification to fail.
929
930     """
931     all_nodes = self.cfg.GetNodeList()
932     env = {
933       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
934       }
935     for node in self.cfg.GetAllNodesInfo().values():
936       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
937
938     return env, [], all_nodes
939
940   def Exec(self, feedback_fn):
941     """Verify integrity of cluster, performing various test on nodes.
942
943     """
944     bad = False
945     feedback_fn("* Verifying global settings")
946     for msg in self.cfg.VerifyConfig():
947       feedback_fn("  - ERROR: %s" % msg)
948
949     vg_name = self.cfg.GetVGName()
950     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
951     nodelist = utils.NiceSort(self.cfg.GetNodeList())
952     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
953     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
954     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
955                         for iname in instancelist)
956     i_non_redundant = [] # Non redundant instances
957     i_non_a_balanced = [] # Non auto-balanced instances
958     n_offline = [] # List of offline nodes
959     n_drained = [] # List of nodes being drained
960     node_volume = {}
961     node_instance = {}
962     node_info = {}
963     instance_cfg = {}
964
965     # FIXME: verify OS list
966     # do local checksums
967     master_files = [constants.CLUSTER_CONF_FILE]
968
969     file_names = ssconf.SimpleStore().GetFileList()
970     file_names.append(constants.SSL_CERT_FILE)
971     file_names.append(constants.RAPI_CERT_FILE)
972     file_names.extend(master_files)
973
974     local_checksums = utils.FingerprintFiles(file_names)
975
976     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
977     node_verify_param = {
978       constants.NV_FILELIST: file_names,
979       constants.NV_NODELIST: [node.name for node in nodeinfo
980                               if not node.offline],
981       constants.NV_HYPERVISOR: hypervisors,
982       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
983                                   node.secondary_ip) for node in nodeinfo
984                                  if not node.offline],
985       constants.NV_INSTANCELIST: hypervisors,
986       constants.NV_VERSION: None,
987       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
988       }
989     if vg_name is not None:
990       node_verify_param[constants.NV_VGLIST] = None
991       node_verify_param[constants.NV_LVLIST] = vg_name
992       node_verify_param[constants.NV_DRBDLIST] = None
993     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
994                                            self.cfg.GetClusterName())
995
996     cluster = self.cfg.GetClusterInfo()
997     master_node = self.cfg.GetMasterNode()
998     all_drbd_map = self.cfg.ComputeDRBDMap()
999
1000     for node_i in nodeinfo:
1001       node = node_i.name
1002       nresult = all_nvinfo[node].data
1003
1004       if node_i.offline:
1005         feedback_fn("* Skipping offline node %s" % (node,))
1006         n_offline.append(node)
1007         continue
1008
1009       if node == master_node:
1010         ntype = "master"
1011       elif node_i.master_candidate:
1012         ntype = "master candidate"
1013       elif node_i.drained:
1014         ntype = "drained"
1015         n_drained.append(node)
1016       else:
1017         ntype = "regular"
1018       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1019
1020       if all_nvinfo[node].failed or not isinstance(nresult, dict):
1021         feedback_fn("  - ERROR: connection to %s failed" % (node,))
1022         bad = True
1023         continue
1024
1025       node_drbd = {}
1026       for minor, instance in all_drbd_map[node].items():
1027         if instance not in instanceinfo:
1028           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1029                       instance)
1030           # ghost instance should not be running, but otherwise we
1031           # don't give double warnings (both ghost instance and
1032           # unallocated minor in use)
1033           node_drbd[minor] = (instance, False)
1034         else:
1035           instance = instanceinfo[instance]
1036           node_drbd[minor] = (instance.name, instance.admin_up)
1037       result = self._VerifyNode(node_i, file_names, local_checksums,
1038                                 nresult, feedback_fn, master_files,
1039                                 node_drbd, vg_name)
1040       bad = bad or result
1041
1042       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1043       if vg_name is None:
1044         node_volume[node] = {}
1045       elif isinstance(lvdata, basestring):
1046         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1047                     (node, utils.SafeEncode(lvdata)))
1048         bad = True
1049         node_volume[node] = {}
1050       elif not isinstance(lvdata, dict):
1051         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1052         bad = True
1053         continue
1054       else:
1055         node_volume[node] = lvdata
1056
1057       # node_instance
1058       idata = nresult.get(constants.NV_INSTANCELIST, None)
1059       if not isinstance(idata, list):
1060         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1061                     (node,))
1062         bad = True
1063         continue
1064
1065       node_instance[node] = idata
1066
1067       # node_info
1068       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1069       if not isinstance(nodeinfo, dict):
1070         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1071         bad = True
1072         continue
1073
1074       try:
1075         node_info[node] = {
1076           "mfree": int(nodeinfo['memory_free']),
1077           "pinst": [],
1078           "sinst": [],
1079           # dictionary holding all instances this node is secondary for,
1080           # grouped by their primary node. Each key is a cluster node, and each
1081           # value is a list of instances which have the key as primary and the
1082           # current node as secondary.  this is handy to calculate N+1 memory
1083           # availability if you can only failover from a primary to its
1084           # secondary.
1085           "sinst-by-pnode": {},
1086         }
1087         # FIXME: devise a free space model for file based instances as well
1088         if vg_name is not None:
1089           if (constants.NV_VGLIST not in nresult or
1090               vg_name not in nresult[constants.NV_VGLIST]):
1091             feedback_fn("  - ERROR: node %s didn't return data for the"
1092                         " volume group '%s' - it is either missing or broken" %
1093                         (node, vg_name))
1094             bad = True
1095             continue
1096           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1097       except (ValueError, KeyError):
1098         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1099                     " from node %s" % (node,))
1100         bad = True
1101         continue
1102
1103     node_vol_should = {}
1104
1105     for instance in instancelist:
1106       feedback_fn("* Verifying instance %s" % instance)
1107       inst_config = instanceinfo[instance]
1108       result =  self._VerifyInstance(instance, inst_config, node_volume,
1109                                      node_instance, feedback_fn, n_offline)
1110       bad = bad or result
1111       inst_nodes_offline = []
1112
1113       inst_config.MapLVsByNode(node_vol_should)
1114
1115       instance_cfg[instance] = inst_config
1116
1117       pnode = inst_config.primary_node
1118       if pnode in node_info:
1119         node_info[pnode]['pinst'].append(instance)
1120       elif pnode not in n_offline:
1121         feedback_fn("  - ERROR: instance %s, connection to primary node"
1122                     " %s failed" % (instance, pnode))
1123         bad = True
1124
1125       if pnode in n_offline:
1126         inst_nodes_offline.append(pnode)
1127
1128       # If the instance is non-redundant we cannot survive losing its primary
1129       # node, so we are not N+1 compliant. On the other hand we have no disk
1130       # templates with more than one secondary so that situation is not well
1131       # supported either.
1132       # FIXME: does not support file-backed instances
1133       if len(inst_config.secondary_nodes) == 0:
1134         i_non_redundant.append(instance)
1135       elif len(inst_config.secondary_nodes) > 1:
1136         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1137                     % instance)
1138
1139       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1140         i_non_a_balanced.append(instance)
1141
1142       for snode in inst_config.secondary_nodes:
1143         if snode in node_info:
1144           node_info[snode]['sinst'].append(instance)
1145           if pnode not in node_info[snode]['sinst-by-pnode']:
1146             node_info[snode]['sinst-by-pnode'][pnode] = []
1147           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1148         elif snode not in n_offline:
1149           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1150                       " %s failed" % (instance, snode))
1151           bad = True
1152         if snode in n_offline:
1153           inst_nodes_offline.append(snode)
1154
1155       if inst_nodes_offline:
1156         # warn that the instance lives on offline nodes, and set bad=True
1157         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1158                     ", ".join(inst_nodes_offline))
1159         bad = True
1160
1161     feedback_fn("* Verifying orphan volumes")
1162     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1163                                        feedback_fn)
1164     bad = bad or result
1165
1166     feedback_fn("* Verifying remaining instances")
1167     result = self._VerifyOrphanInstances(instancelist, node_instance,
1168                                          feedback_fn)
1169     bad = bad or result
1170
1171     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1172       feedback_fn("* Verifying N+1 Memory redundancy")
1173       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1174       bad = bad or result
1175
1176     feedback_fn("* Other Notes")
1177     if i_non_redundant:
1178       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1179                   % len(i_non_redundant))
1180
1181     if i_non_a_balanced:
1182       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1183                   % len(i_non_a_balanced))
1184
1185     if n_offline:
1186       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1187
1188     if n_drained:
1189       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1190
1191     return not bad
1192
1193   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1194     """Analyze the post-hooks' result
1195
1196     This method analyses the hook result, handles it, and sends some
1197     nicely-formatted feedback back to the user.
1198
1199     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1200         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1201     @param hooks_results: the results of the multi-node hooks rpc call
1202     @param feedback_fn: function used send feedback back to the caller
1203     @param lu_result: previous Exec result
1204     @return: the new Exec result, based on the previous result
1205         and hook results
1206
1207     """
1208     # We only really run POST phase hooks, and are only interested in
1209     # their results
1210     if phase == constants.HOOKS_PHASE_POST:
1211       # Used to change hooks' output to proper indentation
1212       indent_re = re.compile('^', re.M)
1213       feedback_fn("* Hooks Results")
1214       if not hooks_results:
1215         feedback_fn("  - ERROR: general communication failure")
1216         lu_result = 1
1217       else:
1218         for node_name in hooks_results:
1219           show_node_header = True
1220           res = hooks_results[node_name]
1221           if res.failed or res.data is False or not isinstance(res.data, list):
1222             if res.offline:
1223               # no need to warn or set fail return value
1224               continue
1225             feedback_fn("    Communication failure in hooks execution")
1226             lu_result = 1
1227             continue
1228           for script, hkr, output in res.data:
1229             if hkr == constants.HKR_FAIL:
1230               # The node header is only shown once, if there are
1231               # failing hooks on that node
1232               if show_node_header:
1233                 feedback_fn("  Node %s:" % node_name)
1234                 show_node_header = False
1235               feedback_fn("    ERROR: Script %s failed, output:" % script)
1236               output = indent_re.sub('      ', output)
1237               feedback_fn("%s" % output)
1238               lu_result = 1
1239
1240       return lu_result
1241
1242
1243 class LUVerifyDisks(NoHooksLU):
1244   """Verifies the cluster disks status.
1245
1246   """
1247   _OP_REQP = []
1248   REQ_BGL = False
1249
1250   def ExpandNames(self):
1251     self.needed_locks = {
1252       locking.LEVEL_NODE: locking.ALL_SET,
1253       locking.LEVEL_INSTANCE: locking.ALL_SET,
1254     }
1255     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1256
1257   def CheckPrereq(self):
1258     """Check prerequisites.
1259
1260     This has no prerequisites.
1261
1262     """
1263     pass
1264
1265   def Exec(self, feedback_fn):
1266     """Verify integrity of cluster disks.
1267
1268     """
1269     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1270
1271     vg_name = self.cfg.GetVGName()
1272     nodes = utils.NiceSort(self.cfg.GetNodeList())
1273     instances = [self.cfg.GetInstanceInfo(name)
1274                  for name in self.cfg.GetInstanceList()]
1275
1276     nv_dict = {}
1277     for inst in instances:
1278       inst_lvs = {}
1279       if (not inst.admin_up or
1280           inst.disk_template not in constants.DTS_NET_MIRROR):
1281         continue
1282       inst.MapLVsByNode(inst_lvs)
1283       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1284       for node, vol_list in inst_lvs.iteritems():
1285         for vol in vol_list:
1286           nv_dict[(node, vol)] = inst
1287
1288     if not nv_dict:
1289       return result
1290
1291     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1292
1293     for node in nodes:
1294       # node_volume
1295       lvs = node_lvs[node]
1296       if lvs.failed:
1297         if not lvs.offline:
1298           self.LogWarning("Connection to node %s failed: %s" %
1299                           (node, lvs.data))
1300         continue
1301       lvs = lvs.data
1302       if isinstance(lvs, basestring):
1303         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1304         res_nlvm[node] = lvs
1305         continue
1306       elif not isinstance(lvs, dict):
1307         logging.warning("Connection to node %s failed or invalid data"
1308                         " returned", node)
1309         res_nodes.append(node)
1310         continue
1311
1312       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1313         inst = nv_dict.pop((node, lv_name), None)
1314         if (not lv_online and inst is not None
1315             and inst.name not in res_instances):
1316           res_instances.append(inst.name)
1317
1318     # any leftover items in nv_dict are missing LVs, let's arrange the
1319     # data better
1320     for key, inst in nv_dict.iteritems():
1321       if inst.name not in res_missing:
1322         res_missing[inst.name] = []
1323       res_missing[inst.name].append(key)
1324
1325     return result
1326
1327
1328 class LURepairDiskSizes(NoHooksLU):
1329   """Verifies the cluster disks sizes.
1330
1331   """
1332   _OP_REQP = ["instances"]
1333   REQ_BGL = False
1334
1335   def ExpandNames(self):
1336
1337     if not isinstance(self.op.instances, list):
1338       raise errors.OpPrereqError("Invalid argument type 'instances'")
1339
1340     if self.op.instances:
1341       self.wanted_names = []
1342       for name in self.op.instances:
1343         full_name = self.cfg.ExpandInstanceName(name)
1344         if full_name is None:
1345           raise errors.OpPrereqError("Instance '%s' not known" % name)
1346         self.wanted_names.append(full_name)
1347       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
1348       self.needed_locks = {
1349         locking.LEVEL_NODE: [],
1350         locking.LEVEL_INSTANCE: self.wanted_names,
1351         }
1352       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1353     else:
1354       self.wanted_names = None
1355       self.needed_locks = {
1356         locking.LEVEL_NODE: locking.ALL_SET,
1357         locking.LEVEL_INSTANCE: locking.ALL_SET,
1358         }
1359     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1360
1361   def DeclareLocks(self, level):
1362     if level == locking.LEVEL_NODE and self.wanted_names is not None:
1363       self._LockInstancesNodes(primary_only=True)
1364
1365   def CheckPrereq(self):
1366     """Check prerequisites.
1367
1368     This only checks the optional instance list against the existing names.
1369
1370     """
1371     if self.wanted_names is None:
1372       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
1373
1374     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
1375                              in self.wanted_names]
1376
1377   def Exec(self, feedback_fn):
1378     """Verify the size of cluster disks.
1379
1380     """
1381     # TODO: check child disks too
1382     # TODO: check differences in size between primary/secondary nodes
1383     per_node_disks = {}
1384     for instance in self.wanted_instances:
1385       pnode = instance.primary_node
1386       if pnode not in per_node_disks:
1387         per_node_disks[pnode] = []
1388       for idx, disk in enumerate(instance.disks):
1389         per_node_disks[pnode].append((instance, idx, disk))
1390
1391     changed = []
1392     for node, dskl in per_node_disks.items():
1393       result = self.rpc.call_blockdev_getsizes(node, [v[2] for v in dskl])
1394       if result.failed:
1395         self.LogWarning("Failure in blockdev_getsizes call to node"
1396                         " %s, ignoring", node)
1397         continue
1398       if len(result.data) != len(dskl):
1399         self.LogWarning("Invalid result from node %s, ignoring node results",
1400                         node)
1401         continue
1402       for ((instance, idx, disk), size) in zip(dskl, result.data):
1403         if size is None:
1404           self.LogWarning("Disk %d of instance %s did not return size"
1405                           " information, ignoring", idx, instance.name)
1406           continue
1407         if not isinstance(size, (int, long)):
1408           self.LogWarning("Disk %d of instance %s did not return valid"
1409                           " size information, ignoring", idx, instance.name)
1410           continue
1411         size = size >> 20
1412         if size != disk.size:
1413           self.LogInfo("Disk %d of instance %s has mismatched size,"
1414                        " correcting: recorded %d, actual %d", idx,
1415                        instance.name, disk.size, size)
1416           disk.size = size
1417           self.cfg.Update(instance)
1418           changed.append((instance.name, idx, size))
1419     return changed
1420
1421
1422 class LURenameCluster(LogicalUnit):
1423   """Rename the cluster.
1424
1425   """
1426   HPATH = "cluster-rename"
1427   HTYPE = constants.HTYPE_CLUSTER
1428   _OP_REQP = ["name"]
1429
1430   def BuildHooksEnv(self):
1431     """Build hooks env.
1432
1433     """
1434     env = {
1435       "OP_TARGET": self.cfg.GetClusterName(),
1436       "NEW_NAME": self.op.name,
1437       }
1438     mn = self.cfg.GetMasterNode()
1439     return env, [mn], [mn]
1440
1441   def CheckPrereq(self):
1442     """Verify that the passed name is a valid one.
1443
1444     """
1445     hostname = utils.HostInfo(self.op.name)
1446
1447     new_name = hostname.name
1448     self.ip = new_ip = hostname.ip
1449     old_name = self.cfg.GetClusterName()
1450     old_ip = self.cfg.GetMasterIP()
1451     if new_name == old_name and new_ip == old_ip:
1452       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1453                                  " cluster has changed")
1454     if new_ip != old_ip:
1455       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1456         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1457                                    " reachable on the network. Aborting." %
1458                                    new_ip)
1459
1460     self.op.name = new_name
1461
1462   def Exec(self, feedback_fn):
1463     """Rename the cluster.
1464
1465     """
1466     clustername = self.op.name
1467     ip = self.ip
1468
1469     # shutdown the master IP
1470     master = self.cfg.GetMasterNode()
1471     result = self.rpc.call_node_stop_master(master, False)
1472     if result.failed or not result.data:
1473       raise errors.OpExecError("Could not disable the master role")
1474
1475     try:
1476       cluster = self.cfg.GetClusterInfo()
1477       cluster.cluster_name = clustername
1478       cluster.master_ip = ip
1479       self.cfg.Update(cluster)
1480
1481       # update the known hosts file
1482       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1483       node_list = self.cfg.GetNodeList()
1484       try:
1485         node_list.remove(master)
1486       except ValueError:
1487         pass
1488       result = self.rpc.call_upload_file(node_list,
1489                                          constants.SSH_KNOWN_HOSTS_FILE)
1490       for to_node, to_result in result.iteritems():
1491         if to_result.failed or not to_result.data:
1492           logging.error("Copy of file %s to node %s failed",
1493                         constants.SSH_KNOWN_HOSTS_FILE, to_node)
1494
1495     finally:
1496       result = self.rpc.call_node_start_master(master, False, False)
1497       if result.failed or not result.data:
1498         self.LogWarning("Could not re-enable the master role on"
1499                         " the master, please restart manually.")
1500
1501
1502 def _RecursiveCheckIfLVMBased(disk):
1503   """Check if the given disk or its children are lvm-based.
1504
1505   @type disk: L{objects.Disk}
1506   @param disk: the disk to check
1507   @rtype: boolean
1508   @return: boolean indicating whether a LD_LV dev_type was found or not
1509
1510   """
1511   if disk.children:
1512     for chdisk in disk.children:
1513       if _RecursiveCheckIfLVMBased(chdisk):
1514         return True
1515   return disk.dev_type == constants.LD_LV
1516
1517
1518 class LUSetClusterParams(LogicalUnit):
1519   """Change the parameters of the cluster.
1520
1521   """
1522   HPATH = "cluster-modify"
1523   HTYPE = constants.HTYPE_CLUSTER
1524   _OP_REQP = []
1525   REQ_BGL = False
1526
1527   def CheckArguments(self):
1528     """Check parameters
1529
1530     """
1531     if not hasattr(self.op, "candidate_pool_size"):
1532       self.op.candidate_pool_size = None
1533     if self.op.candidate_pool_size is not None:
1534       try:
1535         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1536       except (ValueError, TypeError), err:
1537         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1538                                    str(err))
1539       if self.op.candidate_pool_size < 1:
1540         raise errors.OpPrereqError("At least one master candidate needed")
1541
1542   def ExpandNames(self):
1543     # FIXME: in the future maybe other cluster params won't require checking on
1544     # all nodes to be modified.
1545     self.needed_locks = {
1546       locking.LEVEL_NODE: locking.ALL_SET,
1547     }
1548     self.share_locks[locking.LEVEL_NODE] = 1
1549
1550   def BuildHooksEnv(self):
1551     """Build hooks env.
1552
1553     """
1554     env = {
1555       "OP_TARGET": self.cfg.GetClusterName(),
1556       "NEW_VG_NAME": self.op.vg_name,
1557       }
1558     mn = self.cfg.GetMasterNode()
1559     return env, [mn], [mn]
1560
1561   def CheckPrereq(self):
1562     """Check prerequisites.
1563
1564     This checks whether the given params don't conflict and
1565     if the given volume group is valid.
1566
1567     """
1568     if self.op.vg_name is not None and not self.op.vg_name:
1569       instances = self.cfg.GetAllInstancesInfo().values()
1570       for inst in instances:
1571         for disk in inst.disks:
1572           if _RecursiveCheckIfLVMBased(disk):
1573             raise errors.OpPrereqError("Cannot disable lvm storage while"
1574                                        " lvm-based instances exist")
1575
1576     node_list = self.acquired_locks[locking.LEVEL_NODE]
1577
1578     # if vg_name not None, checks given volume group on all nodes
1579     if self.op.vg_name:
1580       vglist = self.rpc.call_vg_list(node_list)
1581       for node in node_list:
1582         if vglist[node].failed:
1583           # ignoring down node
1584           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1585           continue
1586         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1587                                               self.op.vg_name,
1588                                               constants.MIN_VG_SIZE)
1589         if vgstatus:
1590           raise errors.OpPrereqError("Error on node '%s': %s" %
1591                                      (node, vgstatus))
1592
1593     self.cluster = cluster = self.cfg.GetClusterInfo()
1594     # validate beparams changes
1595     if self.op.beparams:
1596       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1597       self.new_beparams = cluster.FillDict(
1598         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1599
1600     # hypervisor list/parameters
1601     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1602     if self.op.hvparams:
1603       if not isinstance(self.op.hvparams, dict):
1604         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1605       for hv_name, hv_dict in self.op.hvparams.items():
1606         if hv_name not in self.new_hvparams:
1607           self.new_hvparams[hv_name] = hv_dict
1608         else:
1609           self.new_hvparams[hv_name].update(hv_dict)
1610
1611     if self.op.enabled_hypervisors is not None:
1612       self.hv_list = self.op.enabled_hypervisors
1613       if not self.hv_list:
1614         raise errors.OpPrereqError("Enabled hypervisors list must contain at"
1615                                    " least one member")
1616       invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
1617       if invalid_hvs:
1618         raise errors.OpPrereqError("Enabled hypervisors contains invalid"
1619                                    " entries: %s" % invalid_hvs)
1620     else:
1621       self.hv_list = cluster.enabled_hypervisors
1622
1623     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1624       # either the enabled list has changed, or the parameters have, validate
1625       for hv_name, hv_params in self.new_hvparams.items():
1626         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1627             (self.op.enabled_hypervisors and
1628              hv_name in self.op.enabled_hypervisors)):
1629           # either this is a new hypervisor, or its parameters have changed
1630           hv_class = hypervisor.GetHypervisor(hv_name)
1631           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1632           hv_class.CheckParameterSyntax(hv_params)
1633           _CheckHVParams(self, node_list, hv_name, hv_params)
1634
1635   def Exec(self, feedback_fn):
1636     """Change the parameters of the cluster.
1637
1638     """
1639     if self.op.vg_name is not None:
1640       new_volume = self.op.vg_name
1641       if not new_volume:
1642         new_volume = None
1643       if new_volume != self.cfg.GetVGName():
1644         self.cfg.SetVGName(new_volume)
1645       else:
1646         feedback_fn("Cluster LVM configuration already in desired"
1647                     " state, not changing")
1648     if self.op.hvparams:
1649       self.cluster.hvparams = self.new_hvparams
1650     if self.op.enabled_hypervisors is not None:
1651       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1652     if self.op.beparams:
1653       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1654     if self.op.candidate_pool_size is not None:
1655       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1656       # we need to update the pool size here, otherwise the save will fail
1657       _AdjustCandidatePool(self)
1658
1659     self.cfg.Update(self.cluster)
1660
1661
1662 class LURedistributeConfig(NoHooksLU):
1663   """Force the redistribution of cluster configuration.
1664
1665   This is a very simple LU.
1666
1667   """
1668   _OP_REQP = []
1669   REQ_BGL = False
1670
1671   def ExpandNames(self):
1672     self.needed_locks = {
1673       locking.LEVEL_NODE: locking.ALL_SET,
1674     }
1675     self.share_locks[locking.LEVEL_NODE] = 1
1676
1677   def CheckPrereq(self):
1678     """Check prerequisites.
1679
1680     """
1681
1682   def Exec(self, feedback_fn):
1683     """Redistribute the configuration.
1684
1685     """
1686     self.cfg.Update(self.cfg.GetClusterInfo())
1687
1688
1689 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1690   """Sleep and poll for an instance's disk to sync.
1691
1692   """
1693   if not instance.disks:
1694     return True
1695
1696   if not oneshot:
1697     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1698
1699   node = instance.primary_node
1700
1701   for dev in instance.disks:
1702     lu.cfg.SetDiskID(dev, node)
1703
1704   retries = 0
1705   degr_retries = 10 # in seconds, as we sleep 1 second each time
1706   while True:
1707     max_time = 0
1708     done = True
1709     cumul_degraded = False
1710     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1711     if rstats.failed or not rstats.data:
1712       lu.LogWarning("Can't get any data from node %s", node)
1713       retries += 1
1714       if retries >= 10:
1715         raise errors.RemoteError("Can't contact node %s for mirror data,"
1716                                  " aborting." % node)
1717       time.sleep(6)
1718       continue
1719     rstats = rstats.data
1720     retries = 0
1721     for i, mstat in enumerate(rstats):
1722       if mstat is None:
1723         lu.LogWarning("Can't compute data for node %s/%s",
1724                            node, instance.disks[i].iv_name)
1725         continue
1726       # we ignore the ldisk parameter
1727       perc_done, est_time, is_degraded, _ = mstat
1728       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1729       if perc_done is not None:
1730         done = False
1731         if est_time is not None:
1732           rem_time = "%d estimated seconds remaining" % est_time
1733           max_time = est_time
1734         else:
1735           rem_time = "no time estimate"
1736         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1737                         (instance.disks[i].iv_name, perc_done, rem_time))
1738
1739     # if we're done but degraded, let's do a few small retries, to
1740     # make sure we see a stable and not transient situation; therefore
1741     # we force restart of the loop
1742     if (done or oneshot) and cumul_degraded and degr_retries > 0:
1743       logging.info("Degraded disks found, %d retries left", degr_retries)
1744       degr_retries -= 1
1745       time.sleep(1)
1746       continue
1747
1748     if done or oneshot:
1749       break
1750
1751     time.sleep(min(60, max_time))
1752
1753   if done:
1754     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1755   return not cumul_degraded
1756
1757
1758 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1759   """Check that mirrors are not degraded.
1760
1761   The ldisk parameter, if True, will change the test from the
1762   is_degraded attribute (which represents overall non-ok status for
1763   the device(s)) to the ldisk (representing the local storage status).
1764
1765   """
1766   lu.cfg.SetDiskID(dev, node)
1767   if ldisk:
1768     idx = 6
1769   else:
1770     idx = 5
1771
1772   result = True
1773   if on_primary or dev.AssembleOnSecondary():
1774     rstats = lu.rpc.call_blockdev_find(node, dev)
1775     msg = rstats.RemoteFailMsg()
1776     if msg:
1777       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1778       result = False
1779     elif not rstats.payload:
1780       lu.LogWarning("Can't find disk on node %s", node)
1781       result = False
1782     else:
1783       result = result and (not rstats.payload[idx])
1784   if dev.children:
1785     for child in dev.children:
1786       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1787
1788   return result
1789
1790
1791 class LUDiagnoseOS(NoHooksLU):
1792   """Logical unit for OS diagnose/query.
1793
1794   """
1795   _OP_REQP = ["output_fields", "names"]
1796   REQ_BGL = False
1797   _FIELDS_STATIC = utils.FieldSet()
1798   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1799
1800   def ExpandNames(self):
1801     if self.op.names:
1802       raise errors.OpPrereqError("Selective OS query not supported")
1803
1804     _CheckOutputFields(static=self._FIELDS_STATIC,
1805                        dynamic=self._FIELDS_DYNAMIC,
1806                        selected=self.op.output_fields)
1807
1808     # Lock all nodes, in shared mode
1809     # Temporary removal of locks, should be reverted later
1810     # TODO: reintroduce locks when they are lighter-weight
1811     self.needed_locks = {}
1812     #self.share_locks[locking.LEVEL_NODE] = 1
1813     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1814
1815   def CheckPrereq(self):
1816     """Check prerequisites.
1817
1818     """
1819
1820   @staticmethod
1821   def _DiagnoseByOS(node_list, rlist):
1822     """Remaps a per-node return list into an a per-os per-node dictionary
1823
1824     @param node_list: a list with the names of all nodes
1825     @param rlist: a map with node names as keys and OS objects as values
1826
1827     @rtype: dict
1828     @return: a dictionary with osnames as keys and as value another map, with
1829         nodes as keys and list of OS objects as values, eg::
1830
1831           {"debian-etch": {"node1": [<object>,...],
1832                            "node2": [<object>,]}
1833           }
1834
1835     """
1836     all_os = {}
1837     # we build here the list of nodes that didn't fail the RPC (at RPC
1838     # level), so that nodes with a non-responding node daemon don't
1839     # make all OSes invalid
1840     good_nodes = [node_name for node_name in rlist
1841                   if not rlist[node_name].failed]
1842     for node_name, nr in rlist.iteritems():
1843       if nr.failed or not nr.data:
1844         continue
1845       for os_obj in nr.data:
1846         if os_obj.name not in all_os:
1847           # build a list of nodes for this os containing empty lists
1848           # for each node in node_list
1849           all_os[os_obj.name] = {}
1850           for nname in good_nodes:
1851             all_os[os_obj.name][nname] = []
1852         all_os[os_obj.name][node_name].append(os_obj)
1853     return all_os
1854
1855   def Exec(self, feedback_fn):
1856     """Compute the list of OSes.
1857
1858     """
1859     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1860     node_data = self.rpc.call_os_diagnose(valid_nodes)
1861     if node_data == False:
1862       raise errors.OpExecError("Can't gather the list of OSes")
1863     pol = self._DiagnoseByOS(valid_nodes, node_data)
1864     output = []
1865     for os_name, os_data in pol.iteritems():
1866       row = []
1867       for field in self.op.output_fields:
1868         if field == "name":
1869           val = os_name
1870         elif field == "valid":
1871           val = utils.all([osl and osl[0] for osl in os_data.values()])
1872         elif field == "node_status":
1873           val = {}
1874           for node_name, nos_list in os_data.iteritems():
1875             val[node_name] = [(v.status, v.path) for v in nos_list]
1876         else:
1877           raise errors.ParameterError(field)
1878         row.append(val)
1879       output.append(row)
1880
1881     return output
1882
1883
1884 class LURemoveNode(LogicalUnit):
1885   """Logical unit for removing a node.
1886
1887   """
1888   HPATH = "node-remove"
1889   HTYPE = constants.HTYPE_NODE
1890   _OP_REQP = ["node_name"]
1891
1892   def BuildHooksEnv(self):
1893     """Build hooks env.
1894
1895     This doesn't run on the target node in the pre phase as a failed
1896     node would then be impossible to remove.
1897
1898     """
1899     env = {
1900       "OP_TARGET": self.op.node_name,
1901       "NODE_NAME": self.op.node_name,
1902       }
1903     all_nodes = self.cfg.GetNodeList()
1904     all_nodes.remove(self.op.node_name)
1905     return env, all_nodes, all_nodes
1906
1907   def CheckPrereq(self):
1908     """Check prerequisites.
1909
1910     This checks:
1911      - the node exists in the configuration
1912      - it does not have primary or secondary instances
1913      - it's not the master
1914
1915     Any errors are signaled by raising errors.OpPrereqError.
1916
1917     """
1918     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1919     if node is None:
1920       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1921
1922     instance_list = self.cfg.GetInstanceList()
1923
1924     masternode = self.cfg.GetMasterNode()
1925     if node.name == masternode:
1926       raise errors.OpPrereqError("Node is the master node,"
1927                                  " you need to failover first.")
1928
1929     for instance_name in instance_list:
1930       instance = self.cfg.GetInstanceInfo(instance_name)
1931       if node.name in instance.all_nodes:
1932         raise errors.OpPrereqError("Instance %s is still running on the node,"
1933                                    " please remove first." % instance_name)
1934     self.op.node_name = node.name
1935     self.node = node
1936
1937   def Exec(self, feedback_fn):
1938     """Removes the node from the cluster.
1939
1940     """
1941     node = self.node
1942     logging.info("Stopping the node daemon and removing configs from node %s",
1943                  node.name)
1944
1945     self.context.RemoveNode(node.name)
1946
1947     self.rpc.call_node_leave_cluster(node.name)
1948
1949     # Promote nodes to master candidate as needed
1950     _AdjustCandidatePool(self)
1951
1952
1953 class LUQueryNodes(NoHooksLU):
1954   """Logical unit for querying nodes.
1955
1956   """
1957   _OP_REQP = ["output_fields", "names", "use_locking"]
1958   REQ_BGL = False
1959   _FIELDS_DYNAMIC = utils.FieldSet(
1960     "dtotal", "dfree",
1961     "mtotal", "mnode", "mfree",
1962     "bootid",
1963     "ctotal", "cnodes", "csockets",
1964     )
1965
1966   _FIELDS_STATIC = utils.FieldSet(
1967     "name", "pinst_cnt", "sinst_cnt",
1968     "pinst_list", "sinst_list",
1969     "pip", "sip", "tags",
1970     "serial_no",
1971     "master_candidate",
1972     "master",
1973     "offline",
1974     "drained",
1975     "role",
1976     )
1977
1978   def ExpandNames(self):
1979     _CheckOutputFields(static=self._FIELDS_STATIC,
1980                        dynamic=self._FIELDS_DYNAMIC,
1981                        selected=self.op.output_fields)
1982
1983     self.needed_locks = {}
1984     self.share_locks[locking.LEVEL_NODE] = 1
1985
1986     if self.op.names:
1987       self.wanted = _GetWantedNodes(self, self.op.names)
1988     else:
1989       self.wanted = locking.ALL_SET
1990
1991     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1992     self.do_locking = self.do_node_query and self.op.use_locking
1993     if self.do_locking:
1994       # if we don't request only static fields, we need to lock the nodes
1995       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1996
1997
1998   def CheckPrereq(self):
1999     """Check prerequisites.
2000
2001     """
2002     # The validation of the node list is done in the _GetWantedNodes,
2003     # if non empty, and if empty, there's no validation to do
2004     pass
2005
2006   def Exec(self, feedback_fn):
2007     """Computes the list of nodes and their attributes.
2008
2009     """
2010     all_info = self.cfg.GetAllNodesInfo()
2011     if self.do_locking:
2012       nodenames = self.acquired_locks[locking.LEVEL_NODE]
2013     elif self.wanted != locking.ALL_SET:
2014       nodenames = self.wanted
2015       missing = set(nodenames).difference(all_info.keys())
2016       if missing:
2017         raise errors.OpExecError(
2018           "Some nodes were removed before retrieving their data: %s" % missing)
2019     else:
2020       nodenames = all_info.keys()
2021
2022     nodenames = utils.NiceSort(nodenames)
2023     nodelist = [all_info[name] for name in nodenames]
2024
2025     # begin data gathering
2026
2027     if self.do_node_query:
2028       live_data = {}
2029       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2030                                           self.cfg.GetHypervisorType())
2031       for name in nodenames:
2032         nodeinfo = node_data[name]
2033         if not nodeinfo.failed and nodeinfo.data:
2034           nodeinfo = nodeinfo.data
2035           fn = utils.TryConvert
2036           live_data[name] = {
2037             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2038             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2039             "mfree": fn(int, nodeinfo.get('memory_free', None)),
2040             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2041             "dfree": fn(int, nodeinfo.get('vg_free', None)),
2042             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2043             "bootid": nodeinfo.get('bootid', None),
2044             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2045             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2046             }
2047         else:
2048           live_data[name] = {}
2049     else:
2050       live_data = dict.fromkeys(nodenames, {})
2051
2052     node_to_primary = dict([(name, set()) for name in nodenames])
2053     node_to_secondary = dict([(name, set()) for name in nodenames])
2054
2055     inst_fields = frozenset(("pinst_cnt", "pinst_list",
2056                              "sinst_cnt", "sinst_list"))
2057     if inst_fields & frozenset(self.op.output_fields):
2058       instancelist = self.cfg.GetInstanceList()
2059
2060       for instance_name in instancelist:
2061         inst = self.cfg.GetInstanceInfo(instance_name)
2062         if inst.primary_node in node_to_primary:
2063           node_to_primary[inst.primary_node].add(inst.name)
2064         for secnode in inst.secondary_nodes:
2065           if secnode in node_to_secondary:
2066             node_to_secondary[secnode].add(inst.name)
2067
2068     master_node = self.cfg.GetMasterNode()
2069
2070     # end data gathering
2071
2072     output = []
2073     for node in nodelist:
2074       node_output = []
2075       for field in self.op.output_fields:
2076         if field == "name":
2077           val = node.name
2078         elif field == "pinst_list":
2079           val = list(node_to_primary[node.name])
2080         elif field == "sinst_list":
2081           val = list(node_to_secondary[node.name])
2082         elif field == "pinst_cnt":
2083           val = len(node_to_primary[node.name])
2084         elif field == "sinst_cnt":
2085           val = len(node_to_secondary[node.name])
2086         elif field == "pip":
2087           val = node.primary_ip
2088         elif field == "sip":
2089           val = node.secondary_ip
2090         elif field == "tags":
2091           val = list(node.GetTags())
2092         elif field == "serial_no":
2093           val = node.serial_no
2094         elif field == "master_candidate":
2095           val = node.master_candidate
2096         elif field == "master":
2097           val = node.name == master_node
2098         elif field == "offline":
2099           val = node.offline
2100         elif field == "drained":
2101           val = node.drained
2102         elif self._FIELDS_DYNAMIC.Matches(field):
2103           val = live_data[node.name].get(field, None)
2104         elif field == "role":
2105           if node.name == master_node:
2106             val = "M"
2107           elif node.master_candidate:
2108             val = "C"
2109           elif node.drained:
2110             val = "D"
2111           elif node.offline:
2112             val = "O"
2113           else:
2114             val = "R"
2115         else:
2116           raise errors.ParameterError(field)
2117         node_output.append(val)
2118       output.append(node_output)
2119
2120     return output
2121
2122
2123 class LUQueryNodeVolumes(NoHooksLU):
2124   """Logical unit for getting volumes on node(s).
2125
2126   """
2127   _OP_REQP = ["nodes", "output_fields"]
2128   REQ_BGL = False
2129   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2130   _FIELDS_STATIC = utils.FieldSet("node")
2131
2132   def ExpandNames(self):
2133     _CheckOutputFields(static=self._FIELDS_STATIC,
2134                        dynamic=self._FIELDS_DYNAMIC,
2135                        selected=self.op.output_fields)
2136
2137     self.needed_locks = {}
2138     self.share_locks[locking.LEVEL_NODE] = 1
2139     if not self.op.nodes:
2140       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2141     else:
2142       self.needed_locks[locking.LEVEL_NODE] = \
2143         _GetWantedNodes(self, self.op.nodes)
2144
2145   def CheckPrereq(self):
2146     """Check prerequisites.
2147
2148     This checks that the fields required are valid output fields.
2149
2150     """
2151     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2152
2153   def Exec(self, feedback_fn):
2154     """Computes the list of nodes and their attributes.
2155
2156     """
2157     nodenames = self.nodes
2158     volumes = self.rpc.call_node_volumes(nodenames)
2159
2160     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2161              in self.cfg.GetInstanceList()]
2162
2163     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2164
2165     output = []
2166     for node in nodenames:
2167       if node not in volumes or volumes[node].failed or not volumes[node].data:
2168         continue
2169
2170       node_vols = volumes[node].data[:]
2171       node_vols.sort(key=lambda vol: vol['dev'])
2172
2173       for vol in node_vols:
2174         node_output = []
2175         for field in self.op.output_fields:
2176           if field == "node":
2177             val = node
2178           elif field == "phys":
2179             val = vol['dev']
2180           elif field == "vg":
2181             val = vol['vg']
2182           elif field == "name":
2183             val = vol['name']
2184           elif field == "size":
2185             val = int(float(vol['size']))
2186           elif field == "instance":
2187             for inst in ilist:
2188               if node not in lv_by_node[inst]:
2189                 continue
2190               if vol['name'] in lv_by_node[inst][node]:
2191                 val = inst.name
2192                 break
2193             else:
2194               val = '-'
2195           else:
2196             raise errors.ParameterError(field)
2197           node_output.append(str(val))
2198
2199         output.append(node_output)
2200
2201     return output
2202
2203
2204 class LUAddNode(LogicalUnit):
2205   """Logical unit for adding node to the cluster.
2206
2207   """
2208   HPATH = "node-add"
2209   HTYPE = constants.HTYPE_NODE
2210   _OP_REQP = ["node_name"]
2211
2212   def BuildHooksEnv(self):
2213     """Build hooks env.
2214
2215     This will run on all nodes before, and on all nodes + the new node after.
2216
2217     """
2218     env = {
2219       "OP_TARGET": self.op.node_name,
2220       "NODE_NAME": self.op.node_name,
2221       "NODE_PIP": self.op.primary_ip,
2222       "NODE_SIP": self.op.secondary_ip,
2223       }
2224     nodes_0 = self.cfg.GetNodeList()
2225     nodes_1 = nodes_0 + [self.op.node_name, ]
2226     return env, nodes_0, nodes_1
2227
2228   def CheckPrereq(self):
2229     """Check prerequisites.
2230
2231     This checks:
2232      - the new node is not already in the config
2233      - it is resolvable
2234      - its parameters (single/dual homed) matches the cluster
2235
2236     Any errors are signaled by raising errors.OpPrereqError.
2237
2238     """
2239     node_name = self.op.node_name
2240     cfg = self.cfg
2241
2242     dns_data = utils.HostInfo(node_name)
2243
2244     node = dns_data.name
2245     primary_ip = self.op.primary_ip = dns_data.ip
2246     secondary_ip = getattr(self.op, "secondary_ip", None)
2247     if secondary_ip is None:
2248       secondary_ip = primary_ip
2249     if not utils.IsValidIP(secondary_ip):
2250       raise errors.OpPrereqError("Invalid secondary IP given")
2251     self.op.secondary_ip = secondary_ip
2252
2253     node_list = cfg.GetNodeList()
2254     if not self.op.readd and node in node_list:
2255       raise errors.OpPrereqError("Node %s is already in the configuration" %
2256                                  node)
2257     elif self.op.readd and node not in node_list:
2258       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2259
2260     for existing_node_name in node_list:
2261       existing_node = cfg.GetNodeInfo(existing_node_name)
2262
2263       if self.op.readd and node == existing_node_name:
2264         if (existing_node.primary_ip != primary_ip or
2265             existing_node.secondary_ip != secondary_ip):
2266           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2267                                      " address configuration as before")
2268         continue
2269
2270       if (existing_node.primary_ip == primary_ip or
2271           existing_node.secondary_ip == primary_ip or
2272           existing_node.primary_ip == secondary_ip or
2273           existing_node.secondary_ip == secondary_ip):
2274         raise errors.OpPrereqError("New node ip address(es) conflict with"
2275                                    " existing node %s" % existing_node.name)
2276
2277     # check that the type of the node (single versus dual homed) is the
2278     # same as for the master
2279     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2280     master_singlehomed = myself.secondary_ip == myself.primary_ip
2281     newbie_singlehomed = secondary_ip == primary_ip
2282     if master_singlehomed != newbie_singlehomed:
2283       if master_singlehomed:
2284         raise errors.OpPrereqError("The master has no private ip but the"
2285                                    " new node has one")
2286       else:
2287         raise errors.OpPrereqError("The master has a private ip but the"
2288                                    " new node doesn't have one")
2289
2290     # checks reachability
2291     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2292       raise errors.OpPrereqError("Node not reachable by ping")
2293
2294     if not newbie_singlehomed:
2295       # check reachability from my secondary ip to newbie's secondary ip
2296       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2297                            source=myself.secondary_ip):
2298         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2299                                    " based ping to noded port")
2300
2301     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2302     if self.op.readd:
2303       exceptions = [node]
2304     else:
2305       exceptions = []
2306     mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
2307     # the new node will increase mc_max with one, so:
2308     mc_max = min(mc_max + 1, cp_size)
2309     self.master_candidate = mc_now < mc_max
2310
2311     if self.op.readd:
2312       self.new_node = self.cfg.GetNodeInfo(node)
2313       assert self.new_node is not None, "Can't retrieve locked node %s" % node
2314     else:
2315       self.new_node = objects.Node(name=node,
2316                                    primary_ip=primary_ip,
2317                                    secondary_ip=secondary_ip,
2318                                    master_candidate=self.master_candidate,
2319                                    offline=False, drained=False)
2320
2321   def Exec(self, feedback_fn):
2322     """Adds the new node to the cluster.
2323
2324     """
2325     new_node = self.new_node
2326     node = new_node.name
2327
2328     # for re-adds, reset the offline/drained/master-candidate flags;
2329     # we need to reset here, otherwise offline would prevent RPC calls
2330     # later in the procedure; this also means that if the re-add
2331     # fails, we are left with a non-offlined, broken node
2332     if self.op.readd:
2333       new_node.drained = new_node.offline = False
2334       self.LogInfo("Readding a node, the offline/drained flags were reset")
2335       # if we demote the node, we do cleanup later in the procedure
2336       new_node.master_candidate = self.master_candidate
2337
2338     # notify the user about any possible mc promotion
2339     if new_node.master_candidate:
2340       self.LogInfo("Node will be a master candidate")
2341
2342     # check connectivity
2343     result = self.rpc.call_version([node])[node]
2344     result.Raise()
2345     if result.data:
2346       if constants.PROTOCOL_VERSION == result.data:
2347         logging.info("Communication to node %s fine, sw version %s match",
2348                      node, result.data)
2349       else:
2350         raise errors.OpExecError("Version mismatch master version %s,"
2351                                  " node version %s" %
2352                                  (constants.PROTOCOL_VERSION, result.data))
2353     else:
2354       raise errors.OpExecError("Cannot get version from the new node")
2355
2356     # setup ssh on node
2357     logging.info("Copy ssh key to node %s", node)
2358     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2359     keyarray = []
2360     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2361                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2362                 priv_key, pub_key]
2363
2364     for i in keyfiles:
2365       f = open(i, 'r')
2366       try:
2367         keyarray.append(f.read())
2368       finally:
2369         f.close()
2370
2371     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2372                                     keyarray[2],
2373                                     keyarray[3], keyarray[4], keyarray[5])
2374
2375     msg = result.RemoteFailMsg()
2376     if msg:
2377       raise errors.OpExecError("Cannot transfer ssh keys to the"
2378                                " new node: %s" % msg)
2379
2380     # Add node to our /etc/hosts, and add key to known_hosts
2381     utils.AddHostToEtcHosts(new_node.name)
2382
2383     if new_node.secondary_ip != new_node.primary_ip:
2384       result = self.rpc.call_node_has_ip_address(new_node.name,
2385                                                  new_node.secondary_ip)
2386       if result.failed or not result.data:
2387         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2388                                  " you gave (%s). Please fix and re-run this"
2389                                  " command." % new_node.secondary_ip)
2390
2391     node_verify_list = [self.cfg.GetMasterNode()]
2392     node_verify_param = {
2393       'nodelist': [node],
2394       # TODO: do a node-net-test as well?
2395     }
2396
2397     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2398                                        self.cfg.GetClusterName())
2399     for verifier in node_verify_list:
2400       if result[verifier].failed or not result[verifier].data:
2401         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2402                                  " for remote verification" % verifier)
2403       if result[verifier].data['nodelist']:
2404         for failed in result[verifier].data['nodelist']:
2405           feedback_fn("ssh/hostname verification failed %s -> %s" %
2406                       (verifier, result[verifier].data['nodelist'][failed]))
2407         raise errors.OpExecError("ssh/hostname verification failed.")
2408
2409     # Distribute updated /etc/hosts and known_hosts to all nodes,
2410     # including the node just added
2411     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2412     dist_nodes = self.cfg.GetNodeList()
2413     if not self.op.readd:
2414       dist_nodes.append(node)
2415     if myself.name in dist_nodes:
2416       dist_nodes.remove(myself.name)
2417
2418     logging.debug("Copying hosts and known_hosts to all nodes")
2419     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2420       result = self.rpc.call_upload_file(dist_nodes, fname)
2421       for to_node, to_result in result.iteritems():
2422         if to_result.failed or not to_result.data:
2423           logging.error("Copy of file %s to node %s failed", fname, to_node)
2424
2425     to_copy = []
2426     enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2427     if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
2428       to_copy.append(constants.VNC_PASSWORD_FILE)
2429
2430     for fname in to_copy:
2431       result = self.rpc.call_upload_file([node], fname)
2432       if result[node].failed or not result[node]:
2433         logging.error("Could not copy file %s to node %s", fname, node)
2434
2435     if self.op.readd:
2436       self.context.ReaddNode(new_node)
2437       # make sure we redistribute the config
2438       self.cfg.Update(new_node)
2439       # and make sure the new node will not have old files around
2440       if not new_node.master_candidate:
2441         result = self.rpc.call_node_demote_from_mc(new_node.name)
2442         msg = result.RemoteFailMsg()
2443         if msg:
2444           self.LogWarning("Node failed to demote itself from master"
2445                           " candidate status: %s" % msg)
2446     else:
2447       self.context.AddNode(new_node)
2448
2449
2450 class LUSetNodeParams(LogicalUnit):
2451   """Modifies the parameters of a node.
2452
2453   """
2454   HPATH = "node-modify"
2455   HTYPE = constants.HTYPE_NODE
2456   _OP_REQP = ["node_name"]
2457   REQ_BGL = False
2458
2459   def CheckArguments(self):
2460     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2461     if node_name is None:
2462       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2463     self.op.node_name = node_name
2464     _CheckBooleanOpField(self.op, 'master_candidate')
2465     _CheckBooleanOpField(self.op, 'offline')
2466     _CheckBooleanOpField(self.op, 'drained')
2467     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2468     if all_mods.count(None) == 3:
2469       raise errors.OpPrereqError("Please pass at least one modification")
2470     if all_mods.count(True) > 1:
2471       raise errors.OpPrereqError("Can't set the node into more than one"
2472                                  " state at the same time")
2473
2474   def ExpandNames(self):
2475     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2476
2477   def BuildHooksEnv(self):
2478     """Build hooks env.
2479
2480     This runs on the master node.
2481
2482     """
2483     env = {
2484       "OP_TARGET": self.op.node_name,
2485       "MASTER_CANDIDATE": str(self.op.master_candidate),
2486       "OFFLINE": str(self.op.offline),
2487       "DRAINED": str(self.op.drained),
2488       }
2489     nl = [self.cfg.GetMasterNode(),
2490           self.op.node_name]
2491     return env, nl, nl
2492
2493   def CheckPrereq(self):
2494     """Check prerequisites.
2495
2496     This only checks the instance list against the existing names.
2497
2498     """
2499     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2500
2501     if ((self.op.master_candidate == False or self.op.offline == True or
2502          self.op.drained == True) and node.master_candidate):
2503       # we will demote the node from master_candidate
2504       if self.op.node_name == self.cfg.GetMasterNode():
2505         raise errors.OpPrereqError("The master node has to be a"
2506                                    " master candidate, online and not drained")
2507       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2508       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2509       if num_candidates <= cp_size:
2510         msg = ("Not enough master candidates (desired"
2511                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2512         if self.op.force:
2513           self.LogWarning(msg)
2514         else:
2515           raise errors.OpPrereqError(msg)
2516
2517     if (self.op.master_candidate == True and
2518         ((node.offline and not self.op.offline == False) or
2519          (node.drained and not self.op.drained == False))):
2520       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2521                                  " to master_candidate" % node.name)
2522
2523     return
2524
2525   def Exec(self, feedback_fn):
2526     """Modifies a node.
2527
2528     """
2529     node = self.node
2530
2531     result = []
2532     changed_mc = False
2533
2534     if self.op.offline is not None:
2535       node.offline = self.op.offline
2536       result.append(("offline", str(self.op.offline)))
2537       if self.op.offline == True:
2538         if node.master_candidate:
2539           node.master_candidate = False
2540           changed_mc = True
2541           result.append(("master_candidate", "auto-demotion due to offline"))
2542         if node.drained:
2543           node.drained = False
2544           result.append(("drained", "clear drained status due to offline"))
2545
2546     if self.op.master_candidate is not None:
2547       node.master_candidate = self.op.master_candidate
2548       changed_mc = True
2549       result.append(("master_candidate", str(self.op.master_candidate)))
2550       if self.op.master_candidate == False:
2551         rrc = self.rpc.call_node_demote_from_mc(node.name)
2552         msg = rrc.RemoteFailMsg()
2553         if msg:
2554           self.LogWarning("Node failed to demote itself: %s" % msg)
2555
2556     if self.op.drained is not None:
2557       node.drained = self.op.drained
2558       result.append(("drained", str(self.op.drained)))
2559       if self.op.drained == True:
2560         if node.master_candidate:
2561           node.master_candidate = False
2562           changed_mc = True
2563           result.append(("master_candidate", "auto-demotion due to drain"))
2564           rrc = self.rpc.call_node_demote_from_mc(node.name)
2565           msg = rrc.RemoteFailMsg()
2566           if msg:
2567             self.LogWarning("Node failed to demote itself: %s" % msg)
2568         if node.offline:
2569           node.offline = False
2570           result.append(("offline", "clear offline status due to drain"))
2571
2572     # this will trigger configuration file update, if needed
2573     self.cfg.Update(node)
2574     # this will trigger job queue propagation or cleanup
2575     if changed_mc:
2576       self.context.ReaddNode(node)
2577
2578     return result
2579
2580
2581 class LUQueryClusterInfo(NoHooksLU):
2582   """Query cluster configuration.
2583
2584   """
2585   _OP_REQP = []
2586   REQ_BGL = False
2587
2588   def ExpandNames(self):
2589     self.needed_locks = {}
2590
2591   def CheckPrereq(self):
2592     """No prerequsites needed for this LU.
2593
2594     """
2595     pass
2596
2597   def Exec(self, feedback_fn):
2598     """Return cluster config.
2599
2600     """
2601     cluster = self.cfg.GetClusterInfo()
2602     result = {
2603       "software_version": constants.RELEASE_VERSION,
2604       "protocol_version": constants.PROTOCOL_VERSION,
2605       "config_version": constants.CONFIG_VERSION,
2606       "os_api_version": constants.OS_API_VERSION,
2607       "export_version": constants.EXPORT_VERSION,
2608       "architecture": (platform.architecture()[0], platform.machine()),
2609       "name": cluster.cluster_name,
2610       "master": cluster.master_node,
2611       "default_hypervisor": cluster.default_hypervisor,
2612       "enabled_hypervisors": cluster.enabled_hypervisors,
2613       "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
2614                         for hypervisor_name in cluster.enabled_hypervisors]),
2615       "beparams": cluster.beparams,
2616       "candidate_pool_size": cluster.candidate_pool_size,
2617       "default_bridge": cluster.default_bridge,
2618       "master_netdev": cluster.master_netdev,
2619       "volume_group_name": cluster.volume_group_name,
2620       "file_storage_dir": cluster.file_storage_dir,
2621       }
2622
2623     return result
2624
2625
2626 class LUQueryConfigValues(NoHooksLU):
2627   """Return configuration values.
2628
2629   """
2630   _OP_REQP = []
2631   REQ_BGL = False
2632   _FIELDS_DYNAMIC = utils.FieldSet()
2633   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2634
2635   def ExpandNames(self):
2636     self.needed_locks = {}
2637
2638     _CheckOutputFields(static=self._FIELDS_STATIC,
2639                        dynamic=self._FIELDS_DYNAMIC,
2640                        selected=self.op.output_fields)
2641
2642   def CheckPrereq(self):
2643     """No prerequisites.
2644
2645     """
2646     pass
2647
2648   def Exec(self, feedback_fn):
2649     """Dump a representation of the cluster config to the standard output.
2650
2651     """
2652     values = []
2653     for field in self.op.output_fields:
2654       if field == "cluster_name":
2655         entry = self.cfg.GetClusterName()
2656       elif field == "master_node":
2657         entry = self.cfg.GetMasterNode()
2658       elif field == "drain_flag":
2659         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2660       else:
2661         raise errors.ParameterError(field)
2662       values.append(entry)
2663     return values
2664
2665
2666 class LUActivateInstanceDisks(NoHooksLU):
2667   """Bring up an instance's disks.
2668
2669   """
2670   _OP_REQP = ["instance_name"]
2671   REQ_BGL = False
2672
2673   def ExpandNames(self):
2674     self._ExpandAndLockInstance()
2675     self.needed_locks[locking.LEVEL_NODE] = []
2676     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2677
2678   def DeclareLocks(self, level):
2679     if level == locking.LEVEL_NODE:
2680       self._LockInstancesNodes()
2681
2682   def CheckPrereq(self):
2683     """Check prerequisites.
2684
2685     This checks that the instance is in the cluster.
2686
2687     """
2688     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2689     assert self.instance is not None, \
2690       "Cannot retrieve locked instance %s" % self.op.instance_name
2691     _CheckNodeOnline(self, self.instance.primary_node)
2692     if not hasattr(self.op, "ignore_size"):
2693       self.op.ignore_size = False
2694
2695   def Exec(self, feedback_fn):
2696     """Activate the disks.
2697
2698     """
2699     disks_ok, disks_info = \
2700               _AssembleInstanceDisks(self, self.instance,
2701                                      ignore_size=self.op.ignore_size)
2702     if not disks_ok:
2703       raise errors.OpExecError("Cannot activate block devices")
2704
2705     return disks_info
2706
2707
2708 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
2709                            ignore_size=False):
2710   """Prepare the block devices for an instance.
2711
2712   This sets up the block devices on all nodes.
2713
2714   @type lu: L{LogicalUnit}
2715   @param lu: the logical unit on whose behalf we execute
2716   @type instance: L{objects.Instance}
2717   @param instance: the instance for whose disks we assemble
2718   @type ignore_secondaries: boolean
2719   @param ignore_secondaries: if true, errors on secondary nodes
2720       won't result in an error return from the function
2721   @type ignore_size: boolean
2722   @param ignore_size: if true, the current known size of the disk
2723       will not be used during the disk activation, useful for cases
2724       when the size is wrong
2725   @return: False if the operation failed, otherwise a list of
2726       (host, instance_visible_name, node_visible_name)
2727       with the mapping from node devices to instance devices
2728
2729   """
2730   device_info = []
2731   disks_ok = True
2732   iname = instance.name
2733   # With the two passes mechanism we try to reduce the window of
2734   # opportunity for the race condition of switching DRBD to primary
2735   # before handshaking occured, but we do not eliminate it
2736
2737   # The proper fix would be to wait (with some limits) until the
2738   # connection has been made and drbd transitions from WFConnection
2739   # into any other network-connected state (Connected, SyncTarget,
2740   # SyncSource, etc.)
2741
2742   # 1st pass, assemble on all nodes in secondary mode
2743   for inst_disk in instance.disks:
2744     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2745       if ignore_size:
2746         node_disk = node_disk.Copy()
2747         node_disk.UnsetSize()
2748       lu.cfg.SetDiskID(node_disk, node)
2749       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2750       msg = result.RemoteFailMsg()
2751       if msg:
2752         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2753                            " (is_primary=False, pass=1): %s",
2754                            inst_disk.iv_name, node, msg)
2755         if not ignore_secondaries:
2756           disks_ok = False
2757
2758   # FIXME: race condition on drbd migration to primary
2759
2760   # 2nd pass, do only the primary node
2761   for inst_disk in instance.disks:
2762     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2763       if node != instance.primary_node:
2764         continue
2765       if ignore_size:
2766         node_disk = node_disk.Copy()
2767         node_disk.UnsetSize()
2768       lu.cfg.SetDiskID(node_disk, node)
2769       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2770       msg = result.RemoteFailMsg()
2771       if msg:
2772         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2773                            " (is_primary=True, pass=2): %s",
2774                            inst_disk.iv_name, node, msg)
2775         disks_ok = False
2776     device_info.append((instance.primary_node, inst_disk.iv_name,
2777                         result.payload))
2778
2779   # leave the disks configured for the primary node
2780   # this is a workaround that would be fixed better by
2781   # improving the logical/physical id handling
2782   for disk in instance.disks:
2783     lu.cfg.SetDiskID(disk, instance.primary_node)
2784
2785   return disks_ok, device_info
2786
2787
2788 def _StartInstanceDisks(lu, instance, force):
2789   """Start the disks of an instance.
2790
2791   """
2792   disks_ok, _ = _AssembleInstanceDisks(lu, instance,
2793                                            ignore_secondaries=force)
2794   if not disks_ok:
2795     _ShutdownInstanceDisks(lu, instance)
2796     if force is not None and not force:
2797       lu.proc.LogWarning("", hint="If the message above refers to a"
2798                          " secondary node,"
2799                          " you can retry the operation using '--force'.")
2800     raise errors.OpExecError("Disk consistency error")
2801
2802
2803 class LUDeactivateInstanceDisks(NoHooksLU):
2804   """Shutdown an instance's disks.
2805
2806   """
2807   _OP_REQP = ["instance_name"]
2808   REQ_BGL = False
2809
2810   def ExpandNames(self):
2811     self._ExpandAndLockInstance()
2812     self.needed_locks[locking.LEVEL_NODE] = []
2813     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2814
2815   def DeclareLocks(self, level):
2816     if level == locking.LEVEL_NODE:
2817       self._LockInstancesNodes()
2818
2819   def CheckPrereq(self):
2820     """Check prerequisites.
2821
2822     This checks that the instance is in the cluster.
2823
2824     """
2825     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2826     assert self.instance is not None, \
2827       "Cannot retrieve locked instance %s" % self.op.instance_name
2828
2829   def Exec(self, feedback_fn):
2830     """Deactivate the disks
2831
2832     """
2833     instance = self.instance
2834     _SafeShutdownInstanceDisks(self, instance)
2835
2836
2837 def _SafeShutdownInstanceDisks(lu, instance):
2838   """Shutdown block devices of an instance.
2839
2840   This function checks if an instance is running, before calling
2841   _ShutdownInstanceDisks.
2842
2843   """
2844   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2845                                       [instance.hypervisor])
2846   ins_l = ins_l[instance.primary_node]
2847   if ins_l.failed or not isinstance(ins_l.data, list):
2848     raise errors.OpExecError("Can't contact node '%s'" %
2849                              instance.primary_node)
2850
2851   if instance.name in ins_l.data:
2852     raise errors.OpExecError("Instance is running, can't shutdown"
2853                              " block devices.")
2854
2855   _ShutdownInstanceDisks(lu, instance)
2856
2857
2858 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2859   """Shutdown block devices of an instance.
2860
2861   This does the shutdown on all nodes of the instance.
2862
2863   If the ignore_primary is false, errors on the primary node are
2864   ignored.
2865
2866   """
2867   all_result = True
2868   for disk in instance.disks:
2869     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2870       lu.cfg.SetDiskID(top_disk, node)
2871       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2872       msg = result.RemoteFailMsg()
2873       if msg:
2874         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2875                       disk.iv_name, node, msg)
2876         if not ignore_primary or node != instance.primary_node:
2877           all_result = False
2878   return all_result
2879
2880
2881 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2882   """Checks if a node has enough free memory.
2883
2884   This function check if a given node has the needed amount of free
2885   memory. In case the node has less memory or we cannot get the
2886   information from the node, this function raise an OpPrereqError
2887   exception.
2888
2889   @type lu: C{LogicalUnit}
2890   @param lu: a logical unit from which we get configuration data
2891   @type node: C{str}
2892   @param node: the node to check
2893   @type reason: C{str}
2894   @param reason: string to use in the error message
2895   @type requested: C{int}
2896   @param requested: the amount of memory in MiB to check for
2897   @type hypervisor_name: C{str}
2898   @param hypervisor_name: the hypervisor to ask for memory stats
2899   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2900       we cannot check the node
2901
2902   """
2903   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2904   nodeinfo[node].Raise()
2905   free_mem = nodeinfo[node].data.get('memory_free')
2906   if not isinstance(free_mem, int):
2907     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2908                              " was '%s'" % (node, free_mem))
2909   if requested > free_mem:
2910     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2911                              " needed %s MiB, available %s MiB" %
2912                              (node, reason, requested, free_mem))
2913
2914
2915 class LUStartupInstance(LogicalUnit):
2916   """Starts an instance.
2917
2918   """
2919   HPATH = "instance-start"
2920   HTYPE = constants.HTYPE_INSTANCE
2921   _OP_REQP = ["instance_name", "force"]
2922   REQ_BGL = False
2923
2924   def ExpandNames(self):
2925     self._ExpandAndLockInstance()
2926
2927   def BuildHooksEnv(self):
2928     """Build hooks env.
2929
2930     This runs on master, primary and secondary nodes of the instance.
2931
2932     """
2933     env = {
2934       "FORCE": self.op.force,
2935       }
2936     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2937     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2938     return env, nl, nl
2939
2940   def CheckPrereq(self):
2941     """Check prerequisites.
2942
2943     This checks that the instance is in the cluster.
2944
2945     """
2946     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2947     assert self.instance is not None, \
2948       "Cannot retrieve locked instance %s" % self.op.instance_name
2949
2950     # extra beparams
2951     self.beparams = getattr(self.op, "beparams", {})
2952     if self.beparams:
2953       if not isinstance(self.beparams, dict):
2954         raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2955                                    " dict" % (type(self.beparams), ))
2956       # fill the beparams dict
2957       utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2958       self.op.beparams = self.beparams
2959
2960     # extra hvparams
2961     self.hvparams = getattr(self.op, "hvparams", {})
2962     if self.hvparams:
2963       if not isinstance(self.hvparams, dict):
2964         raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2965                                    " dict" % (type(self.hvparams), ))
2966
2967       # check hypervisor parameter syntax (locally)
2968       cluster = self.cfg.GetClusterInfo()
2969       utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2970       filled_hvp = cluster.FillDict(cluster.hvparams[instance.hypervisor],
2971                                     instance.hvparams)
2972       filled_hvp.update(self.hvparams)
2973       hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2974       hv_type.CheckParameterSyntax(filled_hvp)
2975       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2976       self.op.hvparams = self.hvparams
2977
2978     _CheckNodeOnline(self, instance.primary_node)
2979
2980     bep = self.cfg.GetClusterInfo().FillBE(instance)
2981     # check bridges existence
2982     _CheckInstanceBridgesExist(self, instance)
2983
2984     remote_info = self.rpc.call_instance_info(instance.primary_node,
2985                                               instance.name,
2986                                               instance.hypervisor)
2987     remote_info.Raise()
2988     if not remote_info.data:
2989       _CheckNodeFreeMemory(self, instance.primary_node,
2990                            "starting instance %s" % instance.name,
2991                            bep[constants.BE_MEMORY], instance.hypervisor)
2992
2993   def Exec(self, feedback_fn):
2994     """Start the instance.
2995
2996     """
2997     instance = self.instance
2998     force = self.op.force
2999
3000     self.cfg.MarkInstanceUp(instance.name)
3001
3002     node_current = instance.primary_node
3003
3004     _StartInstanceDisks(self, instance, force)
3005
3006     result = self.rpc.call_instance_start(node_current, instance,
3007                                           self.hvparams, self.beparams)
3008     msg = result.RemoteFailMsg()
3009     if msg:
3010       _ShutdownInstanceDisks(self, instance)
3011       raise errors.OpExecError("Could not start instance: %s" % msg)
3012
3013
3014 class LURebootInstance(LogicalUnit):
3015   """Reboot an instance.
3016
3017   """
3018   HPATH = "instance-reboot"
3019   HTYPE = constants.HTYPE_INSTANCE
3020   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
3021   REQ_BGL = False
3022
3023   def ExpandNames(self):
3024     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
3025                                    constants.INSTANCE_REBOOT_HARD,
3026                                    constants.INSTANCE_REBOOT_FULL]:
3027       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
3028                                   (constants.INSTANCE_REBOOT_SOFT,
3029                                    constants.INSTANCE_REBOOT_HARD,
3030                                    constants.INSTANCE_REBOOT_FULL))
3031     self._ExpandAndLockInstance()
3032
3033   def BuildHooksEnv(self):
3034     """Build hooks env.
3035
3036     This runs on master, primary and secondary nodes of the instance.
3037
3038     """
3039     env = {
3040       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
3041       "REBOOT_TYPE": self.op.reboot_type,
3042       }
3043     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3044     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3045     return env, nl, nl
3046
3047   def CheckPrereq(self):
3048     """Check prerequisites.
3049
3050     This checks that the instance is in the cluster.
3051
3052     """
3053     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3054     assert self.instance is not None, \
3055       "Cannot retrieve locked instance %s" % self.op.instance_name
3056
3057     _CheckNodeOnline(self, instance.primary_node)
3058
3059     # check bridges existence
3060     _CheckInstanceBridgesExist(self, instance)
3061
3062   def Exec(self, feedback_fn):
3063     """Reboot the instance.
3064
3065     """
3066     instance = self.instance
3067     ignore_secondaries = self.op.ignore_secondaries
3068     reboot_type = self.op.reboot_type
3069
3070     node_current = instance.primary_node
3071
3072     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3073                        constants.INSTANCE_REBOOT_HARD]:
3074       for disk in instance.disks:
3075         self.cfg.SetDiskID(disk, node_current)
3076       result = self.rpc.call_instance_reboot(node_current, instance,
3077                                              reboot_type)
3078       msg = result.RemoteFailMsg()
3079       if msg:
3080         raise errors.OpExecError("Could not reboot instance: %s" % msg)
3081     else:
3082       result = self.rpc.call_instance_shutdown(node_current, instance)
3083       msg = result.RemoteFailMsg()
3084       if msg:
3085         raise errors.OpExecError("Could not shutdown instance for"
3086                                  " full reboot: %s" % msg)
3087       _ShutdownInstanceDisks(self, instance)
3088       _StartInstanceDisks(self, instance, ignore_secondaries)
3089       result = self.rpc.call_instance_start(node_current, instance, None, None)
3090       msg = result.RemoteFailMsg()
3091       if msg:
3092         _ShutdownInstanceDisks(self, instance)
3093         raise errors.OpExecError("Could not start instance for"
3094                                  " full reboot: %s" % msg)
3095
3096     self.cfg.MarkInstanceUp(instance.name)
3097
3098
3099 class LUShutdownInstance(LogicalUnit):
3100   """Shutdown an instance.
3101
3102   """
3103   HPATH = "instance-stop"
3104   HTYPE = constants.HTYPE_INSTANCE
3105   _OP_REQP = ["instance_name"]
3106   REQ_BGL = False
3107
3108   def ExpandNames(self):
3109     self._ExpandAndLockInstance()
3110
3111   def BuildHooksEnv(self):
3112     """Build hooks env.
3113
3114     This runs on master, primary and secondary nodes of the instance.
3115
3116     """
3117     env = _BuildInstanceHookEnvByObject(self, self.instance)
3118     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3119     return env, nl, nl
3120
3121   def CheckPrereq(self):
3122     """Check prerequisites.
3123
3124     This checks that the instance is in the cluster.
3125
3126     """
3127     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3128     assert self.instance is not None, \
3129       "Cannot retrieve locked instance %s" % self.op.instance_name
3130     _CheckNodeOnline(self, self.instance.primary_node)
3131
3132   def Exec(self, feedback_fn):
3133     """Shutdown the instance.
3134
3135     """
3136     instance = self.instance
3137     node_current = instance.primary_node
3138     self.cfg.MarkInstanceDown(instance.name)
3139     result = self.rpc.call_instance_shutdown(node_current, instance)
3140     msg = result.RemoteFailMsg()
3141     if msg:
3142       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3143
3144     _ShutdownInstanceDisks(self, instance)
3145
3146
3147 class LUReinstallInstance(LogicalUnit):
3148   """Reinstall an instance.
3149
3150   """
3151   HPATH = "instance-reinstall"
3152   HTYPE = constants.HTYPE_INSTANCE
3153   _OP_REQP = ["instance_name"]
3154   REQ_BGL = False
3155
3156   def ExpandNames(self):
3157     self._ExpandAndLockInstance()
3158
3159   def BuildHooksEnv(self):
3160     """Build hooks env.
3161
3162     This runs on master, primary and secondary nodes of the instance.
3163
3164     """
3165     env = _BuildInstanceHookEnvByObject(self, self.instance)
3166     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3167     return env, nl, nl
3168
3169   def CheckPrereq(self):
3170     """Check prerequisites.
3171
3172     This checks that the instance is in the cluster and is not running.
3173
3174     """
3175     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3176     assert instance is not None, \
3177       "Cannot retrieve locked instance %s" % self.op.instance_name
3178     _CheckNodeOnline(self, instance.primary_node)
3179
3180     if instance.disk_template == constants.DT_DISKLESS:
3181       raise errors.OpPrereqError("Instance '%s' has no disks" %
3182                                  self.op.instance_name)
3183     if instance.admin_up:
3184       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3185                                  self.op.instance_name)
3186     remote_info = self.rpc.call_instance_info(instance.primary_node,
3187                                               instance.name,
3188                                               instance.hypervisor)
3189     remote_info.Raise()
3190     if remote_info.data:
3191       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3192                                  (self.op.instance_name,
3193                                   instance.primary_node))
3194
3195     self.op.os_type = getattr(self.op, "os_type", None)
3196     if self.op.os_type is not None:
3197       # OS verification
3198       pnode = self.cfg.GetNodeInfo(
3199         self.cfg.ExpandNodeName(instance.primary_node))
3200       if pnode is None:
3201         raise errors.OpPrereqError("Primary node '%s' is unknown" %
3202                                    self.op.pnode)
3203       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3204       result.Raise()
3205       if not isinstance(result.data, objects.OS):
3206         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3207                                    " primary node"  % self.op.os_type)
3208
3209     self.instance = instance
3210
3211   def Exec(self, feedback_fn):
3212     """Reinstall the instance.
3213
3214     """
3215     inst = self.instance
3216
3217     if self.op.os_type is not None:
3218       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3219       inst.os = self.op.os_type
3220       self.cfg.Update(inst)
3221
3222     _StartInstanceDisks(self, inst, None)
3223     try:
3224       feedback_fn("Running the instance OS create scripts...")
3225       result = self.rpc.call_instance_os_add(inst.primary_node, inst)
3226       msg = result.RemoteFailMsg()
3227       if msg:
3228         raise errors.OpExecError("Could not install OS for instance %s"
3229                                  " on node %s: %s" %
3230                                  (inst.name, inst.primary_node, msg))
3231     finally:
3232       _ShutdownInstanceDisks(self, inst)
3233
3234
3235 class LURenameInstance(LogicalUnit):
3236   """Rename an instance.
3237
3238   """
3239   HPATH = "instance-rename"
3240   HTYPE = constants.HTYPE_INSTANCE
3241   _OP_REQP = ["instance_name", "new_name"]
3242
3243   def BuildHooksEnv(self):
3244     """Build hooks env.
3245
3246     This runs on master, primary and secondary nodes of the instance.
3247
3248     """
3249     env = _BuildInstanceHookEnvByObject(self, self.instance)
3250     env["INSTANCE_NEW_NAME"] = self.op.new_name
3251     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3252     return env, nl, nl
3253
3254   def CheckPrereq(self):
3255     """Check prerequisites.
3256
3257     This checks that the instance is in the cluster and is not running.
3258
3259     """
3260     instance = self.cfg.GetInstanceInfo(
3261       self.cfg.ExpandInstanceName(self.op.instance_name))
3262     if instance is None:
3263       raise errors.OpPrereqError("Instance '%s' not known" %
3264                                  self.op.instance_name)
3265     _CheckNodeOnline(self, instance.primary_node)
3266
3267     if instance.admin_up:
3268       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3269                                  self.op.instance_name)
3270     remote_info = self.rpc.call_instance_info(instance.primary_node,
3271                                               instance.name,
3272                                               instance.hypervisor)
3273     remote_info.Raise()
3274     if remote_info.data:
3275       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3276                                  (self.op.instance_name,
3277                                   instance.primary_node))
3278     self.instance = instance
3279
3280     # new name verification
3281     name_info = utils.HostInfo(self.op.new_name)
3282
3283     self.op.new_name = new_name = name_info.name
3284     instance_list = self.cfg.GetInstanceList()
3285     if new_name in instance_list:
3286       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3287                                  new_name)
3288
3289     if not getattr(self.op, "ignore_ip", False):
3290       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3291         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3292                                    (name_info.ip, new_name))
3293
3294
3295   def Exec(self, feedback_fn):
3296     """Reinstall the instance.
3297
3298     """
3299     inst = self.instance
3300     old_name = inst.name
3301
3302     if inst.disk_template == constants.DT_FILE:
3303       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3304
3305     self.cfg.RenameInstance(inst.name, self.op.new_name)
3306     # Change the instance lock. This is definitely safe while we hold the BGL
3307     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3308     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3309
3310     # re-read the instance from the configuration after rename
3311     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3312
3313     if inst.disk_template == constants.DT_FILE:
3314       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3315       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3316                                                      old_file_storage_dir,
3317                                                      new_file_storage_dir)
3318       result.Raise()
3319       if not result.data:
3320         raise errors.OpExecError("Could not connect to node '%s' to rename"
3321                                  " directory '%s' to '%s' (but the instance"
3322                                  " has been renamed in Ganeti)" % (
3323                                  inst.primary_node, old_file_storage_dir,
3324                                  new_file_storage_dir))
3325
3326       if not result.data[0]:
3327         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3328                                  " (but the instance has been renamed in"
3329                                  " Ganeti)" % (old_file_storage_dir,
3330                                                new_file_storage_dir))
3331
3332     _StartInstanceDisks(self, inst, None)
3333     try:
3334       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3335                                                  old_name)
3336       msg = result.RemoteFailMsg()
3337       if msg:
3338         msg = ("Could not run OS rename script for instance %s on node %s"
3339                " (but the instance has been renamed in Ganeti): %s" %
3340                (inst.name, inst.primary_node, msg))
3341         self.proc.LogWarning(msg)
3342     finally:
3343       _ShutdownInstanceDisks(self, inst)
3344
3345
3346 class LURemoveInstance(LogicalUnit):
3347   """Remove an instance.
3348
3349   """
3350   HPATH = "instance-remove"
3351   HTYPE = constants.HTYPE_INSTANCE
3352   _OP_REQP = ["instance_name", "ignore_failures"]
3353   REQ_BGL = False
3354
3355   def ExpandNames(self):
3356     self._ExpandAndLockInstance()
3357     self.needed_locks[locking.LEVEL_NODE] = []
3358     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3359
3360   def DeclareLocks(self, level):
3361     if level == locking.LEVEL_NODE:
3362       self._LockInstancesNodes()
3363
3364   def BuildHooksEnv(self):
3365     """Build hooks env.
3366
3367     This runs on master, primary and secondary nodes of the instance.
3368
3369     """
3370     env = _BuildInstanceHookEnvByObject(self, self.instance)
3371     nl = [self.cfg.GetMasterNode()]
3372     return env, nl, nl
3373
3374   def CheckPrereq(self):
3375     """Check prerequisites.
3376
3377     This checks that the instance is in the cluster.
3378
3379     """
3380     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3381     assert self.instance is not None, \
3382       "Cannot retrieve locked instance %s" % self.op.instance_name
3383
3384   def Exec(self, feedback_fn):
3385     """Remove the instance.
3386
3387     """
3388     instance = self.instance
3389     logging.info("Shutting down instance %s on node %s",
3390                  instance.name, instance.primary_node)
3391
3392     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3393     msg = result.RemoteFailMsg()
3394     if msg:
3395       if self.op.ignore_failures:
3396         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3397       else:
3398         raise errors.OpExecError("Could not shutdown instance %s on"
3399                                  " node %s: %s" %
3400                                  (instance.name, instance.primary_node, msg))
3401
3402     logging.info("Removing block devices for instance %s", instance.name)
3403
3404     if not _RemoveDisks(self, instance):
3405       if self.op.ignore_failures:
3406         feedback_fn("Warning: can't remove instance's disks")
3407       else:
3408         raise errors.OpExecError("Can't remove instance's disks")
3409
3410     logging.info("Removing instance %s out of cluster config", instance.name)
3411
3412     self.cfg.RemoveInstance(instance.name)
3413     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3414
3415
3416 class LUQueryInstances(NoHooksLU):
3417   """Logical unit for querying instances.
3418
3419   """
3420   _OP_REQP = ["output_fields", "names", "use_locking"]
3421   REQ_BGL = False
3422   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3423                                     "admin_state",
3424                                     "disk_template", "ip", "mac", "bridge",
3425                                     "sda_size", "sdb_size", "vcpus", "tags",
3426                                     "network_port", "beparams",
3427                                     r"(disk)\.(size)/([0-9]+)",
3428                                     r"(disk)\.(sizes)", "disk_usage",
3429                                     r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3430                                     r"(nic)\.(macs|ips|bridges)",
3431                                     r"(disk|nic)\.(count)",
3432                                     "serial_no", "hypervisor", "hvparams",] +
3433                                   ["hv/%s" % name
3434                                    for name in constants.HVS_PARAMETERS] +
3435                                   ["be/%s" % name
3436                                    for name in constants.BES_PARAMETERS])
3437   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3438
3439
3440   def ExpandNames(self):
3441     _CheckOutputFields(static=self._FIELDS_STATIC,
3442                        dynamic=self._FIELDS_DYNAMIC,
3443                        selected=self.op.output_fields)
3444
3445     self.needed_locks = {}
3446     self.share_locks[locking.LEVEL_INSTANCE] = 1
3447     self.share_locks[locking.LEVEL_NODE] = 1
3448
3449     if self.op.names:
3450       self.wanted = _GetWantedInstances(self, self.op.names)
3451     else:
3452       self.wanted = locking.ALL_SET
3453
3454     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3455     self.do_locking = self.do_node_query and self.op.use_locking
3456     if self.do_locking:
3457       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3458       self.needed_locks[locking.LEVEL_NODE] = []
3459       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3460
3461   def DeclareLocks(self, level):
3462     if level == locking.LEVEL_NODE and self.do_locking:
3463       self._LockInstancesNodes()
3464
3465   def CheckPrereq(self):
3466     """Check prerequisites.
3467
3468     """
3469     pass
3470
3471   def Exec(self, feedback_fn):
3472     """Computes the list of nodes and their attributes.
3473
3474     """
3475     all_info = self.cfg.GetAllInstancesInfo()
3476     if self.wanted == locking.ALL_SET:
3477       # caller didn't specify instance names, so ordering is not important
3478       if self.do_locking:
3479         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3480       else:
3481         instance_names = all_info.keys()
3482       instance_names = utils.NiceSort(instance_names)
3483     else:
3484       # caller did specify names, so we must keep the ordering
3485       if self.do_locking:
3486         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3487       else:
3488         tgt_set = all_info.keys()
3489       missing = set(self.wanted).difference(tgt_set)
3490       if missing:
3491         raise errors.OpExecError("Some instances were removed before"
3492                                  " retrieving their data: %s" % missing)
3493       instance_names = self.wanted
3494
3495     instance_list = [all_info[iname] for iname in instance_names]
3496
3497     # begin data gathering
3498
3499     nodes = frozenset([inst.primary_node for inst in instance_list])
3500     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3501
3502     bad_nodes = []
3503     off_nodes = []
3504     if self.do_node_query:
3505       live_data = {}
3506       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3507       for name in nodes:
3508         result = node_data[name]
3509         if result.offline:
3510           # offline nodes will be in both lists
3511           off_nodes.append(name)
3512         if result.failed:
3513           bad_nodes.append(name)
3514         else:
3515           if result.data:
3516             live_data.update(result.data)
3517             # else no instance is alive
3518     else:
3519       live_data = dict([(name, {}) for name in instance_names])
3520
3521     # end data gathering
3522
3523     HVPREFIX = "hv/"
3524     BEPREFIX = "be/"
3525     output = []
3526     for instance in instance_list:
3527       iout = []
3528       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3529       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3530       for field in self.op.output_fields:
3531         st_match = self._FIELDS_STATIC.Matches(field)
3532         if field == "name":
3533           val = instance.name
3534         elif field == "os":
3535           val = instance.os
3536         elif field == "pnode":
3537           val = instance.primary_node
3538         elif field == "snodes":
3539           val = list(instance.secondary_nodes)
3540         elif field == "admin_state":
3541           val = instance.admin_up
3542         elif field == "oper_state":
3543           if instance.primary_node in bad_nodes:
3544             val = None
3545           else:
3546             val = bool(live_data.get(instance.name))
3547         elif field == "status":
3548           if instance.primary_node in off_nodes:
3549             val = "ERROR_nodeoffline"
3550           elif instance.primary_node in bad_nodes:
3551             val = "ERROR_nodedown"
3552           else:
3553             running = bool(live_data.get(instance.name))
3554             if running:
3555               if instance.admin_up:
3556                 val = "running"
3557               else:
3558                 val = "ERROR_up"
3559             else:
3560               if instance.admin_up:
3561                 val = "ERROR_down"
3562               else:
3563                 val = "ADMIN_down"
3564         elif field == "oper_ram":
3565           if instance.primary_node in bad_nodes:
3566             val = None
3567           elif instance.name in live_data:
3568             val = live_data[instance.name].get("memory", "?")
3569           else:
3570             val = "-"
3571         elif field == "vcpus":
3572           val = i_be[constants.BE_VCPUS]
3573         elif field == "disk_template":
3574           val = instance.disk_template
3575         elif field == "ip":
3576           if instance.nics:
3577             val = instance.nics[0].ip
3578           else:
3579             val = None
3580         elif field == "bridge":
3581           if instance.nics:
3582             val = instance.nics[0].bridge
3583           else:
3584             val = None
3585         elif field == "mac":
3586           if instance.nics:
3587             val = instance.nics[0].mac
3588           else:
3589             val = None
3590         elif field == "sda_size" or field == "sdb_size":
3591           idx = ord(field[2]) - ord('a')
3592           try:
3593             val = instance.FindDisk(idx).size
3594           except errors.OpPrereqError:
3595             val = None
3596         elif field == "disk_usage": # total disk usage per node
3597           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3598           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3599         elif field == "tags":
3600           val = list(instance.GetTags())
3601         elif field == "serial_no":
3602           val = instance.serial_no
3603         elif field == "network_port":
3604           val = instance.network_port
3605         elif field == "hypervisor":
3606           val = instance.hypervisor
3607         elif field == "hvparams":
3608           val = i_hv
3609         elif (field.startswith(HVPREFIX) and
3610               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3611           val = i_hv.get(field[len(HVPREFIX):], None)
3612         elif field == "beparams":
3613           val = i_be
3614         elif (field.startswith(BEPREFIX) and
3615               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3616           val = i_be.get(field[len(BEPREFIX):], None)
3617         elif st_match and st_match.groups():
3618           # matches a variable list
3619           st_groups = st_match.groups()
3620           if st_groups and st_groups[0] == "disk":
3621             if st_groups[1] == "count":
3622               val = len(instance.disks)
3623             elif st_groups[1] == "sizes":
3624               val = [disk.size for disk in instance.disks]
3625             elif st_groups[1] == "size":
3626               try:
3627                 val = instance.FindDisk(st_groups[2]).size
3628               except errors.OpPrereqError:
3629                 val = None
3630             else:
3631               assert False, "Unhandled disk parameter"
3632           elif st_groups[0] == "nic":
3633             if st_groups[1] == "count":
3634               val = len(instance.nics)
3635             elif st_groups[1] == "macs":
3636               val = [nic.mac for nic in instance.nics]
3637             elif st_groups[1] == "ips":
3638               val = [nic.ip for nic in instance.nics]
3639             elif st_groups[1] == "bridges":
3640               val = [nic.bridge for nic in instance.nics]
3641             else:
3642               # index-based item
3643               nic_idx = int(st_groups[2])
3644               if nic_idx >= len(instance.nics):
3645                 val = None
3646               else:
3647                 if st_groups[1] == "mac":
3648                   val = instance.nics[nic_idx].mac
3649                 elif st_groups[1] == "ip":
3650                   val = instance.nics[nic_idx].ip
3651                 elif st_groups[1] == "bridge":
3652                   val = instance.nics[nic_idx].bridge
3653                 else:
3654                   assert False, "Unhandled NIC parameter"
3655           else:
3656             assert False, ("Declared but unhandled variable parameter '%s'" %
3657                            field)
3658         else:
3659           assert False, "Declared but unhandled parameter '%s'" % field
3660         iout.append(val)
3661       output.append(iout)
3662
3663     return output
3664
3665
3666 class LUFailoverInstance(LogicalUnit):
3667   """Failover an instance.
3668
3669   """
3670   HPATH = "instance-failover"
3671   HTYPE = constants.HTYPE_INSTANCE
3672   _OP_REQP = ["instance_name", "ignore_consistency"]
3673   REQ_BGL = False
3674
3675   def ExpandNames(self):
3676     self._ExpandAndLockInstance()
3677     self.needed_locks[locking.LEVEL_NODE] = []
3678     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3679
3680   def DeclareLocks(self, level):
3681     if level == locking.LEVEL_NODE:
3682       self._LockInstancesNodes()
3683
3684   def BuildHooksEnv(self):
3685     """Build hooks env.
3686
3687     This runs on master, primary and secondary nodes of the instance.
3688
3689     """
3690     env = {
3691       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3692       }
3693     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3694     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3695     return env, nl, nl
3696
3697   def CheckPrereq(self):
3698     """Check prerequisites.
3699
3700     This checks that the instance is in the cluster.
3701
3702     """
3703     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3704     assert self.instance is not None, \
3705       "Cannot retrieve locked instance %s" % self.op.instance_name
3706
3707     bep = self.cfg.GetClusterInfo().FillBE(instance)
3708     if instance.disk_template not in constants.DTS_NET_MIRROR:
3709       raise errors.OpPrereqError("Instance's disk layout is not"
3710                                  " network mirrored, cannot failover.")
3711
3712     secondary_nodes = instance.secondary_nodes
3713     if not secondary_nodes:
3714       raise errors.ProgrammerError("no secondary node but using "
3715                                    "a mirrored disk template")
3716
3717     target_node = secondary_nodes[0]
3718     _CheckNodeOnline(self, target_node)
3719     _CheckNodeNotDrained(self, target_node)
3720
3721     if instance.admin_up:
3722       # check memory requirements on the secondary node
3723       _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3724                            instance.name, bep[constants.BE_MEMORY],
3725                            instance.hypervisor)
3726     else:
3727       self.LogInfo("Not checking memory on the secondary node as"
3728                    " instance will not be started")
3729
3730     # check bridge existence
3731     brlist = [nic.bridge for nic in instance.nics]
3732     result = self.rpc.call_bridges_exist(target_node, brlist)
3733     result.Raise()
3734     if not result.data:
3735       raise errors.OpPrereqError("One or more target bridges %s does not"
3736                                  " exist on destination node '%s'" %
3737                                  (brlist, target_node))
3738
3739   def Exec(self, feedback_fn):
3740     """Failover an instance.
3741
3742     The failover is done by shutting it down on its present node and
3743     starting it on the secondary.
3744
3745     """
3746     instance = self.instance
3747
3748     source_node = instance.primary_node
3749     target_node = instance.secondary_nodes[0]
3750
3751     feedback_fn("* checking disk consistency between source and target")
3752     for dev in instance.disks:
3753       # for drbd, these are drbd over lvm
3754       if not _CheckDiskConsistency(self, dev, target_node, False):
3755         if instance.admin_up and not self.op.ignore_consistency:
3756           raise errors.OpExecError("Disk %s is degraded on target node,"
3757                                    " aborting failover." % dev.iv_name)
3758
3759     feedback_fn("* shutting down instance on source node")
3760     logging.info("Shutting down instance %s on node %s",
3761                  instance.name, source_node)
3762
3763     result = self.rpc.call_instance_shutdown(source_node, instance)
3764     msg = result.RemoteFailMsg()
3765     if msg:
3766       if self.op.ignore_consistency:
3767         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3768                              " Proceeding anyway. Please make sure node"
3769                              " %s is down. Error details: %s",
3770                              instance.name, source_node, source_node, msg)
3771       else:
3772         raise errors.OpExecError("Could not shutdown instance %s on"
3773                                  " node %s: %s" %
3774                                  (instance.name, source_node, msg))
3775
3776     feedback_fn("* deactivating the instance's disks on source node")
3777     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3778       raise errors.OpExecError("Can't shut down the instance's disks.")
3779
3780     instance.primary_node = target_node
3781     # distribute new instance config to the other nodes
3782     self.cfg.Update(instance)
3783
3784     # Only start the instance if it's marked as up
3785     if instance.admin_up:
3786       feedback_fn("* activating the instance's disks on target node")
3787       logging.info("Starting instance %s on node %s",
3788                    instance.name, target_node)
3789
3790       disks_ok, _ = _AssembleInstanceDisks(self, instance,
3791                                                ignore_secondaries=True)
3792       if not disks_ok:
3793         _ShutdownInstanceDisks(self, instance)
3794         raise errors.OpExecError("Can't activate the instance's disks")
3795
3796       feedback_fn("* starting the instance on the target node")
3797       result = self.rpc.call_instance_start(target_node, instance, None, None)
3798       msg = result.RemoteFailMsg()
3799       if msg:
3800         _ShutdownInstanceDisks(self, instance)
3801         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3802                                  (instance.name, target_node, msg))
3803
3804
3805 class LUMigrateInstance(LogicalUnit):
3806   """Migrate an instance.
3807
3808   This is migration without shutting down, compared to the failover,
3809   which is done with shutdown.
3810
3811   """
3812   HPATH = "instance-migrate"
3813   HTYPE = constants.HTYPE_INSTANCE
3814   _OP_REQP = ["instance_name", "live", "cleanup"]
3815
3816   REQ_BGL = False
3817
3818   def ExpandNames(self):
3819     self._ExpandAndLockInstance()
3820     self.needed_locks[locking.LEVEL_NODE] = []
3821     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3822
3823   def DeclareLocks(self, level):
3824     if level == locking.LEVEL_NODE:
3825       self._LockInstancesNodes()
3826
3827   def BuildHooksEnv(self):
3828     """Build hooks env.
3829
3830     This runs on master, primary and secondary nodes of the instance.
3831
3832     """
3833     env = _BuildInstanceHookEnvByObject(self, self.instance)
3834     env["MIGRATE_LIVE"] = self.op.live
3835     env["MIGRATE_CLEANUP"] = self.op.cleanup
3836     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3837     return env, nl, nl
3838
3839   def CheckPrereq(self):
3840     """Check prerequisites.
3841
3842     This checks that the instance is in the cluster.
3843
3844     """
3845     instance = self.cfg.GetInstanceInfo(
3846       self.cfg.ExpandInstanceName(self.op.instance_name))
3847     if instance is None:
3848       raise errors.OpPrereqError("Instance '%s' not known" %
3849                                  self.op.instance_name)
3850
3851     if instance.disk_template != constants.DT_DRBD8:
3852       raise errors.OpPrereqError("Instance's disk layout is not"
3853                                  " drbd8, cannot migrate.")
3854
3855     secondary_nodes = instance.secondary_nodes
3856     if not secondary_nodes:
3857       raise errors.ConfigurationError("No secondary node but using"
3858                                       " drbd8 disk template")
3859
3860     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3861
3862     target_node = secondary_nodes[0]
3863     # check memory requirements on the secondary node
3864     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3865                          instance.name, i_be[constants.BE_MEMORY],
3866                          instance.hypervisor)
3867
3868     # check bridge existence
3869     brlist = [nic.bridge for nic in instance.nics]
3870     result = self.rpc.call_bridges_exist(target_node, brlist)
3871     if result.failed or not result.data:
3872       raise errors.OpPrereqError("One or more target bridges %s does not"
3873                                  " exist on destination node '%s'" %
3874                                  (brlist, target_node))
3875
3876     if not self.op.cleanup:
3877       _CheckNodeNotDrained(self, target_node)
3878       result = self.rpc.call_instance_migratable(instance.primary_node,
3879                                                  instance)
3880       msg = result.RemoteFailMsg()
3881       if msg:
3882         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3883                                    msg)
3884
3885     self.instance = instance
3886
3887   def _WaitUntilSync(self):
3888     """Poll with custom rpc for disk sync.
3889
3890     This uses our own step-based rpc call.
3891
3892     """
3893     self.feedback_fn("* wait until resync is done")
3894     all_done = False
3895     while not all_done:
3896       all_done = True
3897       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3898                                             self.nodes_ip,
3899                                             self.instance.disks)
3900       min_percent = 100
3901       for node, nres in result.items():
3902         msg = nres.RemoteFailMsg()
3903         if msg:
3904           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3905                                    (node, msg))
3906         node_done, node_percent = nres.payload
3907         all_done = all_done and node_done
3908         if node_percent is not None:
3909           min_percent = min(min_percent, node_percent)
3910       if not all_done:
3911         if min_percent < 100:
3912           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3913         time.sleep(2)
3914
3915   def _EnsureSecondary(self, node):
3916     """Demote a node to secondary.
3917
3918     """
3919     self.feedback_fn("* switching node %s to secondary mode" % node)
3920
3921     for dev in self.instance.disks:
3922       self.cfg.SetDiskID(dev, node)
3923
3924     result = self.rpc.call_blockdev_close(node, self.instance.name,
3925                                           self.instance.disks)
3926     msg = result.RemoteFailMsg()
3927     if msg:
3928       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3929                                " error %s" % (node, msg))
3930
3931   def _GoStandalone(self):
3932     """Disconnect from the network.
3933
3934     """
3935     self.feedback_fn("* changing into standalone mode")
3936     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3937                                                self.instance.disks)
3938     for node, nres in result.items():
3939       msg = nres.RemoteFailMsg()
3940       if msg:
3941         raise errors.OpExecError("Cannot disconnect disks node %s,"
3942                                  " error %s" % (node, msg))
3943
3944   def _GoReconnect(self, multimaster):
3945     """Reconnect to the network.
3946
3947     """
3948     if multimaster:
3949       msg = "dual-master"
3950     else:
3951       msg = "single-master"
3952     self.feedback_fn("* changing disks into %s mode" % msg)
3953     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3954                                            self.instance.disks,
3955                                            self.instance.name, multimaster)
3956     for node, nres in result.items():
3957       msg = nres.RemoteFailMsg()
3958       if msg:
3959         raise errors.OpExecError("Cannot change disks config on node %s,"
3960                                  " error: %s" % (node, msg))
3961
3962   def _ExecCleanup(self):
3963     """Try to cleanup after a failed migration.
3964
3965     The cleanup is done by:
3966       - check that the instance is running only on one node
3967         (and update the config if needed)
3968       - change disks on its secondary node to secondary
3969       - wait until disks are fully synchronized
3970       - disconnect from the network
3971       - change disks into single-master mode
3972       - wait again until disks are fully synchronized
3973
3974     """
3975     instance = self.instance
3976     target_node = self.target_node
3977     source_node = self.source_node
3978
3979     # check running on only one node
3980     self.feedback_fn("* checking where the instance actually runs"
3981                      " (if this hangs, the hypervisor might be in"
3982                      " a bad state)")
3983     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3984     for node, result in ins_l.items():
3985       result.Raise()
3986       if not isinstance(result.data, list):
3987         raise errors.OpExecError("Can't contact node '%s'" % node)
3988
3989     runningon_source = instance.name in ins_l[source_node].data
3990     runningon_target = instance.name in ins_l[target_node].data
3991
3992     if runningon_source and runningon_target:
3993       raise errors.OpExecError("Instance seems to be running on two nodes,"
3994                                " or the hypervisor is confused. You will have"
3995                                " to ensure manually that it runs only on one"
3996                                " and restart this operation.")
3997
3998     if not (runningon_source or runningon_target):
3999       raise errors.OpExecError("Instance does not seem to be running at all."
4000                                " In this case, it's safer to repair by"
4001                                " running 'gnt-instance stop' to ensure disk"
4002                                " shutdown, and then restarting it.")
4003
4004     if runningon_target:
4005       # the migration has actually succeeded, we need to update the config
4006       self.feedback_fn("* instance running on secondary node (%s),"
4007                        " updating config" % target_node)
4008       instance.primary_node = target_node
4009       self.cfg.Update(instance)
4010       demoted_node = source_node
4011     else:
4012       self.feedback_fn("* instance confirmed to be running on its"
4013                        " primary node (%s)" % source_node)
4014       demoted_node = target_node
4015
4016     self._EnsureSecondary(demoted_node)
4017     try:
4018       self._WaitUntilSync()
4019     except errors.OpExecError:
4020       # we ignore here errors, since if the device is standalone, it
4021       # won't be able to sync
4022       pass
4023     self._GoStandalone()
4024     self._GoReconnect(False)
4025     self._WaitUntilSync()
4026
4027     self.feedback_fn("* done")
4028
4029   def _RevertDiskStatus(self):
4030     """Try to revert the disk status after a failed migration.
4031
4032     """
4033     target_node = self.target_node
4034     try:
4035       self._EnsureSecondary(target_node)
4036       self._GoStandalone()
4037       self._GoReconnect(False)
4038       self._WaitUntilSync()
4039     except errors.OpExecError, err:
4040       self.LogWarning("Migration failed and I can't reconnect the"
4041                       " drives: error '%s'\n"
4042                       "Please look and recover the instance status" %
4043                       str(err))
4044
4045   def _AbortMigration(self):
4046     """Call the hypervisor code to abort a started migration.
4047
4048     """
4049     instance = self.instance
4050     target_node = self.target_node
4051     migration_info = self.migration_info
4052
4053     abort_result = self.rpc.call_finalize_migration(target_node,
4054                                                     instance,
4055                                                     migration_info,
4056                                                     False)
4057     abort_msg = abort_result.RemoteFailMsg()
4058     if abort_msg:
4059       logging.error("Aborting migration failed on target node %s: %s" %
4060                     (target_node, abort_msg))
4061       # Don't raise an exception here, as we stil have to try to revert the
4062       # disk status, even if this step failed.
4063
4064   def _ExecMigration(self):
4065     """Migrate an instance.
4066
4067     The migrate is done by:
4068       - change the disks into dual-master mode
4069       - wait until disks are fully synchronized again
4070       - migrate the instance
4071       - change disks on the new secondary node (the old primary) to secondary
4072       - wait until disks are fully synchronized
4073       - change disks into single-master mode
4074
4075     """
4076     instance = self.instance
4077     target_node = self.target_node
4078     source_node = self.source_node
4079
4080     self.feedback_fn("* checking disk consistency between source and target")
4081     for dev in instance.disks:
4082       if not _CheckDiskConsistency(self, dev, target_node, False):
4083         raise errors.OpExecError("Disk %s is degraded or not fully"
4084                                  " synchronized on target node,"
4085                                  " aborting migrate." % dev.iv_name)
4086
4087     # First get the migration information from the remote node
4088     result = self.rpc.call_migration_info(source_node, instance)
4089     msg = result.RemoteFailMsg()
4090     if msg:
4091       log_err = ("Failed fetching source migration information from %s: %s" %
4092                  (source_node, msg))
4093       logging.error(log_err)
4094       raise errors.OpExecError(log_err)
4095
4096     self.migration_info = migration_info = result.payload
4097
4098     # Then switch the disks to master/master mode
4099     self._EnsureSecondary(target_node)
4100     self._GoStandalone()
4101     self._GoReconnect(True)
4102     self._WaitUntilSync()
4103
4104     self.feedback_fn("* preparing %s to accept the instance" % target_node)
4105     result = self.rpc.call_accept_instance(target_node,
4106                                            instance,
4107                                            migration_info,
4108                                            self.nodes_ip[target_node])
4109
4110     msg = result.RemoteFailMsg()
4111     if msg:
4112       logging.error("Instance pre-migration failed, trying to revert"
4113                     " disk status: %s", msg)
4114       self._AbortMigration()
4115       self._RevertDiskStatus()
4116       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4117                                (instance.name, msg))
4118
4119     self.feedback_fn("* migrating instance to %s" % target_node)
4120     time.sleep(10)
4121     result = self.rpc.call_instance_migrate(source_node, instance,
4122                                             self.nodes_ip[target_node],
4123                                             self.op.live)
4124     msg = result.RemoteFailMsg()
4125     if msg:
4126       logging.error("Instance migration failed, trying to revert"
4127                     " disk status: %s", msg)
4128       self._AbortMigration()
4129       self._RevertDiskStatus()
4130       raise errors.OpExecError("Could not migrate instance %s: %s" %
4131                                (instance.name, msg))
4132     time.sleep(10)
4133
4134     instance.primary_node = target_node
4135     # distribute new instance config to the other nodes
4136     self.cfg.Update(instance)
4137
4138     result = self.rpc.call_finalize_migration(target_node,
4139                                               instance,
4140                                               migration_info,
4141                                               True)
4142     msg = result.RemoteFailMsg()
4143     if msg:
4144       logging.error("Instance migration succeeded, but finalization failed:"
4145                     " %s" % msg)
4146       raise errors.OpExecError("Could not finalize instance migration: %s" %
4147                                msg)
4148
4149     self._EnsureSecondary(source_node)
4150     self._WaitUntilSync()
4151     self._GoStandalone()
4152     self._GoReconnect(False)
4153     self._WaitUntilSync()
4154
4155     self.feedback_fn("* done")
4156
4157   def Exec(self, feedback_fn):
4158     """Perform the migration.
4159
4160     """
4161     self.feedback_fn = feedback_fn
4162
4163     self.source_node = self.instance.primary_node
4164     self.target_node = self.instance.secondary_nodes[0]
4165     self.all_nodes = [self.source_node, self.target_node]
4166     self.nodes_ip = {
4167       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4168       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4169       }
4170     if self.op.cleanup:
4171       return self._ExecCleanup()
4172     else:
4173       return self._ExecMigration()
4174
4175
4176 def _CreateBlockDev(lu, node, instance, device, force_create,
4177                     info, force_open):
4178   """Create a tree of block devices on a given node.
4179
4180   If this device type has to be created on secondaries, create it and
4181   all its children.
4182
4183   If not, just recurse to children keeping the same 'force' value.
4184
4185   @param lu: the lu on whose behalf we execute
4186   @param node: the node on which to create the device
4187   @type instance: L{objects.Instance}
4188   @param instance: the instance which owns the device
4189   @type device: L{objects.Disk}
4190   @param device: the device to create
4191   @type force_create: boolean
4192   @param force_create: whether to force creation of this device; this
4193       will be change to True whenever we find a device which has
4194       CreateOnSecondary() attribute
4195   @param info: the extra 'metadata' we should attach to the device
4196       (this will be represented as a LVM tag)
4197   @type force_open: boolean
4198   @param force_open: this parameter will be passes to the
4199       L{backend.BlockdevCreate} function where it specifies
4200       whether we run on primary or not, and it affects both
4201       the child assembly and the device own Open() execution
4202
4203   """
4204   if device.CreateOnSecondary():
4205     force_create = True
4206
4207   if device.children:
4208     for child in device.children:
4209       _CreateBlockDev(lu, node, instance, child, force_create,
4210                       info, force_open)
4211
4212   if not force_create:
4213     return
4214
4215   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4216
4217
4218 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4219   """Create a single block device on a given node.
4220
4221   This will not recurse over children of the device, so they must be
4222   created in advance.
4223
4224   @param lu: the lu on whose behalf we execute
4225   @param node: the node on which to create the device
4226   @type instance: L{objects.Instance}
4227   @param instance: the instance which owns the device
4228   @type device: L{objects.Disk}
4229   @param device: the device to create
4230   @param info: the extra 'metadata' we should attach to the device
4231       (this will be represented as a LVM tag)
4232   @type force_open: boolean
4233   @param force_open: this parameter will be passes to the
4234       L{backend.BlockdevCreate} function where it specifies
4235       whether we run on primary or not, and it affects both
4236       the child assembly and the device own Open() execution
4237
4238   """
4239   lu.cfg.SetDiskID(device, node)
4240   result = lu.rpc.call_blockdev_create(node, device, device.size,
4241                                        instance.name, force_open, info)
4242   msg = result.RemoteFailMsg()
4243   if msg:
4244     raise errors.OpExecError("Can't create block device %s on"
4245                              " node %s for instance %s: %s" %
4246                              (device, node, instance.name, msg))
4247   if device.physical_id is None:
4248     device.physical_id = result.payload
4249
4250
4251 def _GenerateUniqueNames(lu, exts):
4252   """Generate a suitable LV name.
4253
4254   This will generate a logical volume name for the given instance.
4255
4256   """
4257   results = []
4258   for val in exts:
4259     new_id = lu.cfg.GenerateUniqueID()
4260     results.append("%s%s" % (new_id, val))
4261   return results
4262
4263
4264 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4265                          p_minor, s_minor):
4266   """Generate a drbd8 device complete with its children.
4267
4268   """
4269   port = lu.cfg.AllocatePort()
4270   vgname = lu.cfg.GetVGName()
4271   shared_secret = lu.cfg.GenerateDRBDSecret()
4272   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4273                           logical_id=(vgname, names[0]))
4274   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4275                           logical_id=(vgname, names[1]))
4276   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4277                           logical_id=(primary, secondary, port,
4278                                       p_minor, s_minor,
4279                                       shared_secret),
4280                           children=[dev_data, dev_meta],
4281                           iv_name=iv_name)
4282   return drbd_dev
4283
4284
4285 def _GenerateDiskTemplate(lu, template_name,
4286                           instance_name, primary_node,
4287                           secondary_nodes, disk_info,
4288                           file_storage_dir, file_driver,
4289                           base_index):
4290   """Generate the entire disk layout for a given template type.
4291
4292   """
4293   #TODO: compute space requirements
4294
4295   vgname = lu.cfg.GetVGName()
4296   disk_count = len(disk_info)
4297   disks = []
4298   if template_name == constants.DT_DISKLESS:
4299     pass
4300   elif template_name == constants.DT_PLAIN:
4301     if len(secondary_nodes) != 0:
4302       raise errors.ProgrammerError("Wrong template configuration")
4303
4304     names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
4305                                       for i in range(disk_count)])
4306     for idx, disk in enumerate(disk_info):
4307       disk_index = idx + base_index
4308       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4309                               logical_id=(vgname, names[idx]),
4310                               iv_name="disk/%d" % disk_index,
4311                               mode=disk["mode"])
4312       disks.append(disk_dev)
4313   elif template_name == constants.DT_DRBD8:
4314     if len(secondary_nodes) != 1:
4315       raise errors.ProgrammerError("Wrong template configuration")
4316     remote_node = secondary_nodes[0]
4317     minors = lu.cfg.AllocateDRBDMinor(
4318       [primary_node, remote_node] * len(disk_info), instance_name)
4319
4320     names = []
4321     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
4322                                                for i in range(disk_count)]):
4323       names.append(lv_prefix + "_data")
4324       names.append(lv_prefix + "_meta")
4325     for idx, disk in enumerate(disk_info):
4326       disk_index = idx + base_index
4327       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4328                                       disk["size"], names[idx*2:idx*2+2],
4329                                       "disk/%d" % disk_index,
4330                                       minors[idx*2], minors[idx*2+1])
4331       disk_dev.mode = disk["mode"]
4332       disks.append(disk_dev)
4333   elif template_name == constants.DT_FILE:
4334     if len(secondary_nodes) != 0:
4335       raise errors.ProgrammerError("Wrong template configuration")
4336
4337     for idx, disk in enumerate(disk_info):
4338       disk_index = idx + base_index
4339       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4340                               iv_name="disk/%d" % disk_index,
4341                               logical_id=(file_driver,
4342                                           "%s/disk%d" % (file_storage_dir,
4343                                                          disk_index)),
4344                               mode=disk["mode"])
4345       disks.append(disk_dev)
4346   else:
4347     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4348   return disks
4349
4350
4351 def _GetInstanceInfoText(instance):
4352   """Compute that text that should be added to the disk's metadata.
4353
4354   """
4355   return "originstname+%s" % instance.name
4356
4357
4358 def _CreateDisks(lu, instance):
4359   """Create all disks for an instance.
4360
4361   This abstracts away some work from AddInstance.
4362
4363   @type lu: L{LogicalUnit}
4364   @param lu: the logical unit on whose behalf we execute
4365   @type instance: L{objects.Instance}
4366   @param instance: the instance whose disks we should create
4367   @rtype: boolean
4368   @return: the success of the creation
4369
4370   """
4371   info = _GetInstanceInfoText(instance)
4372   pnode = instance.primary_node
4373
4374   if instance.disk_template == constants.DT_FILE:
4375     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4376     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4377
4378     if result.failed or not result.data:
4379       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4380
4381     if not result.data[0]:
4382       raise errors.OpExecError("Failed to create directory '%s'" %
4383                                file_storage_dir)
4384
4385   # Note: this needs to be kept in sync with adding of disks in
4386   # LUSetInstanceParams
4387   for device in instance.disks:
4388     logging.info("Creating volume %s for instance %s",
4389                  device.iv_name, instance.name)
4390     #HARDCODE
4391     for node in instance.all_nodes:
4392       f_create = node == pnode
4393       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4394
4395
4396 def _RemoveDisks(lu, instance):
4397   """Remove all disks for an instance.
4398
4399   This abstracts away some work from `AddInstance()` and
4400   `RemoveInstance()`. Note that in case some of the devices couldn't
4401   be removed, the removal will continue with the other ones (compare
4402   with `_CreateDisks()`).
4403
4404   @type lu: L{LogicalUnit}
4405   @param lu: the logical unit on whose behalf we execute
4406   @type instance: L{objects.Instance}
4407   @param instance: the instance whose disks we should remove
4408   @rtype: boolean
4409   @return: the success of the removal
4410
4411   """
4412   logging.info("Removing block devices for instance %s", instance.name)
4413
4414   all_result = True
4415   for device in instance.disks:
4416     for node, disk in device.ComputeNodeTree(instance.primary_node):
4417       lu.cfg.SetDiskID(disk, node)
4418       msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4419       if msg:
4420         lu.LogWarning("Could not remove block device %s on node %s,"
4421                       " continuing anyway: %s", device.iv_name, node, msg)
4422         all_result = False
4423
4424   if instance.disk_template == constants.DT_FILE:
4425     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4426     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4427                                                  file_storage_dir)
4428     if result.failed or not result.data:
4429       logging.error("Could not remove directory '%s'", file_storage_dir)
4430       all_result = False
4431
4432   return all_result
4433
4434
4435 def _ComputeDiskSize(disk_template, disks):
4436   """Compute disk size requirements in the volume group
4437
4438   """
4439   # Required free disk space as a function of disk and swap space
4440   req_size_dict = {
4441     constants.DT_DISKLESS: None,
4442     constants.DT_PLAIN: sum(d["size"] for d in disks),
4443     # 128 MB are added for drbd metadata for each disk
4444     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4445     constants.DT_FILE: None,
4446   }
4447
4448   if disk_template not in req_size_dict:
4449     raise errors.ProgrammerError("Disk template '%s' size requirement"
4450                                  " is unknown" %  disk_template)
4451
4452   return req_size_dict[disk_template]
4453
4454
4455 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4456   """Hypervisor parameter validation.
4457
4458   This function abstract the hypervisor parameter validation to be
4459   used in both instance create and instance modify.
4460
4461   @type lu: L{LogicalUnit}
4462   @param lu: the logical unit for which we check
4463   @type nodenames: list
4464   @param nodenames: the list of nodes on which we should check
4465   @type hvname: string
4466   @param hvname: the name of the hypervisor we should use
4467   @type hvparams: dict
4468   @param hvparams: the parameters which we need to check
4469   @raise errors.OpPrereqError: if the parameters are not valid
4470
4471   """
4472   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4473                                                   hvname,
4474                                                   hvparams)
4475   for node in nodenames:
4476     info = hvinfo[node]
4477     if info.offline:
4478       continue
4479     msg = info.RemoteFailMsg()
4480     if msg:
4481       raise errors.OpPrereqError("Hypervisor parameter validation"
4482                                  " failed on node %s: %s" % (node, msg))
4483
4484
4485 class LUCreateInstance(LogicalUnit):
4486   """Create an instance.
4487
4488   """
4489   HPATH = "instance-add"
4490   HTYPE = constants.HTYPE_INSTANCE
4491   _OP_REQP = ["instance_name", "disks", "disk_template",
4492               "mode", "start",
4493               "wait_for_sync", "ip_check", "nics",
4494               "hvparams", "beparams"]
4495   REQ_BGL = False
4496
4497   def _ExpandNode(self, node):
4498     """Expands and checks one node name.
4499
4500     """
4501     node_full = self.cfg.ExpandNodeName(node)
4502     if node_full is None:
4503       raise errors.OpPrereqError("Unknown node %s" % node)
4504     return node_full
4505
4506   def ExpandNames(self):
4507     """ExpandNames for CreateInstance.
4508
4509     Figure out the right locks for instance creation.
4510
4511     """
4512     self.needed_locks = {}
4513
4514     # set optional parameters to none if they don't exist
4515     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4516       if not hasattr(self.op, attr):
4517         setattr(self.op, attr, None)
4518
4519     # cheap checks, mostly valid constants given
4520
4521     # verify creation mode
4522     if self.op.mode not in (constants.INSTANCE_CREATE,
4523                             constants.INSTANCE_IMPORT):
4524       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4525                                  self.op.mode)
4526
4527     # disk template and mirror node verification
4528     if self.op.disk_template not in constants.DISK_TEMPLATES:
4529       raise errors.OpPrereqError("Invalid disk template name")
4530
4531     if self.op.hypervisor is None:
4532       self.op.hypervisor = self.cfg.GetHypervisorType()
4533
4534     cluster = self.cfg.GetClusterInfo()
4535     enabled_hvs = cluster.enabled_hypervisors
4536     if self.op.hypervisor not in enabled_hvs:
4537       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4538                                  " cluster (%s)" % (self.op.hypervisor,
4539                                   ",".join(enabled_hvs)))
4540
4541     # check hypervisor parameter syntax (locally)
4542     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4543     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4544                                   self.op.hvparams)
4545     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4546     hv_type.CheckParameterSyntax(filled_hvp)
4547     self.hv_full = filled_hvp
4548
4549     # fill and remember the beparams dict
4550     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4551     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4552                                     self.op.beparams)
4553
4554     #### instance parameters check
4555
4556     # instance name verification
4557     hostname1 = utils.HostInfo(self.op.instance_name)
4558     self.op.instance_name = instance_name = hostname1.name
4559
4560     # this is just a preventive check, but someone might still add this
4561     # instance in the meantime, and creation will fail at lock-add time
4562     if instance_name in self.cfg.GetInstanceList():
4563       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4564                                  instance_name)
4565
4566     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4567
4568     # NIC buildup
4569     self.nics = []
4570     for nic in self.op.nics:
4571       # ip validity checks
4572       ip = nic.get("ip", None)
4573       if ip is None or ip.lower() == "none":
4574         nic_ip = None
4575       elif ip.lower() == constants.VALUE_AUTO:
4576         nic_ip = hostname1.ip
4577       else:
4578         if not utils.IsValidIP(ip):
4579           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4580                                      " like a valid IP" % ip)
4581         nic_ip = ip
4582
4583       # MAC address verification
4584       mac = nic.get("mac", constants.VALUE_AUTO)
4585       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4586         if not utils.IsValidMac(mac.lower()):
4587           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4588                                      mac)
4589       # bridge verification
4590       bridge = nic.get("bridge", None)
4591       if bridge is None:
4592         bridge = self.cfg.GetDefBridge()
4593       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4594
4595     # disk checks/pre-build
4596     self.disks = []
4597     for disk in self.op.disks:
4598       mode = disk.get("mode", constants.DISK_RDWR)
4599       if mode not in constants.DISK_ACCESS_SET:
4600         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4601                                    mode)
4602       size = disk.get("size", None)
4603       if size is None:
4604         raise errors.OpPrereqError("Missing disk size")
4605       try:
4606         size = int(size)
4607       except ValueError:
4608         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4609       self.disks.append({"size": size, "mode": mode})
4610
4611     # used in CheckPrereq for ip ping check
4612     self.check_ip = hostname1.ip
4613
4614     # file storage checks
4615     if (self.op.file_driver and
4616         not self.op.file_driver in constants.FILE_DRIVER):
4617       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4618                                  self.op.file_driver)
4619
4620     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4621       raise errors.OpPrereqError("File storage directory path not absolute")
4622
4623     ### Node/iallocator related checks
4624     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4625       raise errors.OpPrereqError("One and only one of iallocator and primary"
4626                                  " node must be given")
4627
4628     if self.op.iallocator:
4629       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4630     else:
4631       self.op.pnode = self._ExpandNode(self.op.pnode)
4632       nodelist = [self.op.pnode]
4633       if self.op.snode is not None:
4634         self.op.snode = self._ExpandNode(self.op.snode)
4635         nodelist.append(self.op.snode)
4636       self.needed_locks[locking.LEVEL_NODE] = nodelist
4637
4638     # in case of import lock the source node too
4639     if self.op.mode == constants.INSTANCE_IMPORT:
4640       src_node = getattr(self.op, "src_node", None)
4641       src_path = getattr(self.op, "src_path", None)
4642
4643       if src_path is None:
4644         self.op.src_path = src_path = self.op.instance_name
4645
4646       if src_node is None:
4647         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4648         self.op.src_node = None
4649         if os.path.isabs(src_path):
4650           raise errors.OpPrereqError("Importing an instance from an absolute"
4651                                      " path requires a source node option.")
4652       else:
4653         self.op.src_node = src_node = self._ExpandNode(src_node)
4654         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4655           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4656         if not os.path.isabs(src_path):
4657           self.op.src_path = src_path = \
4658             os.path.join(constants.EXPORT_DIR, src_path)
4659
4660     else: # INSTANCE_CREATE
4661       if getattr(self.op, "os_type", None) is None:
4662         raise errors.OpPrereqError("No guest OS specified")
4663
4664   def _RunAllocator(self):
4665     """Run the allocator based on input opcode.
4666
4667     """
4668     nics = [n.ToDict() for n in self.nics]
4669     ial = IAllocator(self,
4670                      mode=constants.IALLOCATOR_MODE_ALLOC,
4671                      name=self.op.instance_name,
4672                      disk_template=self.op.disk_template,
4673                      tags=[],
4674                      os=self.op.os_type,
4675                      vcpus=self.be_full[constants.BE_VCPUS],
4676                      mem_size=self.be_full[constants.BE_MEMORY],
4677                      disks=self.disks,
4678                      nics=nics,
4679                      hypervisor=self.op.hypervisor,
4680                      )
4681
4682     ial.Run(self.op.iallocator)
4683
4684     if not ial.success:
4685       raise errors.OpPrereqError("Can't compute nodes using"
4686                                  " iallocator '%s': %s" % (self.op.iallocator,
4687                                                            ial.info))
4688     if len(ial.nodes) != ial.required_nodes:
4689       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4690                                  " of nodes (%s), required %s" %
4691                                  (self.op.iallocator, len(ial.nodes),
4692                                   ial.required_nodes))
4693     self.op.pnode = ial.nodes[0]
4694     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4695                  self.op.instance_name, self.op.iallocator,
4696                  ", ".join(ial.nodes))
4697     if ial.required_nodes == 2:
4698       self.op.snode = ial.nodes[1]
4699
4700   def BuildHooksEnv(self):
4701     """Build hooks env.
4702
4703     This runs on master, primary and secondary nodes of the instance.
4704
4705     """
4706     env = {
4707       "ADD_MODE": self.op.mode,
4708       }
4709     if self.op.mode == constants.INSTANCE_IMPORT:
4710       env["SRC_NODE"] = self.op.src_node
4711       env["SRC_PATH"] = self.op.src_path
4712       env["SRC_IMAGES"] = self.src_images
4713
4714     env.update(_BuildInstanceHookEnv(
4715       name=self.op.instance_name,
4716       primary_node=self.op.pnode,
4717       secondary_nodes=self.secondaries,
4718       status=self.op.start,
4719       os_type=self.op.os_type,
4720       memory=self.be_full[constants.BE_MEMORY],
4721       vcpus=self.be_full[constants.BE_VCPUS],
4722       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4723       disk_template=self.op.disk_template,
4724       disks=[(d["size"], d["mode"]) for d in self.disks],
4725       bep=self.be_full,
4726       hvp=self.hv_full,
4727       hypervisor_name=self.op.hypervisor,
4728     ))
4729
4730     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4731           self.secondaries)
4732     return env, nl, nl
4733
4734
4735   def CheckPrereq(self):
4736     """Check prerequisites.
4737
4738     """
4739     if (not self.cfg.GetVGName() and
4740         self.op.disk_template not in constants.DTS_NOT_LVM):
4741       raise errors.OpPrereqError("Cluster does not support lvm-based"
4742                                  " instances")
4743
4744     if self.op.mode == constants.INSTANCE_IMPORT:
4745       src_node = self.op.src_node
4746       src_path = self.op.src_path
4747
4748       if src_node is None:
4749         exp_list = self.rpc.call_export_list(
4750           self.acquired_locks[locking.LEVEL_NODE])
4751         found = False
4752         for node in exp_list:
4753           if not exp_list[node].failed and src_path in exp_list[node].data:
4754             found = True
4755             self.op.src_node = src_node = node
4756             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4757                                                        src_path)
4758             break
4759         if not found:
4760           raise errors.OpPrereqError("No export found for relative path %s" %
4761                                       src_path)
4762
4763       _CheckNodeOnline(self, src_node)
4764       result = self.rpc.call_export_info(src_node, src_path)
4765       result.Raise()
4766       if not result.data:
4767         raise errors.OpPrereqError("No export found in dir %s" % src_path)
4768
4769       export_info = result.data
4770       if not export_info.has_section(constants.INISECT_EXP):
4771         raise errors.ProgrammerError("Corrupted export config")
4772
4773       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4774       if (int(ei_version) != constants.EXPORT_VERSION):
4775         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4776                                    (ei_version, constants.EXPORT_VERSION))
4777
4778       # Check that the new instance doesn't have less disks than the export
4779       instance_disks = len(self.disks)
4780       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4781       if instance_disks < export_disks:
4782         raise errors.OpPrereqError("Not enough disks to import."
4783                                    " (instance: %d, export: %d)" %
4784                                    (instance_disks, export_disks))
4785
4786       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4787       disk_images = []
4788       for idx in range(export_disks):
4789         option = 'disk%d_dump' % idx
4790         if export_info.has_option(constants.INISECT_INS, option):
4791           # FIXME: are the old os-es, disk sizes, etc. useful?
4792           export_name = export_info.get(constants.INISECT_INS, option)
4793           image = os.path.join(src_path, export_name)
4794           disk_images.append(image)
4795         else:
4796           disk_images.append(False)
4797
4798       self.src_images = disk_images
4799
4800       old_name = export_info.get(constants.INISECT_INS, 'name')
4801       # FIXME: int() here could throw a ValueError on broken exports
4802       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4803       if self.op.instance_name == old_name:
4804         for idx, nic in enumerate(self.nics):
4805           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4806             nic_mac_ini = 'nic%d_mac' % idx
4807             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4808
4809     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4810     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4811     if self.op.start and not self.op.ip_check:
4812       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4813                                  " adding an instance in start mode")
4814
4815     if self.op.ip_check:
4816       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4817         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4818                                    (self.check_ip, self.op.instance_name))
4819
4820     #### mac address generation
4821     # By generating here the mac address both the allocator and the hooks get
4822     # the real final mac address rather than the 'auto' or 'generate' value.
4823     # There is a race condition between the generation and the instance object
4824     # creation, which means that we know the mac is valid now, but we're not
4825     # sure it will be when we actually add the instance. If things go bad
4826     # adding the instance will abort because of a duplicate mac, and the
4827     # creation job will fail.
4828     for nic in self.nics:
4829       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4830         nic.mac = self.cfg.GenerateMAC()
4831
4832     #### allocator run
4833
4834     if self.op.iallocator is not None:
4835       self._RunAllocator()
4836
4837     #### node related checks
4838
4839     # check primary node
4840     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4841     assert self.pnode is not None, \
4842       "Cannot retrieve locked node %s" % self.op.pnode
4843     if pnode.offline:
4844       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4845                                  pnode.name)
4846     if pnode.drained:
4847       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4848                                  pnode.name)
4849
4850     self.secondaries = []
4851
4852     # mirror node verification
4853     if self.op.disk_template in constants.DTS_NET_MIRROR:
4854       if self.op.snode is None:
4855         raise errors.OpPrereqError("The networked disk templates need"
4856                                    " a mirror node")
4857       if self.op.snode == pnode.name:
4858         raise errors.OpPrereqError("The secondary node cannot be"
4859                                    " the primary node.")
4860       _CheckNodeOnline(self, self.op.snode)
4861       _CheckNodeNotDrained(self, self.op.snode)
4862       self.secondaries.append(self.op.snode)
4863
4864     nodenames = [pnode.name] + self.secondaries
4865
4866     req_size = _ComputeDiskSize(self.op.disk_template,
4867                                 self.disks)
4868
4869     # Check lv size requirements
4870     if req_size is not None:
4871       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4872                                          self.op.hypervisor)
4873       for node in nodenames:
4874         info = nodeinfo[node]
4875         info.Raise()
4876         info = info.data
4877         if not info:
4878           raise errors.OpPrereqError("Cannot get current information"
4879                                      " from node '%s'" % node)
4880         vg_free = info.get('vg_free', None)
4881         if not isinstance(vg_free, int):
4882           raise errors.OpPrereqError("Can't compute free disk space on"
4883                                      " node %s" % node)
4884         if req_size > info['vg_free']:
4885           raise errors.OpPrereqError("Not enough disk space on target node %s."
4886                                      " %d MB available, %d MB required" %
4887                                      (node, info['vg_free'], req_size))
4888
4889     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4890
4891     # os verification
4892     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4893     result.Raise()
4894     if not isinstance(result.data, objects.OS) or not result.data:
4895       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4896                                  " primary node"  % self.op.os_type)
4897
4898     # bridge check on primary node
4899     bridges = [n.bridge for n in self.nics]
4900     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4901     result.Raise()
4902     if not result.data:
4903       raise errors.OpPrereqError("One of the target bridges '%s' does not"
4904                                  " exist on destination node '%s'" %
4905                                  (",".join(bridges), pnode.name))
4906
4907     # memory check on primary node
4908     if self.op.start:
4909       _CheckNodeFreeMemory(self, self.pnode.name,
4910                            "creating instance %s" % self.op.instance_name,
4911                            self.be_full[constants.BE_MEMORY],
4912                            self.op.hypervisor)
4913
4914   def Exec(self, feedback_fn):
4915     """Create and add the instance to the cluster.
4916
4917     """
4918     instance = self.op.instance_name
4919     pnode_name = self.pnode.name
4920
4921     ht_kind = self.op.hypervisor
4922     if ht_kind in constants.HTS_REQ_PORT:
4923       network_port = self.cfg.AllocatePort()
4924     else:
4925       network_port = None
4926
4927     ##if self.op.vnc_bind_address is None:
4928     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4929
4930     # this is needed because os.path.join does not accept None arguments
4931     if self.op.file_storage_dir is None:
4932       string_file_storage_dir = ""
4933     else:
4934       string_file_storage_dir = self.op.file_storage_dir
4935
4936     # build the full file storage dir path
4937     file_storage_dir = os.path.normpath(os.path.join(
4938                                         self.cfg.GetFileStorageDir(),
4939                                         string_file_storage_dir, instance))
4940
4941
4942     disks = _GenerateDiskTemplate(self,
4943                                   self.op.disk_template,
4944                                   instance, pnode_name,
4945                                   self.secondaries,
4946                                   self.disks,
4947                                   file_storage_dir,
4948                                   self.op.file_driver,
4949                                   0)
4950
4951     iobj = objects.Instance(name=instance, os=self.op.os_type,
4952                             primary_node=pnode_name,
4953                             nics=self.nics, disks=disks,
4954                             disk_template=self.op.disk_template,
4955                             admin_up=False,
4956                             network_port=network_port,
4957                             beparams=self.op.beparams,
4958                             hvparams=self.op.hvparams,
4959                             hypervisor=self.op.hypervisor,
4960                             )
4961
4962     feedback_fn("* creating instance disks...")
4963     try:
4964       _CreateDisks(self, iobj)
4965     except errors.OpExecError:
4966       self.LogWarning("Device creation failed, reverting...")
4967       try:
4968         _RemoveDisks(self, iobj)
4969       finally:
4970         self.cfg.ReleaseDRBDMinors(instance)
4971         raise
4972
4973     feedback_fn("adding instance %s to cluster config" % instance)
4974
4975     self.cfg.AddInstance(iobj)
4976     # Declare that we don't want to remove the instance lock anymore, as we've
4977     # added the instance to the config
4978     del self.remove_locks[locking.LEVEL_INSTANCE]
4979     # Unlock all the nodes
4980     if self.op.mode == constants.INSTANCE_IMPORT:
4981       nodes_keep = [self.op.src_node]
4982       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4983                        if node != self.op.src_node]
4984       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4985       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4986     else:
4987       self.context.glm.release(locking.LEVEL_NODE)
4988       del self.acquired_locks[locking.LEVEL_NODE]
4989
4990     if self.op.wait_for_sync:
4991       disk_abort = not _WaitForSync(self, iobj)
4992     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4993       # make sure the disks are not degraded (still sync-ing is ok)
4994       time.sleep(15)
4995       feedback_fn("* checking mirrors status")
4996       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4997     else:
4998       disk_abort = False
4999
5000     if disk_abort:
5001       _RemoveDisks(self, iobj)
5002       self.cfg.RemoveInstance(iobj.name)
5003       # Make sure the instance lock gets removed
5004       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
5005       raise errors.OpExecError("There are some degraded disks for"
5006                                " this instance")
5007
5008     feedback_fn("creating os for instance %s on node %s" %
5009                 (instance, pnode_name))
5010
5011     if iobj.disk_template != constants.DT_DISKLESS:
5012       if self.op.mode == constants.INSTANCE_CREATE:
5013         feedback_fn("* running the instance OS create scripts...")
5014         result = self.rpc.call_instance_os_add(pnode_name, iobj)
5015         msg = result.RemoteFailMsg()
5016         if msg:
5017           raise errors.OpExecError("Could not add os for instance %s"
5018                                    " on node %s: %s" %
5019                                    (instance, pnode_name, msg))
5020
5021       elif self.op.mode == constants.INSTANCE_IMPORT:
5022         feedback_fn("* running the instance OS import scripts...")
5023         src_node = self.op.src_node
5024         src_images = self.src_images
5025         cluster_name = self.cfg.GetClusterName()
5026         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
5027                                                          src_node, src_images,
5028                                                          cluster_name)
5029         import_result.Raise()
5030         for idx, result in enumerate(import_result.data):
5031           if not result:
5032             self.LogWarning("Could not import the image %s for instance"
5033                             " %s, disk %d, on node %s" %
5034                             (src_images[idx], instance, idx, pnode_name))
5035       else:
5036         # also checked in the prereq part
5037         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
5038                                      % self.op.mode)
5039
5040     if self.op.start:
5041       iobj.admin_up = True
5042       self.cfg.Update(iobj)
5043       logging.info("Starting instance %s on node %s", instance, pnode_name)
5044       feedback_fn("* starting instance...")
5045       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
5046       msg = result.RemoteFailMsg()
5047       if msg:
5048         raise errors.OpExecError("Could not start instance: %s" % msg)
5049
5050
5051 class LUConnectConsole(NoHooksLU):
5052   """Connect to an instance's console.
5053
5054   This is somewhat special in that it returns the command line that
5055   you need to run on the master node in order to connect to the
5056   console.
5057
5058   """
5059   _OP_REQP = ["instance_name"]
5060   REQ_BGL = False
5061
5062   def ExpandNames(self):
5063     self._ExpandAndLockInstance()
5064
5065   def CheckPrereq(self):
5066     """Check prerequisites.
5067
5068     This checks that the instance is in the cluster.
5069
5070     """
5071     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5072     assert self.instance is not None, \
5073       "Cannot retrieve locked instance %s" % self.op.instance_name
5074     _CheckNodeOnline(self, self.instance.primary_node)
5075
5076   def Exec(self, feedback_fn):
5077     """Connect to the console of an instance
5078
5079     """
5080     instance = self.instance
5081     node = instance.primary_node
5082
5083     node_insts = self.rpc.call_instance_list([node],
5084                                              [instance.hypervisor])[node]
5085     node_insts.Raise()
5086
5087     if instance.name not in node_insts.data:
5088       raise errors.OpExecError("Instance %s is not running." % instance.name)
5089
5090     logging.debug("Connecting to console of %s on %s", instance.name, node)
5091
5092     hyper = hypervisor.GetHypervisor(instance.hypervisor)
5093     cluster = self.cfg.GetClusterInfo()
5094     # beparams and hvparams are passed separately, to avoid editing the
5095     # instance and then saving the defaults in the instance itself.
5096     hvparams = cluster.FillHV(instance)
5097     beparams = cluster.FillBE(instance)
5098     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5099
5100     # build ssh cmdline
5101     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
5102
5103
5104 class LUReplaceDisks(LogicalUnit):
5105   """Replace the disks of an instance.
5106
5107   """
5108   HPATH = "mirrors-replace"
5109   HTYPE = constants.HTYPE_INSTANCE
5110   _OP_REQP = ["instance_name", "mode", "disks"]
5111   REQ_BGL = False
5112
5113   def CheckArguments(self):
5114     if not hasattr(self.op, "remote_node"):
5115       self.op.remote_node = None
5116     if not hasattr(self.op, "iallocator"):
5117       self.op.iallocator = None
5118
5119     # check for valid parameter combination
5120     cnt = [self.op.remote_node, self.op.iallocator].count(None)
5121     if self.op.mode == constants.REPLACE_DISK_CHG:
5122       if cnt == 2:
5123         raise errors.OpPrereqError("When changing the secondary either an"
5124                                    " iallocator script must be used or the"
5125                                    " new node given")
5126       elif cnt == 0:
5127         raise errors.OpPrereqError("Give either the iallocator or the new"
5128                                    " secondary, not both")
5129     else: # not replacing the secondary
5130       if cnt != 2:
5131         raise errors.OpPrereqError("The iallocator and new node options can"
5132                                    " be used only when changing the"
5133                                    " secondary node")
5134
5135   def ExpandNames(self):
5136     self._ExpandAndLockInstance()
5137
5138     if self.op.iallocator is not None:
5139       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5140     elif self.op.remote_node is not None:
5141       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5142       if remote_node is None:
5143         raise errors.OpPrereqError("Node '%s' not known" %
5144                                    self.op.remote_node)
5145       self.op.remote_node = remote_node
5146       # Warning: do not remove the locking of the new secondary here
5147       # unless DRBD8.AddChildren is changed to work in parallel;
5148       # currently it doesn't since parallel invocations of
5149       # FindUnusedMinor will conflict
5150       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5151       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5152     else:
5153       self.needed_locks[locking.LEVEL_NODE] = []
5154       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5155
5156   def DeclareLocks(self, level):
5157     # If we're not already locking all nodes in the set we have to declare the
5158     # instance's primary/secondary nodes.
5159     if (level == locking.LEVEL_NODE and
5160         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5161       self._LockInstancesNodes()
5162
5163   def _RunAllocator(self):
5164     """Compute a new secondary node using an IAllocator.
5165
5166     """
5167     ial = IAllocator(self,
5168                      mode=constants.IALLOCATOR_MODE_RELOC,
5169                      name=self.op.instance_name,
5170                      relocate_from=[self.sec_node])
5171
5172     ial.Run(self.op.iallocator)
5173
5174     if not ial.success:
5175       raise errors.OpPrereqError("Can't compute nodes using"
5176                                  " iallocator '%s': %s" % (self.op.iallocator,
5177                                                            ial.info))
5178     if len(ial.nodes) != ial.required_nodes:
5179       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5180                                  " of nodes (%s), required %s" %
5181                                  (len(ial.nodes), ial.required_nodes))
5182     self.op.remote_node = ial.nodes[0]
5183     self.LogInfo("Selected new secondary for the instance: %s",
5184                  self.op.remote_node)
5185
5186   def BuildHooksEnv(self):
5187     """Build hooks env.
5188
5189     This runs on the master, the primary and all the secondaries.
5190
5191     """
5192     env = {
5193       "MODE": self.op.mode,
5194       "NEW_SECONDARY": self.op.remote_node,
5195       "OLD_SECONDARY": self.instance.secondary_nodes[0],
5196       }
5197     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5198     nl = [
5199       self.cfg.GetMasterNode(),
5200       self.instance.primary_node,
5201       ]
5202     if self.op.remote_node is not None:
5203       nl.append(self.op.remote_node)
5204     return env, nl, nl
5205
5206   def CheckPrereq(self):
5207     """Check prerequisites.
5208
5209     This checks that the instance is in the cluster.
5210
5211     """
5212     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5213     assert instance is not None, \
5214       "Cannot retrieve locked instance %s" % self.op.instance_name
5215     self.instance = instance
5216
5217     if instance.disk_template != constants.DT_DRBD8:
5218       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5219                                  " instances")
5220
5221     if len(instance.secondary_nodes) != 1:
5222       raise errors.OpPrereqError("The instance has a strange layout,"
5223                                  " expected one secondary but found %d" %
5224                                  len(instance.secondary_nodes))
5225
5226     self.sec_node = instance.secondary_nodes[0]
5227
5228     if self.op.iallocator is not None:
5229       self._RunAllocator()
5230
5231     remote_node = self.op.remote_node
5232     if remote_node is not None:
5233       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5234       assert self.remote_node_info is not None, \
5235         "Cannot retrieve locked node %s" % remote_node
5236     else:
5237       self.remote_node_info = None
5238     if remote_node == instance.primary_node:
5239       raise errors.OpPrereqError("The specified node is the primary node of"
5240                                  " the instance.")
5241     elif remote_node == self.sec_node:
5242       raise errors.OpPrereqError("The specified node is already the"
5243                                  " secondary node of the instance.")
5244
5245     if self.op.mode == constants.REPLACE_DISK_PRI:
5246       n1 = self.tgt_node = instance.primary_node
5247       n2 = self.oth_node = self.sec_node
5248     elif self.op.mode == constants.REPLACE_DISK_SEC:
5249       n1 = self.tgt_node = self.sec_node
5250       n2 = self.oth_node = instance.primary_node
5251     elif self.op.mode == constants.REPLACE_DISK_CHG:
5252       n1 = self.new_node = remote_node
5253       n2 = self.oth_node = instance.primary_node
5254       self.tgt_node = self.sec_node
5255       _CheckNodeNotDrained(self, remote_node)
5256     else:
5257       raise errors.ProgrammerError("Unhandled disk replace mode")
5258
5259     _CheckNodeOnline(self, n1)
5260     _CheckNodeOnline(self, n2)
5261
5262     if not self.op.disks:
5263       self.op.disks = range(len(instance.disks))
5264
5265     for disk_idx in self.op.disks:
5266       instance.FindDisk(disk_idx)
5267
5268   def _ExecD8DiskOnly(self, feedback_fn):
5269     """Replace a disk on the primary or secondary for dbrd8.
5270
5271     The algorithm for replace is quite complicated:
5272
5273       1. for each disk to be replaced:
5274
5275         1. create new LVs on the target node with unique names
5276         1. detach old LVs from the drbd device
5277         1. rename old LVs to name_replaced.<time_t>
5278         1. rename new LVs to old LVs
5279         1. attach the new LVs (with the old names now) to the drbd device
5280
5281       1. wait for sync across all devices
5282
5283       1. for each modified disk:
5284
5285         1. remove old LVs (which have the name name_replaces.<time_t>)
5286
5287     Failures are not very well handled.
5288
5289     """
5290     steps_total = 6
5291     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5292     instance = self.instance
5293     iv_names = {}
5294     vgname = self.cfg.GetVGName()
5295     # start of work
5296     cfg = self.cfg
5297     tgt_node = self.tgt_node
5298     oth_node = self.oth_node
5299
5300     # Step: check device activation
5301     self.proc.LogStep(1, steps_total, "check device existence")
5302     info("checking volume groups")
5303     my_vg = cfg.GetVGName()
5304     results = self.rpc.call_vg_list([oth_node, tgt_node])
5305     if not results:
5306       raise errors.OpExecError("Can't list volume groups on the nodes")
5307     for node in oth_node, tgt_node:
5308       res = results[node]
5309       if res.failed or not res.data or my_vg not in res.data:
5310         raise errors.OpExecError("Volume group '%s' not found on %s" %
5311                                  (my_vg, node))
5312     for idx, dev in enumerate(instance.disks):
5313       if idx not in self.op.disks:
5314         continue
5315       for node in tgt_node, oth_node:
5316         info("checking disk/%d on %s" % (idx, node))
5317         cfg.SetDiskID(dev, node)
5318         result = self.rpc.call_blockdev_find(node, dev)
5319         msg = result.RemoteFailMsg()
5320         if not msg and not result.payload:
5321           msg = "disk not found"
5322         if msg:
5323           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5324                                    (idx, node, msg))
5325
5326     # Step: check other node consistency
5327     self.proc.LogStep(2, steps_total, "check peer consistency")
5328     for idx, dev in enumerate(instance.disks):
5329       if idx not in self.op.disks:
5330         continue
5331       info("checking disk/%d consistency on %s" % (idx, oth_node))
5332       if not _CheckDiskConsistency(self, dev, oth_node,
5333                                    oth_node==instance.primary_node):
5334         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5335                                  " to replace disks on this node (%s)" %
5336                                  (oth_node, tgt_node))
5337
5338     # Step: create new storage
5339     self.proc.LogStep(3, steps_total, "allocate new storage")
5340     for idx, dev in enumerate(instance.disks):
5341       if idx not in self.op.disks:
5342         continue
5343       size = dev.size
5344       cfg.SetDiskID(dev, tgt_node)
5345       lv_names = [".disk%d_%s" % (idx, suf)
5346                   for suf in ["data", "meta"]]
5347       names = _GenerateUniqueNames(self, lv_names)
5348       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5349                              logical_id=(vgname, names[0]))
5350       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5351                              logical_id=(vgname, names[1]))
5352       new_lvs = [lv_data, lv_meta]
5353       old_lvs = dev.children
5354       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5355       info("creating new local storage on %s for %s" %
5356            (tgt_node, dev.iv_name))
5357       # we pass force_create=True to force the LVM creation
5358       for new_lv in new_lvs:
5359         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5360                         _GetInstanceInfoText(instance), False)
5361
5362     # Step: for each lv, detach+rename*2+attach
5363     self.proc.LogStep(4, steps_total, "change drbd configuration")
5364     for dev, old_lvs, new_lvs in iv_names.itervalues():
5365       info("detaching %s drbd from local storage" % dev.iv_name)
5366       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5367       result.Raise()
5368       if not result.data:
5369         raise errors.OpExecError("Can't detach drbd from local storage on node"
5370                                  " %s for device %s" % (tgt_node, dev.iv_name))
5371       #dev.children = []
5372       #cfg.Update(instance)
5373
5374       # ok, we created the new LVs, so now we know we have the needed
5375       # storage; as such, we proceed on the target node to rename
5376       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5377       # using the assumption that logical_id == physical_id (which in
5378       # turn is the unique_id on that node)
5379
5380       # FIXME(iustin): use a better name for the replaced LVs
5381       temp_suffix = int(time.time())
5382       ren_fn = lambda d, suff: (d.physical_id[0],
5383                                 d.physical_id[1] + "_replaced-%s" % suff)
5384       # build the rename list based on what LVs exist on the node
5385       rlist = []
5386       for to_ren in old_lvs:
5387         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5388         if not result.RemoteFailMsg() and result.payload:
5389           # device exists
5390           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5391
5392       info("renaming the old LVs on the target node")
5393       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5394       result.Raise()
5395       if not result.data:
5396         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
5397       # now we rename the new LVs to the old LVs
5398       info("renaming the new LVs on the target node")
5399       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5400       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5401       result.Raise()
5402       if not result.data:
5403         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
5404
5405       for old, new in zip(old_lvs, new_lvs):
5406         new.logical_id = old.logical_id
5407         cfg.SetDiskID(new, tgt_node)
5408
5409       for disk in old_lvs:
5410         disk.logical_id = ren_fn(disk, temp_suffix)
5411         cfg.SetDiskID(disk, tgt_node)
5412
5413       # now that the new lvs have the old name, we can add them to the device
5414       info("adding new mirror component on %s" % tgt_node)
5415       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5416       if result.failed or not result.data:
5417         for new_lv in new_lvs:
5418           msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5419           if msg:
5420             warning("Can't rollback device %s: %s", dev, msg,
5421                     hint="cleanup manually the unused logical volumes")
5422         raise errors.OpExecError("Can't add local storage to drbd")
5423
5424       dev.children = new_lvs
5425       cfg.Update(instance)
5426
5427     # Step: wait for sync
5428
5429     # this can fail as the old devices are degraded and _WaitForSync
5430     # does a combined result over all disks, so we don't check its
5431     # return value
5432     self.proc.LogStep(5, steps_total, "sync devices")
5433     _WaitForSync(self, instance, unlock=True)
5434
5435     # so check manually all the devices
5436     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5437       cfg.SetDiskID(dev, instance.primary_node)
5438       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5439       msg = result.RemoteFailMsg()
5440       if not msg and not result.payload:
5441         msg = "disk not found"
5442       if msg:
5443         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5444                                  (name, msg))
5445       if result.payload[5]:
5446         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5447
5448     # Step: remove old storage
5449     self.proc.LogStep(6, steps_total, "removing old storage")
5450     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5451       info("remove logical volumes for %s" % name)
5452       for lv in old_lvs:
5453         cfg.SetDiskID(lv, tgt_node)
5454         msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5455         if msg:
5456           warning("Can't remove old LV: %s" % msg,
5457                   hint="manually remove unused LVs")
5458           continue
5459
5460   def _ExecD8Secondary(self, feedback_fn):
5461     """Replace the secondary node for drbd8.
5462
5463     The algorithm for replace is quite complicated:
5464       - for all disks of the instance:
5465         - create new LVs on the new node with same names
5466         - shutdown the drbd device on the old secondary
5467         - disconnect the drbd network on the primary
5468         - create the drbd device on the new secondary
5469         - network attach the drbd on the primary, using an artifice:
5470           the drbd code for Attach() will connect to the network if it
5471           finds a device which is connected to the good local disks but
5472           not network enabled
5473       - wait for sync across all devices
5474       - remove all disks from the old secondary
5475
5476     Failures are not very well handled.
5477
5478     """
5479     steps_total = 6
5480     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5481     instance = self.instance
5482     iv_names = {}
5483     # start of work
5484     cfg = self.cfg
5485     old_node = self.tgt_node
5486     new_node = self.new_node
5487     pri_node = instance.primary_node
5488     nodes_ip = {
5489       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5490       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5491       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5492       }
5493
5494     # Step: check device activation
5495     self.proc.LogStep(1, steps_total, "check device existence")
5496     info("checking volume groups")
5497     my_vg = cfg.GetVGName()
5498     results = self.rpc.call_vg_list([pri_node, new_node])
5499     for node in pri_node, new_node:
5500       res = results[node]
5501       if res.failed or not res.data or my_vg not in res.data:
5502         raise errors.OpExecError("Volume group '%s' not found on %s" %
5503                                  (my_vg, node))
5504     for idx, dev in enumerate(instance.disks):
5505       if idx not in self.op.disks:
5506         continue
5507       info("checking disk/%d on %s" % (idx, pri_node))
5508       cfg.SetDiskID(dev, pri_node)
5509       result = self.rpc.call_blockdev_find(pri_node, dev)
5510       msg = result.RemoteFailMsg()
5511       if not msg and not result.payload:
5512         msg = "disk not found"
5513       if msg:
5514         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5515                                  (idx, pri_node, msg))
5516
5517     # Step: check other node consistency
5518     self.proc.LogStep(2, steps_total, "check peer consistency")
5519     for idx, dev in enumerate(instance.disks):
5520       if idx not in self.op.disks:
5521         continue
5522       info("checking disk/%d consistency on %s" % (idx, pri_node))
5523       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5524         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5525                                  " unsafe to replace the secondary" %
5526                                  pri_node)
5527
5528     # Step: create new storage
5529     self.proc.LogStep(3, steps_total, "allocate new storage")
5530     for idx, dev in enumerate(instance.disks):
5531       info("adding new local storage on %s for disk/%d" %
5532            (new_node, idx))
5533       # we pass force_create=True to force LVM creation
5534       for new_lv in dev.children:
5535         _CreateBlockDev(self, new_node, instance, new_lv, True,
5536                         _GetInstanceInfoText(instance), False)
5537
5538     # Step 4: dbrd minors and drbd setups changes
5539     # after this, we must manually remove the drbd minors on both the
5540     # error and the success paths
5541     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5542                                    instance.name)
5543     logging.debug("Allocated minors %s" % (minors,))
5544     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5545     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5546       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5547       # create new devices on new_node; note that we create two IDs:
5548       # one without port, so the drbd will be activated without
5549       # networking information on the new node at this stage, and one
5550       # with network, for the latter activation in step 4
5551       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5552       if pri_node == o_node1:
5553         p_minor = o_minor1
5554       else:
5555         p_minor = o_minor2
5556
5557       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5558       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5559
5560       iv_names[idx] = (dev, dev.children, new_net_id)
5561       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5562                     new_net_id)
5563       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5564                               logical_id=new_alone_id,
5565                               children=dev.children,
5566                               size=dev.size)
5567       try:
5568         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5569                               _GetInstanceInfoText(instance), False)
5570       except errors.GenericError:
5571         self.cfg.ReleaseDRBDMinors(instance.name)
5572         raise
5573
5574     for idx, dev in enumerate(instance.disks):
5575       # we have new devices, shutdown the drbd on the old secondary
5576       info("shutting down drbd for disk/%d on old node" % idx)
5577       cfg.SetDiskID(dev, old_node)
5578       msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5579       if msg:
5580         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5581                 (idx, msg),
5582                 hint="Please cleanup this device manually as soon as possible")
5583
5584     info("detaching primary drbds from the network (=> standalone)")
5585     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5586                                                instance.disks)[pri_node]
5587
5588     msg = result.RemoteFailMsg()
5589     if msg:
5590       # detaches didn't succeed (unlikely)
5591       self.cfg.ReleaseDRBDMinors(instance.name)
5592       raise errors.OpExecError("Can't detach the disks from the network on"
5593                                " old node: %s" % (msg,))
5594
5595     # if we managed to detach at least one, we update all the disks of
5596     # the instance to point to the new secondary
5597     info("updating instance configuration")
5598     for dev, _, new_logical_id in iv_names.itervalues():
5599       dev.logical_id = new_logical_id
5600       cfg.SetDiskID(dev, pri_node)
5601     cfg.Update(instance)
5602
5603     # and now perform the drbd attach
5604     info("attaching primary drbds to new secondary (standalone => connected)")
5605     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5606                                            instance.disks, instance.name,
5607                                            False)
5608     for to_node, to_result in result.items():
5609       msg = to_result.RemoteFailMsg()
5610       if msg:
5611         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5612                 hint="please do a gnt-instance info to see the"
5613                 " status of disks")
5614
5615     # this can fail as the old devices are degraded and _WaitForSync
5616     # does a combined result over all disks, so we don't check its
5617     # return value
5618     self.proc.LogStep(5, steps_total, "sync devices")
5619     _WaitForSync(self, instance, unlock=True)
5620
5621     # so check manually all the devices
5622     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5623       cfg.SetDiskID(dev, pri_node)
5624       result = self.rpc.call_blockdev_find(pri_node, dev)
5625       msg = result.RemoteFailMsg()
5626       if not msg and not result.payload:
5627         msg = "disk not found"
5628       if msg:
5629         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5630                                  (idx, msg))
5631       if result.payload[5]:
5632         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5633
5634     self.proc.LogStep(6, steps_total, "removing old storage")
5635     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5636       info("remove logical volumes for disk/%d" % idx)
5637       for lv in old_lvs:
5638         cfg.SetDiskID(lv, old_node)
5639         msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5640         if msg:
5641           warning("Can't remove LV on old secondary: %s", msg,
5642                   hint="Cleanup stale volumes by hand")
5643
5644   def Exec(self, feedback_fn):
5645     """Execute disk replacement.
5646
5647     This dispatches the disk replacement to the appropriate handler.
5648
5649     """
5650     instance = self.instance
5651
5652     # Activate the instance disks if we're replacing them on a down instance
5653     if not instance.admin_up:
5654       _StartInstanceDisks(self, instance, True)
5655
5656     if self.op.mode == constants.REPLACE_DISK_CHG:
5657       fn = self._ExecD8Secondary
5658     else:
5659       fn = self._ExecD8DiskOnly
5660
5661     ret = fn(feedback_fn)
5662
5663     # Deactivate the instance disks if we're replacing them on a down instance
5664     if not instance.admin_up:
5665       _SafeShutdownInstanceDisks(self, instance)
5666
5667     return ret
5668
5669
5670 class LUGrowDisk(LogicalUnit):
5671   """Grow a disk of an instance.
5672
5673   """
5674   HPATH = "disk-grow"
5675   HTYPE = constants.HTYPE_INSTANCE
5676   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5677   REQ_BGL = False
5678
5679   def ExpandNames(self):
5680     self._ExpandAndLockInstance()
5681     self.needed_locks[locking.LEVEL_NODE] = []
5682     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5683
5684   def DeclareLocks(self, level):
5685     if level == locking.LEVEL_NODE:
5686       self._LockInstancesNodes()
5687
5688   def BuildHooksEnv(self):
5689     """Build hooks env.
5690
5691     This runs on the master, the primary and all the secondaries.
5692
5693     """
5694     env = {
5695       "DISK": self.op.disk,
5696       "AMOUNT": self.op.amount,
5697       }
5698     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5699     nl = [
5700       self.cfg.GetMasterNode(),
5701       self.instance.primary_node,
5702       ]
5703     return env, nl, nl
5704
5705   def CheckPrereq(self):
5706     """Check prerequisites.
5707
5708     This checks that the instance is in the cluster.
5709
5710     """
5711     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5712     assert instance is not None, \
5713       "Cannot retrieve locked instance %s" % self.op.instance_name
5714     nodenames = list(instance.all_nodes)
5715     for node in nodenames:
5716       _CheckNodeOnline(self, node)
5717
5718
5719     self.instance = instance
5720
5721     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5722       raise errors.OpPrereqError("Instance's disk layout does not support"
5723                                  " growing.")
5724
5725     self.disk = instance.FindDisk(self.op.disk)
5726
5727     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5728                                        instance.hypervisor)
5729     for node in nodenames:
5730       info = nodeinfo[node]
5731       if info.failed or not info.data:
5732         raise errors.OpPrereqError("Cannot get current information"
5733                                    " from node '%s'" % node)
5734       vg_free = info.data.get('vg_free', None)
5735       if not isinstance(vg_free, int):
5736         raise errors.OpPrereqError("Can't compute free disk space on"
5737                                    " node %s" % node)
5738       if self.op.amount > vg_free:
5739         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5740                                    " %d MiB available, %d MiB required" %
5741                                    (node, vg_free, self.op.amount))
5742
5743   def Exec(self, feedback_fn):
5744     """Execute disk grow.
5745
5746     """
5747     instance = self.instance
5748     disk = self.disk
5749     for node in instance.all_nodes:
5750       self.cfg.SetDiskID(disk, node)
5751       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5752       msg = result.RemoteFailMsg()
5753       if msg:
5754         raise errors.OpExecError("Grow request failed to node %s: %s" %
5755                                  (node, msg))
5756     disk.RecordGrow(self.op.amount)
5757     self.cfg.Update(instance)
5758     if self.op.wait_for_sync:
5759       disk_abort = not _WaitForSync(self, instance)
5760       if disk_abort:
5761         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5762                              " status.\nPlease check the instance.")
5763
5764
5765 class LUQueryInstanceData(NoHooksLU):
5766   """Query runtime instance data.
5767
5768   """
5769   _OP_REQP = ["instances", "static"]
5770   REQ_BGL = False
5771
5772   def ExpandNames(self):
5773     self.needed_locks = {}
5774     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5775
5776     if not isinstance(self.op.instances, list):
5777       raise errors.OpPrereqError("Invalid argument type 'instances'")
5778
5779     if self.op.instances:
5780       self.wanted_names = []
5781       for name in self.op.instances:
5782         full_name = self.cfg.ExpandInstanceName(name)
5783         if full_name is None:
5784           raise errors.OpPrereqError("Instance '%s' not known" % name)
5785         self.wanted_names.append(full_name)
5786       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5787     else:
5788       self.wanted_names = None
5789       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5790
5791     self.needed_locks[locking.LEVEL_NODE] = []
5792     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5793
5794   def DeclareLocks(self, level):
5795     if level == locking.LEVEL_NODE:
5796       self._LockInstancesNodes()
5797
5798   def CheckPrereq(self):
5799     """Check prerequisites.
5800
5801     This only checks the optional instance list against the existing names.
5802
5803     """
5804     if self.wanted_names is None:
5805       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5806
5807     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5808                              in self.wanted_names]
5809     return
5810
5811   def _ComputeDiskStatus(self, instance, snode, dev):
5812     """Compute block device status.
5813
5814     """
5815     static = self.op.static
5816     if not static:
5817       self.cfg.SetDiskID(dev, instance.primary_node)
5818       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5819       if dev_pstatus.offline:
5820         dev_pstatus = None
5821       else:
5822         msg = dev_pstatus.RemoteFailMsg()
5823         if msg:
5824           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5825                                    (instance.name, msg))
5826         dev_pstatus = dev_pstatus.payload
5827     else:
5828       dev_pstatus = None
5829
5830     if dev.dev_type in constants.LDS_DRBD:
5831       # we change the snode then (otherwise we use the one passed in)
5832       if dev.logical_id[0] == instance.primary_node:
5833         snode = dev.logical_id[1]
5834       else:
5835         snode = dev.logical_id[0]
5836
5837     if snode and not static:
5838       self.cfg.SetDiskID(dev, snode)
5839       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5840       if dev_sstatus.offline:
5841         dev_sstatus = None
5842       else:
5843         msg = dev_sstatus.RemoteFailMsg()
5844         if msg:
5845           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5846                                    (instance.name, msg))
5847         dev_sstatus = dev_sstatus.payload
5848     else:
5849       dev_sstatus = None
5850
5851     if dev.children:
5852       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5853                       for child in dev.children]
5854     else:
5855       dev_children = []
5856
5857     data = {
5858       "iv_name": dev.iv_name,
5859       "dev_type": dev.dev_type,
5860       "logical_id": dev.logical_id,
5861       "physical_id": dev.physical_id,
5862       "pstatus": dev_pstatus,
5863       "sstatus": dev_sstatus,
5864       "children": dev_children,
5865       "mode": dev.mode,
5866       "size": dev.size,
5867       }
5868
5869     return data
5870
5871   def Exec(self, feedback_fn):
5872     """Gather and return data"""
5873     result = {}
5874
5875     cluster = self.cfg.GetClusterInfo()
5876
5877     for instance in self.wanted_instances:
5878       if not self.op.static:
5879         remote_info = self.rpc.call_instance_info(instance.primary_node,
5880                                                   instance.name,
5881                                                   instance.hypervisor)
5882         remote_info.Raise()
5883         remote_info = remote_info.data
5884         if remote_info and "state" in remote_info:
5885           remote_state = "up"
5886         else:
5887           remote_state = "down"
5888       else:
5889         remote_state = None
5890       if instance.admin_up:
5891         config_state = "up"
5892       else:
5893         config_state = "down"
5894
5895       disks = [self._ComputeDiskStatus(instance, None, device)
5896                for device in instance.disks]
5897
5898       idict = {
5899         "name": instance.name,
5900         "config_state": config_state,
5901         "run_state": remote_state,
5902         "pnode": instance.primary_node,
5903         "snodes": instance.secondary_nodes,
5904         "os": instance.os,
5905         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5906         "disks": disks,
5907         "hypervisor": instance.hypervisor,
5908         "network_port": instance.network_port,
5909         "hv_instance": instance.hvparams,
5910         "hv_actual": cluster.FillHV(instance),
5911         "be_instance": instance.beparams,
5912         "be_actual": cluster.FillBE(instance),
5913         }
5914
5915       result[instance.name] = idict
5916
5917     return result
5918
5919
5920 class LUSetInstanceParams(LogicalUnit):
5921   """Modifies an instances's parameters.
5922
5923   """
5924   HPATH = "instance-modify"
5925   HTYPE = constants.HTYPE_INSTANCE
5926   _OP_REQP = ["instance_name"]
5927   REQ_BGL = False
5928
5929   def CheckArguments(self):
5930     if not hasattr(self.op, 'nics'):
5931       self.op.nics = []
5932     if not hasattr(self.op, 'disks'):
5933       self.op.disks = []
5934     if not hasattr(self.op, 'beparams'):
5935       self.op.beparams = {}
5936     if not hasattr(self.op, 'hvparams'):
5937       self.op.hvparams = {}
5938     self.op.force = getattr(self.op, "force", False)
5939     if not (self.op.nics or self.op.disks or
5940             self.op.hvparams or self.op.beparams):
5941       raise errors.OpPrereqError("No changes submitted")
5942
5943     # Disk validation
5944     disk_addremove = 0
5945     for disk_op, disk_dict in self.op.disks:
5946       if disk_op == constants.DDM_REMOVE:
5947         disk_addremove += 1
5948         continue
5949       elif disk_op == constants.DDM_ADD:
5950         disk_addremove += 1
5951       else:
5952         if not isinstance(disk_op, int):
5953           raise errors.OpPrereqError("Invalid disk index")
5954       if disk_op == constants.DDM_ADD:
5955         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5956         if mode not in constants.DISK_ACCESS_SET:
5957           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5958         size = disk_dict.get('size', None)
5959         if size is None:
5960           raise errors.OpPrereqError("Required disk parameter size missing")
5961         try:
5962           size = int(size)
5963         except ValueError, err:
5964           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5965                                      str(err))
5966         disk_dict['size'] = size
5967       else:
5968         # modification of disk
5969         if 'size' in disk_dict:
5970           raise errors.OpPrereqError("Disk size change not possible, use"
5971                                      " grow-disk")
5972
5973     if disk_addremove > 1:
5974       raise errors.OpPrereqError("Only one disk add or remove operation"
5975                                  " supported at a time")
5976
5977     # NIC validation
5978     nic_addremove = 0
5979     for nic_op, nic_dict in self.op.nics:
5980       if nic_op == constants.DDM_REMOVE:
5981         nic_addremove += 1
5982         continue
5983       elif nic_op == constants.DDM_ADD:
5984         nic_addremove += 1
5985       else:
5986         if not isinstance(nic_op, int):
5987           raise errors.OpPrereqError("Invalid nic index")
5988
5989       # nic_dict should be a dict
5990       nic_ip = nic_dict.get('ip', None)
5991       if nic_ip is not None:
5992         if nic_ip.lower() == constants.VALUE_NONE:
5993           nic_dict['ip'] = None
5994         else:
5995           if not utils.IsValidIP(nic_ip):
5996             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5997
5998       if nic_op == constants.DDM_ADD:
5999         nic_bridge = nic_dict.get('bridge', None)
6000         if nic_bridge is None:
6001           nic_dict['bridge'] = self.cfg.GetDefBridge()
6002         nic_mac = nic_dict.get('mac', None)
6003         if nic_mac is None:
6004           nic_dict['mac'] = constants.VALUE_AUTO
6005
6006       if 'mac' in nic_dict:
6007         nic_mac = nic_dict['mac']
6008         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6009           if not utils.IsValidMac(nic_mac):
6010             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
6011         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
6012           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
6013                                      " modifying an existing nic")
6014
6015     if nic_addremove > 1:
6016       raise errors.OpPrereqError("Only one NIC add or remove operation"
6017                                  " supported at a time")
6018
6019   def ExpandNames(self):
6020     self._ExpandAndLockInstance()
6021     self.needed_locks[locking.LEVEL_NODE] = []
6022     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6023
6024   def DeclareLocks(self, level):
6025     if level == locking.LEVEL_NODE:
6026       self._LockInstancesNodes()
6027
6028   def BuildHooksEnv(self):
6029     """Build hooks env.
6030
6031     This runs on the master, primary and secondaries.
6032
6033     """
6034     args = dict()
6035     if constants.BE_MEMORY in self.be_new:
6036       args['memory'] = self.be_new[constants.BE_MEMORY]
6037     if constants.BE_VCPUS in self.be_new:
6038       args['vcpus'] = self.be_new[constants.BE_VCPUS]
6039     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
6040     # information at all.
6041     if self.op.nics:
6042       args['nics'] = []
6043       nic_override = dict(self.op.nics)
6044       for idx, nic in enumerate(self.instance.nics):
6045         if idx in nic_override:
6046           this_nic_override = nic_override[idx]
6047         else:
6048           this_nic_override = {}
6049         if 'ip' in this_nic_override:
6050           ip = this_nic_override['ip']
6051         else:
6052           ip = nic.ip
6053         if 'bridge' in this_nic_override:
6054           bridge = this_nic_override['bridge']
6055         else:
6056           bridge = nic.bridge
6057         if 'mac' in this_nic_override:
6058           mac = this_nic_override['mac']
6059         else:
6060           mac = nic.mac
6061         args['nics'].append((ip, bridge, mac))
6062       if constants.DDM_ADD in nic_override:
6063         ip = nic_override[constants.DDM_ADD].get('ip', None)
6064         bridge = nic_override[constants.DDM_ADD]['bridge']
6065         mac = nic_override[constants.DDM_ADD]['mac']
6066         args['nics'].append((ip, bridge, mac))
6067       elif constants.DDM_REMOVE in nic_override:
6068         del args['nics'][-1]
6069
6070     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
6071     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
6072     return env, nl, nl
6073
6074   def CheckPrereq(self):
6075     """Check prerequisites.
6076
6077     This only checks the instance list against the existing names.
6078
6079     """
6080     self.force = self.op.force
6081
6082     # checking the new params on the primary/secondary nodes
6083
6084     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6085     assert self.instance is not None, \
6086       "Cannot retrieve locked instance %s" % self.op.instance_name
6087     pnode = instance.primary_node
6088     nodelist = list(instance.all_nodes)
6089
6090     # hvparams processing
6091     if self.op.hvparams:
6092       i_hvdict = copy.deepcopy(instance.hvparams)
6093       for key, val in self.op.hvparams.iteritems():
6094         if val == constants.VALUE_DEFAULT:
6095           try:
6096             del i_hvdict[key]
6097           except KeyError:
6098             pass
6099         else:
6100           i_hvdict[key] = val
6101       cluster = self.cfg.GetClusterInfo()
6102       utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
6103       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
6104                                 i_hvdict)
6105       # local check
6106       hypervisor.GetHypervisor(
6107         instance.hypervisor).CheckParameterSyntax(hv_new)
6108       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6109       self.hv_new = hv_new # the new actual values
6110       self.hv_inst = i_hvdict # the new dict (without defaults)
6111     else:
6112       self.hv_new = self.hv_inst = {}
6113
6114     # beparams processing
6115     if self.op.beparams:
6116       i_bedict = copy.deepcopy(instance.beparams)
6117       for key, val in self.op.beparams.iteritems():
6118         if val == constants.VALUE_DEFAULT:
6119           try:
6120             del i_bedict[key]
6121           except KeyError:
6122             pass
6123         else:
6124           i_bedict[key] = val
6125       cluster = self.cfg.GetClusterInfo()
6126       utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
6127       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
6128                                 i_bedict)
6129       self.be_new = be_new # the new actual values
6130       self.be_inst = i_bedict # the new dict (without defaults)
6131     else:
6132       self.be_new = self.be_inst = {}
6133
6134     self.warn = []
6135
6136     if constants.BE_MEMORY in self.op.beparams and not self.force:
6137       mem_check_list = [pnode]
6138       if be_new[constants.BE_AUTO_BALANCE]:
6139         # either we changed auto_balance to yes or it was from before
6140         mem_check_list.extend(instance.secondary_nodes)
6141       instance_info = self.rpc.call_instance_info(pnode, instance.name,
6142                                                   instance.hypervisor)
6143       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6144                                          instance.hypervisor)
6145       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
6146         # Assume the primary node is unreachable and go ahead
6147         self.warn.append("Can't get info from primary node %s" % pnode)
6148       else:
6149         if not instance_info.failed and instance_info.data:
6150           current_mem = int(instance_info.data['memory'])
6151         else:
6152           # Assume instance not running
6153           # (there is a slight race condition here, but it's not very probable,
6154           # and we have no other way to check)
6155           current_mem = 0
6156         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6157                     nodeinfo[pnode].data['memory_free'])
6158         if miss_mem > 0:
6159           raise errors.OpPrereqError("This change will prevent the instance"
6160                                      " from starting, due to %d MB of memory"
6161                                      " missing on its primary node" % miss_mem)
6162
6163       if be_new[constants.BE_AUTO_BALANCE]:
6164         for node, nres in nodeinfo.iteritems():
6165           if node not in instance.secondary_nodes:
6166             continue
6167           if nres.failed or not isinstance(nres.data, dict):
6168             self.warn.append("Can't get info from secondary node %s" % node)
6169           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
6170             self.warn.append("Not enough memory to failover instance to"
6171                              " secondary node %s" % node)
6172
6173     # NIC processing
6174     for nic_op, nic_dict in self.op.nics:
6175       if nic_op == constants.DDM_REMOVE:
6176         if not instance.nics:
6177           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6178         continue
6179       if nic_op != constants.DDM_ADD:
6180         # an existing nic
6181         if nic_op < 0 or nic_op >= len(instance.nics):
6182           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6183                                      " are 0 to %d" %
6184                                      (nic_op, len(instance.nics)))
6185       if 'bridge' in nic_dict:
6186         nic_bridge = nic_dict['bridge']
6187         if nic_bridge is None:
6188           raise errors.OpPrereqError('Cannot set the nic bridge to None')
6189         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
6190           msg = ("Bridge '%s' doesn't exist on one of"
6191                  " the instance nodes" % nic_bridge)
6192           if self.force:
6193             self.warn.append(msg)
6194           else:
6195             raise errors.OpPrereqError(msg)
6196       if 'mac' in nic_dict:
6197         nic_mac = nic_dict['mac']
6198         if nic_mac is None:
6199           raise errors.OpPrereqError('Cannot set the nic mac to None')
6200         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6201           # otherwise generate the mac
6202           nic_dict['mac'] = self.cfg.GenerateMAC()
6203         else:
6204           # or validate/reserve the current one
6205           if self.cfg.IsMacInUse(nic_mac):
6206             raise errors.OpPrereqError("MAC address %s already in use"
6207                                        " in cluster" % nic_mac)
6208
6209     # DISK processing
6210     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6211       raise errors.OpPrereqError("Disk operations not supported for"
6212                                  " diskless instances")
6213     for disk_op, disk_dict in self.op.disks:
6214       if disk_op == constants.DDM_REMOVE:
6215         if len(instance.disks) == 1:
6216           raise errors.OpPrereqError("Cannot remove the last disk of"
6217                                      " an instance")
6218         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6219         ins_l = ins_l[pnode]
6220         if ins_l.failed or not isinstance(ins_l.data, list):
6221           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
6222         if instance.name in ins_l.data:
6223           raise errors.OpPrereqError("Instance is running, can't remove"
6224                                      " disks.")
6225
6226       if (disk_op == constants.DDM_ADD and
6227           len(instance.nics) >= constants.MAX_DISKS):
6228         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6229                                    " add more" % constants.MAX_DISKS)
6230       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6231         # an existing disk
6232         if disk_op < 0 or disk_op >= len(instance.disks):
6233           raise errors.OpPrereqError("Invalid disk index %s, valid values"
6234                                      " are 0 to %d" %
6235                                      (disk_op, len(instance.disks)))
6236
6237     return
6238
6239   def Exec(self, feedback_fn):
6240     """Modifies an instance.
6241
6242     All parameters take effect only at the next restart of the instance.
6243
6244     """
6245     # Process here the warnings from CheckPrereq, as we don't have a
6246     # feedback_fn there.
6247     for warn in self.warn:
6248       feedback_fn("WARNING: %s" % warn)
6249
6250     result = []
6251     instance = self.instance
6252     # disk changes
6253     for disk_op, disk_dict in self.op.disks:
6254       if disk_op == constants.DDM_REMOVE:
6255         # remove the last disk
6256         device = instance.disks.pop()
6257         device_idx = len(instance.disks)
6258         for node, disk in device.ComputeNodeTree(instance.primary_node):
6259           self.cfg.SetDiskID(disk, node)
6260           msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
6261           if msg:
6262             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6263                             " continuing anyway", device_idx, node, msg)
6264         result.append(("disk/%d" % device_idx, "remove"))
6265       elif disk_op == constants.DDM_ADD:
6266         # add a new disk
6267         if instance.disk_template == constants.DT_FILE:
6268           file_driver, file_path = instance.disks[0].logical_id
6269           file_path = os.path.dirname(file_path)
6270         else:
6271           file_driver = file_path = None
6272         disk_idx_base = len(instance.disks)
6273         new_disk = _GenerateDiskTemplate(self,
6274                                          instance.disk_template,
6275                                          instance.name, instance.primary_node,
6276                                          instance.secondary_nodes,
6277                                          [disk_dict],
6278                                          file_path,
6279                                          file_driver,
6280                                          disk_idx_base)[0]
6281         instance.disks.append(new_disk)
6282         info = _GetInstanceInfoText(instance)
6283
6284         logging.info("Creating volume %s for instance %s",
6285                      new_disk.iv_name, instance.name)
6286         # Note: this needs to be kept in sync with _CreateDisks
6287         #HARDCODE
6288         for node in instance.all_nodes:
6289           f_create = node == instance.primary_node
6290           try:
6291             _CreateBlockDev(self, node, instance, new_disk,
6292                             f_create, info, f_create)
6293           except errors.OpExecError, err:
6294             self.LogWarning("Failed to create volume %s (%s) on"
6295                             " node %s: %s",
6296                             new_disk.iv_name, new_disk, node, err)
6297         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6298                        (new_disk.size, new_disk.mode)))
6299       else:
6300         # change a given disk
6301         instance.disks[disk_op].mode = disk_dict['mode']
6302         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6303     # NIC changes
6304     for nic_op, nic_dict in self.op.nics:
6305       if nic_op == constants.DDM_REMOVE:
6306         # remove the last nic
6307         del instance.nics[-1]
6308         result.append(("nic.%d" % len(instance.nics), "remove"))
6309       elif nic_op == constants.DDM_ADD:
6310         # mac and bridge should be set, by now
6311         mac = nic_dict['mac']
6312         bridge = nic_dict['bridge']
6313         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
6314                               bridge=bridge)
6315         instance.nics.append(new_nic)
6316         result.append(("nic.%d" % (len(instance.nics) - 1),
6317                        "add:mac=%s,ip=%s,bridge=%s" %
6318                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
6319       else:
6320         # change a given nic
6321         for key in 'mac', 'ip', 'bridge':
6322           if key in nic_dict:
6323             setattr(instance.nics[nic_op], key, nic_dict[key])
6324             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
6325
6326     # hvparams changes
6327     if self.op.hvparams:
6328       instance.hvparams = self.hv_inst
6329       for key, val in self.op.hvparams.iteritems():
6330         result.append(("hv/%s" % key, val))
6331
6332     # beparams changes
6333     if self.op.beparams:
6334       instance.beparams = self.be_inst
6335       for key, val in self.op.beparams.iteritems():
6336         result.append(("be/%s" % key, val))
6337
6338     self.cfg.Update(instance)
6339
6340     return result
6341
6342
6343 class LUQueryExports(NoHooksLU):
6344   """Query the exports list
6345
6346   """
6347   _OP_REQP = ['nodes']
6348   REQ_BGL = False
6349
6350   def ExpandNames(self):
6351     self.needed_locks = {}
6352     self.share_locks[locking.LEVEL_NODE] = 1
6353     if not self.op.nodes:
6354       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6355     else:
6356       self.needed_locks[locking.LEVEL_NODE] = \
6357         _GetWantedNodes(self, self.op.nodes)
6358
6359   def CheckPrereq(self):
6360     """Check prerequisites.
6361
6362     """
6363     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6364
6365   def Exec(self, feedback_fn):
6366     """Compute the list of all the exported system images.
6367
6368     @rtype: dict
6369     @return: a dictionary with the structure node->(export-list)
6370         where export-list is a list of the instances exported on
6371         that node.
6372
6373     """
6374     rpcresult = self.rpc.call_export_list(self.nodes)
6375     result = {}
6376     for node in rpcresult:
6377       if rpcresult[node].failed:
6378         result[node] = False
6379       else:
6380         result[node] = rpcresult[node].data
6381
6382     return result
6383
6384
6385 class LUExportInstance(LogicalUnit):
6386   """Export an instance to an image in the cluster.
6387
6388   """
6389   HPATH = "instance-export"
6390   HTYPE = constants.HTYPE_INSTANCE
6391   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6392   REQ_BGL = False
6393
6394   def ExpandNames(self):
6395     self._ExpandAndLockInstance()
6396     # FIXME: lock only instance primary and destination node
6397     #
6398     # Sad but true, for now we have do lock all nodes, as we don't know where
6399     # the previous export might be, and and in this LU we search for it and
6400     # remove it from its current node. In the future we could fix this by:
6401     #  - making a tasklet to search (share-lock all), then create the new one,
6402     #    then one to remove, after
6403     #  - removing the removal operation altogether
6404     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6405
6406   def DeclareLocks(self, level):
6407     """Last minute lock declaration."""
6408     # All nodes are locked anyway, so nothing to do here.
6409
6410   def BuildHooksEnv(self):
6411     """Build hooks env.
6412
6413     This will run on the master, primary node and target node.
6414
6415     """
6416     env = {
6417       "EXPORT_NODE": self.op.target_node,
6418       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6419       }
6420     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6421     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6422           self.op.target_node]
6423     return env, nl, nl
6424
6425   def CheckPrereq(self):
6426     """Check prerequisites.
6427
6428     This checks that the instance and node names are valid.
6429
6430     """
6431     instance_name = self.op.instance_name
6432     self.instance = self.cfg.GetInstanceInfo(instance_name)
6433     assert self.instance is not None, \
6434           "Cannot retrieve locked instance %s" % self.op.instance_name
6435     _CheckNodeOnline(self, self.instance.primary_node)
6436
6437     self.dst_node = self.cfg.GetNodeInfo(
6438       self.cfg.ExpandNodeName(self.op.target_node))
6439
6440     if self.dst_node is None:
6441       # This is wrong node name, not a non-locked node
6442       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6443     _CheckNodeOnline(self, self.dst_node.name)
6444     _CheckNodeNotDrained(self, self.dst_node.name)
6445
6446     # instance disk type verification
6447     for disk in self.instance.disks:
6448       if disk.dev_type == constants.LD_FILE:
6449         raise errors.OpPrereqError("Export not supported for instances with"
6450                                    " file-based disks")
6451
6452   def Exec(self, feedback_fn):
6453     """Export an instance to an image in the cluster.
6454
6455     """
6456     instance = self.instance
6457     dst_node = self.dst_node
6458     src_node = instance.primary_node
6459     if self.op.shutdown:
6460       # shutdown the instance, but not the disks
6461       result = self.rpc.call_instance_shutdown(src_node, instance)
6462       msg = result.RemoteFailMsg()
6463       if msg:
6464         raise errors.OpExecError("Could not shutdown instance %s on"
6465                                  " node %s: %s" %
6466                                  (instance.name, src_node, msg))
6467
6468     vgname = self.cfg.GetVGName()
6469
6470     snap_disks = []
6471
6472     # set the disks ID correctly since call_instance_start needs the
6473     # correct drbd minor to create the symlinks
6474     for disk in instance.disks:
6475       self.cfg.SetDiskID(disk, src_node)
6476
6477     # per-disk results
6478     dresults = []
6479     try:
6480       for idx, disk in enumerate(instance.disks):
6481         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6482         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6483         if new_dev_name.failed or not new_dev_name.data:
6484           self.LogWarning("Could not snapshot disk/%d on node %s",
6485                           idx, src_node)
6486           snap_disks.append(False)
6487         else:
6488           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6489                                  logical_id=(vgname, new_dev_name.data),
6490                                  physical_id=(vgname, new_dev_name.data),
6491                                  iv_name=disk.iv_name)
6492           snap_disks.append(new_dev)
6493
6494     finally:
6495       if self.op.shutdown and instance.admin_up:
6496         result = self.rpc.call_instance_start(src_node, instance, None, None)
6497         msg = result.RemoteFailMsg()
6498         if msg:
6499           _ShutdownInstanceDisks(self, instance)
6500           raise errors.OpExecError("Could not start instance: %s" % msg)
6501
6502     # TODO: check for size
6503
6504     cluster_name = self.cfg.GetClusterName()
6505     for idx, dev in enumerate(snap_disks):
6506       if dev:
6507         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6508                                                instance, cluster_name, idx)
6509         if result.failed or not result.data:
6510           self.LogWarning("Could not export disk/%d from node %s to"
6511                           " node %s", idx, src_node, dst_node.name)
6512           dresults.append(False)
6513         else:
6514           dresults.append(True)
6515         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6516         if msg:
6517           self.LogWarning("Could not remove snapshot for disk/%d from node"
6518                           " %s: %s", idx, src_node, msg)
6519       else:
6520         dresults.append(False)
6521
6522     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6523     fin_resu = True
6524     if result.failed or not result.data:
6525       self.LogWarning("Could not finalize export for instance %s on node %s",
6526                       instance.name, dst_node.name)
6527       fin_resu = False
6528
6529     nodelist = self.cfg.GetNodeList()
6530     nodelist.remove(dst_node.name)
6531
6532     # on one-node clusters nodelist will be empty after the removal
6533     # if we proceed the backup would be removed because OpQueryExports
6534     # substitutes an empty list with the full cluster node list.
6535     if nodelist:
6536       exportlist = self.rpc.call_export_list(nodelist)
6537       for node in exportlist:
6538         if exportlist[node].failed:
6539           continue
6540         if instance.name in exportlist[node].data:
6541           if not self.rpc.call_export_remove(node, instance.name):
6542             self.LogWarning("Could not remove older export for instance %s"
6543                             " on node %s", instance.name, node)
6544     return fin_resu, dresults
6545
6546
6547 class LURemoveExport(NoHooksLU):
6548   """Remove exports related to the named instance.
6549
6550   """
6551   _OP_REQP = ["instance_name"]
6552   REQ_BGL = False
6553
6554   def ExpandNames(self):
6555     self.needed_locks = {}
6556     # We need all nodes to be locked in order for RemoveExport to work, but we
6557     # don't need to lock the instance itself, as nothing will happen to it (and
6558     # we can remove exports also for a removed instance)
6559     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6560
6561   def CheckPrereq(self):
6562     """Check prerequisites.
6563     """
6564     pass
6565
6566   def Exec(self, feedback_fn):
6567     """Remove any export.
6568
6569     """
6570     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6571     # If the instance was not found we'll try with the name that was passed in.
6572     # This will only work if it was an FQDN, though.
6573     fqdn_warn = False
6574     if not instance_name:
6575       fqdn_warn = True
6576       instance_name = self.op.instance_name
6577
6578     exportlist = self.rpc.call_export_list(self.acquired_locks[
6579       locking.LEVEL_NODE])
6580     found = False
6581     for node in exportlist:
6582       if exportlist[node].failed:
6583         self.LogWarning("Failed to query node %s, continuing" % node)
6584         continue
6585       if instance_name in exportlist[node].data:
6586         found = True
6587         result = self.rpc.call_export_remove(node, instance_name)
6588         if result.failed or not result.data:
6589           logging.error("Could not remove export for instance %s"
6590                         " on node %s", instance_name, node)
6591
6592     if fqdn_warn and not found:
6593       feedback_fn("Export not found. If trying to remove an export belonging"
6594                   " to a deleted instance please use its Fully Qualified"
6595                   " Domain Name.")
6596
6597
6598 class TagsLU(NoHooksLU):
6599   """Generic tags LU.
6600
6601   This is an abstract class which is the parent of all the other tags LUs.
6602
6603   """
6604
6605   def ExpandNames(self):
6606     self.needed_locks = {}
6607     if self.op.kind == constants.TAG_NODE:
6608       name = self.cfg.ExpandNodeName(self.op.name)
6609       if name is None:
6610         raise errors.OpPrereqError("Invalid node name (%s)" %
6611                                    (self.op.name,))
6612       self.op.name = name
6613       self.needed_locks[locking.LEVEL_NODE] = name
6614     elif self.op.kind == constants.TAG_INSTANCE:
6615       name = self.cfg.ExpandInstanceName(self.op.name)
6616       if name is None:
6617         raise errors.OpPrereqError("Invalid instance name (%s)" %
6618                                    (self.op.name,))
6619       self.op.name = name
6620       self.needed_locks[locking.LEVEL_INSTANCE] = name
6621
6622   def CheckPrereq(self):
6623     """Check prerequisites.
6624
6625     """
6626     if self.op.kind == constants.TAG_CLUSTER:
6627       self.target = self.cfg.GetClusterInfo()
6628     elif self.op.kind == constants.TAG_NODE:
6629       self.target = self.cfg.GetNodeInfo(self.op.name)
6630     elif self.op.kind == constants.TAG_INSTANCE:
6631       self.target = self.cfg.GetInstanceInfo(self.op.name)
6632     else:
6633       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6634                                  str(self.op.kind))
6635
6636
6637 class LUGetTags(TagsLU):
6638   """Returns the tags of a given object.
6639
6640   """
6641   _OP_REQP = ["kind", "name"]
6642   REQ_BGL = False
6643
6644   def Exec(self, feedback_fn):
6645     """Returns the tag list.
6646
6647     """
6648     return list(self.target.GetTags())
6649
6650
6651 class LUSearchTags(NoHooksLU):
6652   """Searches the tags for a given pattern.
6653
6654   """
6655   _OP_REQP = ["pattern"]
6656   REQ_BGL = False
6657
6658   def ExpandNames(self):
6659     self.needed_locks = {}
6660
6661   def CheckPrereq(self):
6662     """Check prerequisites.
6663
6664     This checks the pattern passed for validity by compiling it.
6665
6666     """
6667     try:
6668       self.re = re.compile(self.op.pattern)
6669     except re.error, err:
6670       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6671                                  (self.op.pattern, err))
6672
6673   def Exec(self, feedback_fn):
6674     """Returns the tag list.
6675
6676     """
6677     cfg = self.cfg
6678     tgts = [("/cluster", cfg.GetClusterInfo())]
6679     ilist = cfg.GetAllInstancesInfo().values()
6680     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6681     nlist = cfg.GetAllNodesInfo().values()
6682     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6683     results = []
6684     for path, target in tgts:
6685       for tag in target.GetTags():
6686         if self.re.search(tag):
6687           results.append((path, tag))
6688     return results
6689
6690
6691 class LUAddTags(TagsLU):
6692   """Sets a tag on a given object.
6693
6694   """
6695   _OP_REQP = ["kind", "name", "tags"]
6696   REQ_BGL = False
6697
6698   def CheckPrereq(self):
6699     """Check prerequisites.
6700
6701     This checks the type and length of the tag name and value.
6702
6703     """
6704     TagsLU.CheckPrereq(self)
6705     for tag in self.op.tags:
6706       objects.TaggableObject.ValidateTag(tag)
6707
6708   def Exec(self, feedback_fn):
6709     """Sets the tag.
6710
6711     """
6712     try:
6713       for tag in self.op.tags:
6714         self.target.AddTag(tag)
6715     except errors.TagError, err:
6716       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6717     try:
6718       self.cfg.Update(self.target)
6719     except errors.ConfigurationError:
6720       raise errors.OpRetryError("There has been a modification to the"
6721                                 " config file and the operation has been"
6722                                 " aborted. Please retry.")
6723
6724
6725 class LUDelTags(TagsLU):
6726   """Delete a list of tags from a given object.
6727
6728   """
6729   _OP_REQP = ["kind", "name", "tags"]
6730   REQ_BGL = False
6731
6732   def CheckPrereq(self):
6733     """Check prerequisites.
6734
6735     This checks that we have the given tag.
6736
6737     """
6738     TagsLU.CheckPrereq(self)
6739     for tag in self.op.tags:
6740       objects.TaggableObject.ValidateTag(tag)
6741     del_tags = frozenset(self.op.tags)
6742     cur_tags = self.target.GetTags()
6743     if not del_tags <= cur_tags:
6744       diff_tags = del_tags - cur_tags
6745       diff_names = ["'%s'" % tag for tag in diff_tags]
6746       diff_names.sort()
6747       raise errors.OpPrereqError("Tag(s) %s not found" %
6748                                  (",".join(diff_names)))
6749
6750   def Exec(self, feedback_fn):
6751     """Remove the tag from the object.
6752
6753     """
6754     for tag in self.op.tags:
6755       self.target.RemoveTag(tag)
6756     try:
6757       self.cfg.Update(self.target)
6758     except errors.ConfigurationError:
6759       raise errors.OpRetryError("There has been a modification to the"
6760                                 " config file and the operation has been"
6761                                 " aborted. Please retry.")
6762
6763
6764 class LUTestDelay(NoHooksLU):
6765   """Sleep for a specified amount of time.
6766
6767   This LU sleeps on the master and/or nodes for a specified amount of
6768   time.
6769
6770   """
6771   _OP_REQP = ["duration", "on_master", "on_nodes"]
6772   REQ_BGL = False
6773
6774   def ExpandNames(self):
6775     """Expand names and set required locks.
6776
6777     This expands the node list, if any.
6778
6779     """
6780     self.needed_locks = {}
6781     if self.op.on_nodes:
6782       # _GetWantedNodes can be used here, but is not always appropriate to use
6783       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6784       # more information.
6785       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6786       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6787
6788   def CheckPrereq(self):
6789     """Check prerequisites.
6790
6791     """
6792
6793   def Exec(self, feedback_fn):
6794     """Do the actual sleep.
6795
6796     """
6797     if self.op.on_master:
6798       if not utils.TestDelay(self.op.duration):
6799         raise errors.OpExecError("Error during master delay test")
6800     if self.op.on_nodes:
6801       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6802       if not result:
6803         raise errors.OpExecError("Complete failure from rpc call")
6804       for node, node_result in result.items():
6805         node_result.Raise()
6806         if not node_result.data:
6807           raise errors.OpExecError("Failure during rpc call to node %s,"
6808                                    " result: %s" % (node, node_result.data))
6809
6810
6811 class IAllocator(object):
6812   """IAllocator framework.
6813
6814   An IAllocator instance has three sets of attributes:
6815     - cfg that is needed to query the cluster
6816     - input data (all members of the _KEYS class attribute are required)
6817     - four buffer attributes (in|out_data|text), that represent the
6818       input (to the external script) in text and data structure format,
6819       and the output from it, again in two formats
6820     - the result variables from the script (success, info, nodes) for
6821       easy usage
6822
6823   """
6824   _ALLO_KEYS = [
6825     "mem_size", "disks", "disk_template",
6826     "os", "tags", "nics", "vcpus", "hypervisor",
6827     ]
6828   _RELO_KEYS = [
6829     "relocate_from",
6830     ]
6831
6832   def __init__(self, lu, mode, name, **kwargs):
6833     self.lu = lu
6834     # init buffer variables
6835     self.in_text = self.out_text = self.in_data = self.out_data = None
6836     # init all input fields so that pylint is happy
6837     self.mode = mode
6838     self.name = name
6839     self.mem_size = self.disks = self.disk_template = None
6840     self.os = self.tags = self.nics = self.vcpus = None
6841     self.hypervisor = None
6842     self.relocate_from = None
6843     # computed fields
6844     self.required_nodes = None
6845     # init result fields
6846     self.success = self.info = self.nodes = None
6847     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6848       keyset = self._ALLO_KEYS
6849     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6850       keyset = self._RELO_KEYS
6851     else:
6852       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6853                                    " IAllocator" % self.mode)
6854     for key in kwargs:
6855       if key not in keyset:
6856         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6857                                      " IAllocator" % key)
6858       setattr(self, key, kwargs[key])
6859     for key in keyset:
6860       if key not in kwargs:
6861         raise errors.ProgrammerError("Missing input parameter '%s' to"
6862                                      " IAllocator" % key)
6863     self._BuildInputData()
6864
6865   def _ComputeClusterData(self):
6866     """Compute the generic allocator input data.
6867
6868     This is the data that is independent of the actual operation.
6869
6870     """
6871     cfg = self.lu.cfg
6872     cluster_info = cfg.GetClusterInfo()
6873     # cluster data
6874     data = {
6875       "version": constants.IALLOCATOR_VERSION,
6876       "cluster_name": cfg.GetClusterName(),
6877       "cluster_tags": list(cluster_info.GetTags()),
6878       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6879       # we don't have job IDs
6880       }
6881     iinfo = cfg.GetAllInstancesInfo().values()
6882     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6883
6884     # node data
6885     node_results = {}
6886     node_list = cfg.GetNodeList()
6887
6888     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6889       hypervisor_name = self.hypervisor
6890     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6891       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6892
6893     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6894                                            hypervisor_name)
6895     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6896                        cluster_info.enabled_hypervisors)
6897     for nname, nresult in node_data.items():
6898       # first fill in static (config-based) values
6899       ninfo = cfg.GetNodeInfo(nname)
6900       pnr = {
6901         "tags": list(ninfo.GetTags()),
6902         "primary_ip": ninfo.primary_ip,
6903         "secondary_ip": ninfo.secondary_ip,
6904         "offline": ninfo.offline,
6905         "drained": ninfo.drained,
6906         "master_candidate": ninfo.master_candidate,
6907         }
6908
6909       if not ninfo.offline:
6910         nresult.Raise()
6911         if not isinstance(nresult.data, dict):
6912           raise errors.OpExecError("Can't get data for node %s" % nname)
6913         remote_info = nresult.data
6914         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6915                      'vg_size', 'vg_free', 'cpu_total']:
6916           if attr not in remote_info:
6917             raise errors.OpExecError("Node '%s' didn't return attribute"
6918                                      " '%s'" % (nname, attr))
6919           try:
6920             remote_info[attr] = int(remote_info[attr])
6921           except ValueError, err:
6922             raise errors.OpExecError("Node '%s' returned invalid value"
6923                                      " for '%s': %s" % (nname, attr, err))
6924         # compute memory used by primary instances
6925         i_p_mem = i_p_up_mem = 0
6926         for iinfo, beinfo in i_list:
6927           if iinfo.primary_node == nname:
6928             i_p_mem += beinfo[constants.BE_MEMORY]
6929             if iinfo.name not in node_iinfo[nname].data:
6930               i_used_mem = 0
6931             else:
6932               i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6933             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6934             remote_info['memory_free'] -= max(0, i_mem_diff)
6935
6936             if iinfo.admin_up:
6937               i_p_up_mem += beinfo[constants.BE_MEMORY]
6938
6939         # compute memory used by instances
6940         pnr_dyn = {
6941           "total_memory": remote_info['memory_total'],
6942           "reserved_memory": remote_info['memory_dom0'],
6943           "free_memory": remote_info['memory_free'],
6944           "total_disk": remote_info['vg_size'],
6945           "free_disk": remote_info['vg_free'],
6946           "total_cpus": remote_info['cpu_total'],
6947           "i_pri_memory": i_p_mem,
6948           "i_pri_up_memory": i_p_up_mem,
6949           }
6950         pnr.update(pnr_dyn)
6951
6952       node_results[nname] = pnr
6953     data["nodes"] = node_results
6954
6955     # instance data
6956     instance_data = {}
6957     for iinfo, beinfo in i_list:
6958       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6959                   for n in iinfo.nics]
6960       pir = {
6961         "tags": list(iinfo.GetTags()),
6962         "admin_up": iinfo.admin_up,
6963         "vcpus": beinfo[constants.BE_VCPUS],
6964         "memory": beinfo[constants.BE_MEMORY],
6965         "os": iinfo.os,
6966         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6967         "nics": nic_data,
6968         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6969         "disk_template": iinfo.disk_template,
6970         "hypervisor": iinfo.hypervisor,
6971         }
6972       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
6973                                                  pir["disks"])
6974       instance_data[iinfo.name] = pir
6975
6976     data["instances"] = instance_data
6977
6978     self.in_data = data
6979
6980   def _AddNewInstance(self):
6981     """Add new instance data to allocator structure.
6982
6983     This in combination with _AllocatorGetClusterData will create the
6984     correct structure needed as input for the allocator.
6985
6986     The checks for the completeness of the opcode must have already been
6987     done.
6988
6989     """
6990     data = self.in_data
6991
6992     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6993
6994     if self.disk_template in constants.DTS_NET_MIRROR:
6995       self.required_nodes = 2
6996     else:
6997       self.required_nodes = 1
6998     request = {
6999       "type": "allocate",
7000       "name": self.name,
7001       "disk_template": self.disk_template,
7002       "tags": self.tags,
7003       "os": self.os,
7004       "vcpus": self.vcpus,
7005       "memory": self.mem_size,
7006       "disks": self.disks,
7007       "disk_space_total": disk_space,
7008       "nics": self.nics,
7009       "required_nodes": self.required_nodes,
7010       }
7011     data["request"] = request
7012
7013   def _AddRelocateInstance(self):
7014     """Add relocate instance data to allocator structure.
7015
7016     This in combination with _IAllocatorGetClusterData will create the
7017     correct structure needed as input for the allocator.
7018
7019     The checks for the completeness of the opcode must have already been
7020     done.
7021
7022     """
7023     instance = self.lu.cfg.GetInstanceInfo(self.name)
7024     if instance is None:
7025       raise errors.ProgrammerError("Unknown instance '%s' passed to"
7026                                    " IAllocator" % self.name)
7027
7028     if instance.disk_template not in constants.DTS_NET_MIRROR:
7029       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
7030
7031     if len(instance.secondary_nodes) != 1:
7032       raise errors.OpPrereqError("Instance has not exactly one secondary node")
7033
7034     self.required_nodes = 1
7035     disk_sizes = [{'size': disk.size} for disk in instance.disks]
7036     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
7037
7038     request = {
7039       "type": "relocate",
7040       "name": self.name,
7041       "disk_space_total": disk_space,
7042       "required_nodes": self.required_nodes,
7043       "relocate_from": self.relocate_from,
7044       }
7045     self.in_data["request"] = request
7046
7047   def _BuildInputData(self):
7048     """Build input data structures.
7049
7050     """
7051     self._ComputeClusterData()
7052
7053     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7054       self._AddNewInstance()
7055     else:
7056       self._AddRelocateInstance()
7057
7058     self.in_text = serializer.Dump(self.in_data)
7059
7060   def Run(self, name, validate=True, call_fn=None):
7061     """Run an instance allocator and return the results.
7062
7063     """
7064     if call_fn is None:
7065       call_fn = self.lu.rpc.call_iallocator_runner
7066
7067     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
7068     result.Raise()
7069
7070     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
7071       raise errors.OpExecError("Invalid result from master iallocator runner")
7072
7073     rcode, stdout, stderr, fail = result.data
7074
7075     if rcode == constants.IARUN_NOTFOUND:
7076       raise errors.OpExecError("Can't find allocator '%s'" % name)
7077     elif rcode == constants.IARUN_FAILURE:
7078       raise errors.OpExecError("Instance allocator call failed: %s,"
7079                                " output: %s" % (fail, stdout+stderr))
7080     self.out_text = stdout
7081     if validate:
7082       self._ValidateResult()
7083
7084   def _ValidateResult(self):
7085     """Process the allocator results.
7086
7087     This will process and if successful save the result in
7088     self.out_data and the other parameters.
7089
7090     """
7091     try:
7092       rdict = serializer.Load(self.out_text)
7093     except Exception, err:
7094       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
7095
7096     if not isinstance(rdict, dict):
7097       raise errors.OpExecError("Can't parse iallocator results: not a dict")
7098
7099     for key in "success", "info", "nodes":
7100       if key not in rdict:
7101         raise errors.OpExecError("Can't parse iallocator results:"
7102                                  " missing key '%s'" % key)
7103       setattr(self, key, rdict[key])
7104
7105     if not isinstance(rdict["nodes"], list):
7106       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
7107                                " is not a list")
7108     self.out_data = rdict
7109
7110
7111 class LUTestAllocator(NoHooksLU):
7112   """Run allocator tests.
7113
7114   This LU runs the allocator tests
7115
7116   """
7117   _OP_REQP = ["direction", "mode", "name"]
7118
7119   def CheckPrereq(self):
7120     """Check prerequisites.
7121
7122     This checks the opcode parameters depending on the director and mode test.
7123
7124     """
7125     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7126       for attr in ["name", "mem_size", "disks", "disk_template",
7127                    "os", "tags", "nics", "vcpus"]:
7128         if not hasattr(self.op, attr):
7129           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7130                                      attr)
7131       iname = self.cfg.ExpandInstanceName(self.op.name)
7132       if iname is not None:
7133         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7134                                    iname)
7135       if not isinstance(self.op.nics, list):
7136         raise errors.OpPrereqError("Invalid parameter 'nics'")
7137       for row in self.op.nics:
7138         if (not isinstance(row, dict) or
7139             "mac" not in row or
7140             "ip" not in row or
7141             "bridge" not in row):
7142           raise errors.OpPrereqError("Invalid contents of the"
7143                                      " 'nics' parameter")
7144       if not isinstance(self.op.disks, list):
7145         raise errors.OpPrereqError("Invalid parameter 'disks'")
7146       for row in self.op.disks:
7147         if (not isinstance(row, dict) or
7148             "size" not in row or
7149             not isinstance(row["size"], int) or
7150             "mode" not in row or
7151             row["mode"] not in ['r', 'w']):
7152           raise errors.OpPrereqError("Invalid contents of the"
7153                                      " 'disks' parameter")
7154       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7155         self.op.hypervisor = self.cfg.GetHypervisorType()
7156     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7157       if not hasattr(self.op, "name"):
7158         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7159       fname = self.cfg.ExpandInstanceName(self.op.name)
7160       if fname is None:
7161         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7162                                    self.op.name)
7163       self.op.name = fname
7164       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7165     else:
7166       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7167                                  self.op.mode)
7168
7169     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7170       if not hasattr(self.op, "allocator") or self.op.allocator is None:
7171         raise errors.OpPrereqError("Missing allocator name")
7172     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7173       raise errors.OpPrereqError("Wrong allocator test '%s'" %
7174                                  self.op.direction)
7175
7176   def Exec(self, feedback_fn):
7177     """Run the allocator test.
7178
7179     """
7180     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7181       ial = IAllocator(self,
7182                        mode=self.op.mode,
7183                        name=self.op.name,
7184                        mem_size=self.op.mem_size,
7185                        disks=self.op.disks,
7186                        disk_template=self.op.disk_template,
7187                        os=self.op.os,
7188                        tags=self.op.tags,
7189                        nics=self.op.nics,
7190                        vcpus=self.op.vcpus,
7191                        hypervisor=self.op.hypervisor,
7192                        )
7193     else:
7194       ial = IAllocator(self,
7195                        mode=self.op.mode,
7196                        name=self.op.name,
7197                        relocate_from=list(self.relocate_from),
7198                        )
7199
7200     if self.op.direction == constants.IALLOCATOR_DIR_IN:
7201       result = ial.in_text
7202     else:
7203       ial.Run(self.op.allocator, validate=False)
7204       result = ial.out_text
7205     return result