Fix instance creation
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import logging
34 import copy
35
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45
46
47 class LogicalUnit(object):
48   """Logical Unit base class.
49
50   Subclasses must follow these rules:
51     - implement ExpandNames
52     - implement CheckPrereq
53     - implement Exec
54     - implement BuildHooksEnv
55     - redefine HPATH and HTYPE
56     - optionally redefine their run requirements:
57         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
58
59   Note that all commands require root permissions.
60
61   """
62   HPATH = None
63   HTYPE = None
64   _OP_REQP = []
65   REQ_BGL = True
66
67   def __init__(self, processor, op, context, rpc):
68     """Constructor for LogicalUnit.
69
70     This needs to be overriden in derived classes in order to check op
71     validity.
72
73     """
74     self.proc = processor
75     self.op = op
76     self.cfg = context.cfg
77     self.context = context
78     self.rpc = rpc
79     # Dicts used to declare locking needs to mcpu
80     self.needed_locks = None
81     self.acquired_locks = {}
82     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
83     self.add_locks = {}
84     self.remove_locks = {}
85     # Used to force good behavior when calling helper functions
86     self.recalculate_locks = {}
87     self.__ssh = None
88     # logging
89     self.LogWarning = processor.LogWarning
90     self.LogInfo = processor.LogInfo
91
92     for attr_name in self._OP_REQP:
93       attr_val = getattr(op, attr_name, None)
94       if attr_val is None:
95         raise errors.OpPrereqError("Required parameter '%s' missing" %
96                                    attr_name)
97     self.CheckArguments()
98
99   def __GetSSH(self):
100     """Returns the SshRunner object
101
102     """
103     if not self.__ssh:
104       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
105     return self.__ssh
106
107   ssh = property(fget=__GetSSH)
108
109   def CheckArguments(self):
110     """Check syntactic validity for the opcode arguments.
111
112     This method is for doing a simple syntactic check and ensure
113     validity of opcode parameters, without any cluster-related
114     checks. While the same can be accomplished in ExpandNames and/or
115     CheckPrereq, doing these separate is better because:
116
117       - ExpandNames is left as as purely a lock-related function
118       - CheckPrereq is run after we have aquired locks (and possible
119         waited for them)
120
121     The function is allowed to change the self.op attribute so that
122     later methods can no longer worry about missing parameters.
123
124     """
125     pass
126
127   def ExpandNames(self):
128     """Expand names for this LU.
129
130     This method is called before starting to execute the opcode, and it should
131     update all the parameters of the opcode to their canonical form (e.g. a
132     short node name must be fully expanded after this method has successfully
133     completed). This way locking, hooks, logging, ecc. can work correctly.
134
135     LUs which implement this method must also populate the self.needed_locks
136     member, as a dict with lock levels as keys, and a list of needed lock names
137     as values. Rules:
138
139       - use an empty dict if you don't need any lock
140       - if you don't need any lock at a particular level omit that level
141       - don't put anything for the BGL level
142       - if you want all locks at a level use locking.ALL_SET as a value
143
144     If you need to share locks (rather than acquire them exclusively) at one
145     level you can modify self.share_locks, setting a true value (usually 1) for
146     that level. By default locks are not shared.
147
148     Examples::
149
150       # Acquire all nodes and one instance
151       self.needed_locks = {
152         locking.LEVEL_NODE: locking.ALL_SET,
153         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
154       }
155       # Acquire just two nodes
156       self.needed_locks = {
157         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
158       }
159       # Acquire no locks
160       self.needed_locks = {} # No, you can't leave it to the default value None
161
162     """
163     # The implementation of this method is mandatory only if the new LU is
164     # concurrent, so that old LUs don't need to be changed all at the same
165     # time.
166     if self.REQ_BGL:
167       self.needed_locks = {} # Exclusive LUs don't need locks.
168     else:
169       raise NotImplementedError
170
171   def DeclareLocks(self, level):
172     """Declare LU locking needs for a level
173
174     While most LUs can just declare their locking needs at ExpandNames time,
175     sometimes there's the need to calculate some locks after having acquired
176     the ones before. This function is called just before acquiring locks at a
177     particular level, but after acquiring the ones at lower levels, and permits
178     such calculations. It can be used to modify self.needed_locks, and by
179     default it does nothing.
180
181     This function is only called if you have something already set in
182     self.needed_locks for the level.
183
184     @param level: Locking level which is going to be locked
185     @type level: member of ganeti.locking.LEVELS
186
187     """
188
189   def CheckPrereq(self):
190     """Check prerequisites for this LU.
191
192     This method should check that the prerequisites for the execution
193     of this LU are fulfilled. It can do internode communication, but
194     it should be idempotent - no cluster or system changes are
195     allowed.
196
197     The method should raise errors.OpPrereqError in case something is
198     not fulfilled. Its return value is ignored.
199
200     This method should also update all the parameters of the opcode to
201     their canonical form if it hasn't been done by ExpandNames before.
202
203     """
204     raise NotImplementedError
205
206   def Exec(self, feedback_fn):
207     """Execute the LU.
208
209     This method should implement the actual work. It should raise
210     errors.OpExecError for failures that are somewhat dealt with in
211     code, or expected.
212
213     """
214     raise NotImplementedError
215
216   def BuildHooksEnv(self):
217     """Build hooks environment for this LU.
218
219     This method should return a three-node tuple consisting of: a dict
220     containing the environment that will be used for running the
221     specific hook for this LU, a list of node names on which the hook
222     should run before the execution, and a list of node names on which
223     the hook should run after the execution.
224
225     The keys of the dict must not have 'GANETI_' prefixed as this will
226     be handled in the hooks runner. Also note additional keys will be
227     added by the hooks runner. If the LU doesn't define any
228     environment, an empty dict (and not None) should be returned.
229
230     No nodes should be returned as an empty list (and not None).
231
232     Note that if the HPATH for a LU class is None, this function will
233     not be called.
234
235     """
236     raise NotImplementedError
237
238   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
239     """Notify the LU about the results of its hooks.
240
241     This method is called every time a hooks phase is executed, and notifies
242     the Logical Unit about the hooks' result. The LU can then use it to alter
243     its result based on the hooks.  By default the method does nothing and the
244     previous result is passed back unchanged but any LU can define it if it
245     wants to use the local cluster hook-scripts somehow.
246
247     @param phase: one of L{constants.HOOKS_PHASE_POST} or
248         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
249     @param hook_results: the results of the multi-node hooks rpc call
250     @param feedback_fn: function used send feedback back to the caller
251     @param lu_result: the previous Exec result this LU had, or None
252         in the PRE phase
253     @return: the new Exec result, based on the previous result
254         and hook results
255
256     """
257     return lu_result
258
259   def _ExpandAndLockInstance(self):
260     """Helper function to expand and lock an instance.
261
262     Many LUs that work on an instance take its name in self.op.instance_name
263     and need to expand it and then declare the expanded name for locking. This
264     function does it, and then updates self.op.instance_name to the expanded
265     name. It also initializes needed_locks as a dict, if this hasn't been done
266     before.
267
268     """
269     if self.needed_locks is None:
270       self.needed_locks = {}
271     else:
272       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
273         "_ExpandAndLockInstance called with instance-level locks set"
274     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
275     if expanded_name is None:
276       raise errors.OpPrereqError("Instance '%s' not known" %
277                                   self.op.instance_name)
278     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
279     self.op.instance_name = expanded_name
280
281   def _LockInstancesNodes(self, primary_only=False):
282     """Helper function to declare instances' nodes for locking.
283
284     This function should be called after locking one or more instances to lock
285     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
286     with all primary or secondary nodes for instances already locked and
287     present in self.needed_locks[locking.LEVEL_INSTANCE].
288
289     It should be called from DeclareLocks, and for safety only works if
290     self.recalculate_locks[locking.LEVEL_NODE] is set.
291
292     In the future it may grow parameters to just lock some instance's nodes, or
293     to just lock primaries or secondary nodes, if needed.
294
295     If should be called in DeclareLocks in a way similar to::
296
297       if level == locking.LEVEL_NODE:
298         self._LockInstancesNodes()
299
300     @type primary_only: boolean
301     @param primary_only: only lock primary nodes of locked instances
302
303     """
304     assert locking.LEVEL_NODE in self.recalculate_locks, \
305       "_LockInstancesNodes helper function called with no nodes to recalculate"
306
307     # TODO: check if we're really been called with the instance locks held
308
309     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
310     # future we might want to have different behaviors depending on the value
311     # of self.recalculate_locks[locking.LEVEL_NODE]
312     wanted_nodes = []
313     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
314       instance = self.context.cfg.GetInstanceInfo(instance_name)
315       wanted_nodes.append(instance.primary_node)
316       if not primary_only:
317         wanted_nodes.extend(instance.secondary_nodes)
318
319     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
320       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
321     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
322       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
323
324     del self.recalculate_locks[locking.LEVEL_NODE]
325
326
327 class NoHooksLU(LogicalUnit):
328   """Simple LU which runs no hooks.
329
330   This LU is intended as a parent for other LogicalUnits which will
331   run no hooks, in order to reduce duplicate code.
332
333   """
334   HPATH = None
335   HTYPE = None
336
337
338 def _GetWantedNodes(lu, nodes):
339   """Returns list of checked and expanded node names.
340
341   @type lu: L{LogicalUnit}
342   @param lu: the logical unit on whose behalf we execute
343   @type nodes: list
344   @param nodes: list of node names or None for all nodes
345   @rtype: list
346   @return: the list of nodes, sorted
347   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
348
349   """
350   if not isinstance(nodes, list):
351     raise errors.OpPrereqError("Invalid argument type 'nodes'")
352
353   if not nodes:
354     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
355       " non-empty list of nodes whose name is to be expanded.")
356
357   wanted = []
358   for name in nodes:
359     node = lu.cfg.ExpandNodeName(name)
360     if node is None:
361       raise errors.OpPrereqError("No such node name '%s'" % name)
362     wanted.append(node)
363
364   return utils.NiceSort(wanted)
365
366
367 def _GetWantedInstances(lu, instances):
368   """Returns list of checked and expanded instance names.
369
370   @type lu: L{LogicalUnit}
371   @param lu: the logical unit on whose behalf we execute
372   @type instances: list
373   @param instances: list of instance names or None for all instances
374   @rtype: list
375   @return: the list of instances, sorted
376   @raise errors.OpPrereqError: if the instances parameter is wrong type
377   @raise errors.OpPrereqError: if any of the passed instances is not found
378
379   """
380   if not isinstance(instances, list):
381     raise errors.OpPrereqError("Invalid argument type 'instances'")
382
383   if instances:
384     wanted = []
385
386     for name in instances:
387       instance = lu.cfg.ExpandInstanceName(name)
388       if instance is None:
389         raise errors.OpPrereqError("No such instance name '%s'" % name)
390       wanted.append(instance)
391
392   else:
393     wanted = lu.cfg.GetInstanceList()
394   return utils.NiceSort(wanted)
395
396
397 def _CheckOutputFields(static, dynamic, selected):
398   """Checks whether all selected fields are valid.
399
400   @type static: L{utils.FieldSet}
401   @param static: static fields set
402   @type dynamic: L{utils.FieldSet}
403   @param dynamic: dynamic fields set
404
405   """
406   f = utils.FieldSet()
407   f.Extend(static)
408   f.Extend(dynamic)
409
410   delta = f.NonMatching(selected)
411   if delta:
412     raise errors.OpPrereqError("Unknown output fields selected: %s"
413                                % ",".join(delta))
414
415
416 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
417                           memory, vcpus, nics):
418   """Builds instance related env variables for hooks
419
420   This builds the hook environment from individual variables.
421
422   @type name: string
423   @param name: the name of the instance
424   @type primary_node: string
425   @param primary_node: the name of the instance's primary node
426   @type secondary_nodes: list
427   @param secondary_nodes: list of secondary nodes as strings
428   @type os_type: string
429   @param os_type: the name of the instance's OS
430   @type status: string
431   @param status: the desired status of the instances
432   @type memory: string
433   @param memory: the memory size of the instance
434   @type vcpus: string
435   @param vcpus: the count of VCPUs the instance has
436   @type nics: list
437   @param nics: list of tuples (ip, bridge, mac) representing
438       the NICs the instance  has
439   @rtype: dict
440   @return: the hook environment for this instance
441
442   """
443   env = {
444     "OP_TARGET": name,
445     "INSTANCE_NAME": name,
446     "INSTANCE_PRIMARY": primary_node,
447     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
448     "INSTANCE_OS_TYPE": os_type,
449     "INSTANCE_STATUS": status,
450     "INSTANCE_MEMORY": memory,
451     "INSTANCE_VCPUS": vcpus,
452   }
453
454   if nics:
455     nic_count = len(nics)
456     for idx, (ip, bridge, mac) in enumerate(nics):
457       if ip is None:
458         ip = ""
459       env["INSTANCE_NIC%d_IP" % idx] = ip
460       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
461       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
462   else:
463     nic_count = 0
464
465   env["INSTANCE_NIC_COUNT"] = nic_count
466
467   return env
468
469
470 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
471   """Builds instance related env variables for hooks from an object.
472
473   @type lu: L{LogicalUnit}
474   @param lu: the logical unit on whose behalf we execute
475   @type instance: L{objects.Instance}
476   @param instance: the instance for which we should build the
477       environment
478   @type override: dict
479   @param override: dictionary with key/values that will override
480       our values
481   @rtype: dict
482   @return: the hook environment dictionary
483
484   """
485   bep = lu.cfg.GetClusterInfo().FillBE(instance)
486   args = {
487     'name': instance.name,
488     'primary_node': instance.primary_node,
489     'secondary_nodes': instance.secondary_nodes,
490     'os_type': instance.os,
491     'status': instance.os,
492     'memory': bep[constants.BE_MEMORY],
493     'vcpus': bep[constants.BE_VCPUS],
494     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
495   }
496   if override:
497     args.update(override)
498   return _BuildInstanceHookEnv(**args)
499
500
501 def _CheckInstanceBridgesExist(lu, instance):
502   """Check that the brigdes needed by an instance exist.
503
504   """
505   # check bridges existance
506   brlist = [nic.bridge for nic in instance.nics]
507   if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
508     raise errors.OpPrereqError("one or more target bridges %s does not"
509                                " exist on destination node '%s'" %
510                                (brlist, instance.primary_node))
511
512
513 class LUDestroyCluster(NoHooksLU):
514   """Logical unit for destroying the cluster.
515
516   """
517   _OP_REQP = []
518
519   def CheckPrereq(self):
520     """Check prerequisites.
521
522     This checks whether the cluster is empty.
523
524     Any errors are signalled by raising errors.OpPrereqError.
525
526     """
527     master = self.cfg.GetMasterNode()
528
529     nodelist = self.cfg.GetNodeList()
530     if len(nodelist) != 1 or nodelist[0] != master:
531       raise errors.OpPrereqError("There are still %d node(s) in"
532                                  " this cluster." % (len(nodelist) - 1))
533     instancelist = self.cfg.GetInstanceList()
534     if instancelist:
535       raise errors.OpPrereqError("There are still %d instance(s) in"
536                                  " this cluster." % len(instancelist))
537
538   def Exec(self, feedback_fn):
539     """Destroys the cluster.
540
541     """
542     master = self.cfg.GetMasterNode()
543     if not self.rpc.call_node_stop_master(master, False):
544       raise errors.OpExecError("Could not disable the master role")
545     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
546     utils.CreateBackup(priv_key)
547     utils.CreateBackup(pub_key)
548     return master
549
550
551 class LUVerifyCluster(LogicalUnit):
552   """Verifies the cluster status.
553
554   """
555   HPATH = "cluster-verify"
556   HTYPE = constants.HTYPE_CLUSTER
557   _OP_REQP = ["skip_checks"]
558   REQ_BGL = False
559
560   def ExpandNames(self):
561     self.needed_locks = {
562       locking.LEVEL_NODE: locking.ALL_SET,
563       locking.LEVEL_INSTANCE: locking.ALL_SET,
564     }
565     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
566
567   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
568                   remote_version, feedback_fn):
569     """Run multiple tests against a node.
570
571     Test list::
572
573       - compares ganeti version
574       - checks vg existance and size > 20G
575       - checks config file checksum
576       - checks ssh to other nodes
577
578     @type node: string
579     @param node: the name of the node to check
580     @param file_list: required list of files
581     @param local_cksum: dictionary of local files and their checksums
582     @type vglist: dict
583     @param vglist: dictionary of volume group names and their size
584     @param node_result: the results from the node
585     @param remote_version: the RPC version from the remote node
586     @param feedback_fn: function used to accumulate results
587
588     """
589     # compares ganeti version
590     local_version = constants.PROTOCOL_VERSION
591     if not remote_version:
592       feedback_fn("  - ERROR: connection to %s failed" % (node))
593       return True
594
595     if local_version != remote_version:
596       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
597                       (local_version, node, remote_version))
598       return True
599
600     # checks vg existance and size > 20G
601
602     bad = False
603     if not vglist:
604       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
605                       (node,))
606       bad = True
607     else:
608       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
609                                             constants.MIN_VG_SIZE)
610       if vgstatus:
611         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
612         bad = True
613
614     if not node_result:
615       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
616       return True
617
618     # checks config file checksum
619     # checks ssh to any
620
621     if 'filelist' not in node_result:
622       bad = True
623       feedback_fn("  - ERROR: node hasn't returned file checksum data")
624     else:
625       remote_cksum = node_result['filelist']
626       for file_name in file_list:
627         if file_name not in remote_cksum:
628           bad = True
629           feedback_fn("  - ERROR: file '%s' missing" % file_name)
630         elif remote_cksum[file_name] != local_cksum[file_name]:
631           bad = True
632           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
633
634     if 'nodelist' not in node_result:
635       bad = True
636       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
637     else:
638       if node_result['nodelist']:
639         bad = True
640         for node in node_result['nodelist']:
641           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
642                           (node, node_result['nodelist'][node]))
643     if 'node-net-test' not in node_result:
644       bad = True
645       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
646     else:
647       if node_result['node-net-test']:
648         bad = True
649         nlist = utils.NiceSort(node_result['node-net-test'].keys())
650         for node in nlist:
651           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
652                           (node, node_result['node-net-test'][node]))
653
654     hyp_result = node_result.get('hypervisor', None)
655     if isinstance(hyp_result, dict):
656       for hv_name, hv_result in hyp_result.iteritems():
657         if hv_result is not None:
658           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
659                       (hv_name, hv_result))
660     return bad
661
662   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
663                       node_instance, feedback_fn):
664     """Verify an instance.
665
666     This function checks to see if the required block devices are
667     available on the instance's node.
668
669     """
670     bad = False
671
672     node_current = instanceconfig.primary_node
673
674     node_vol_should = {}
675     instanceconfig.MapLVsByNode(node_vol_should)
676
677     for node in node_vol_should:
678       for volume in node_vol_should[node]:
679         if node not in node_vol_is or volume not in node_vol_is[node]:
680           feedback_fn("  - ERROR: volume %s missing on node %s" %
681                           (volume, node))
682           bad = True
683
684     if not instanceconfig.status == 'down':
685       if (node_current not in node_instance or
686           not instance in node_instance[node_current]):
687         feedback_fn("  - ERROR: instance %s not running on node %s" %
688                         (instance, node_current))
689         bad = True
690
691     for node in node_instance:
692       if (not node == node_current):
693         if instance in node_instance[node]:
694           feedback_fn("  - ERROR: instance %s should not run on node %s" %
695                           (instance, node))
696           bad = True
697
698     return bad
699
700   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
701     """Verify if there are any unknown volumes in the cluster.
702
703     The .os, .swap and backup volumes are ignored. All other volumes are
704     reported as unknown.
705
706     """
707     bad = False
708
709     for node in node_vol_is:
710       for volume in node_vol_is[node]:
711         if node not in node_vol_should or volume not in node_vol_should[node]:
712           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
713                       (volume, node))
714           bad = True
715     return bad
716
717   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
718     """Verify the list of running instances.
719
720     This checks what instances are running but unknown to the cluster.
721
722     """
723     bad = False
724     for node in node_instance:
725       for runninginstance in node_instance[node]:
726         if runninginstance not in instancelist:
727           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
728                           (runninginstance, node))
729           bad = True
730     return bad
731
732   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
733     """Verify N+1 Memory Resilience.
734
735     Check that if one single node dies we can still start all the instances it
736     was primary for.
737
738     """
739     bad = False
740
741     for node, nodeinfo in node_info.iteritems():
742       # This code checks that every node which is now listed as secondary has
743       # enough memory to host all instances it is supposed to should a single
744       # other node in the cluster fail.
745       # FIXME: not ready for failover to an arbitrary node
746       # FIXME: does not support file-backed instances
747       # WARNING: we currently take into account down instances as well as up
748       # ones, considering that even if they're down someone might want to start
749       # them even in the event of a node failure.
750       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
751         needed_mem = 0
752         for instance in instances:
753           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
754           if bep[constants.BE_AUTO_BALANCE]:
755             needed_mem += bep[constants.BE_MEMORY]
756         if nodeinfo['mfree'] < needed_mem:
757           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
758                       " failovers should node %s fail" % (node, prinode))
759           bad = True
760     return bad
761
762   def CheckPrereq(self):
763     """Check prerequisites.
764
765     Transform the list of checks we're going to skip into a set and check that
766     all its members are valid.
767
768     """
769     self.skip_set = frozenset(self.op.skip_checks)
770     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
771       raise errors.OpPrereqError("Invalid checks to be skipped specified")
772
773   def BuildHooksEnv(self):
774     """Build hooks env.
775
776     Cluster-Verify hooks just rone in the post phase and their failure makes
777     the output be logged in the verify output and the verification to fail.
778
779     """
780     all_nodes = self.cfg.GetNodeList()
781     # TODO: populate the environment with useful information for verify hooks
782     env = {}
783     return env, [], all_nodes
784
785   def Exec(self, feedback_fn):
786     """Verify integrity of cluster, performing various test on nodes.
787
788     """
789     bad = False
790     feedback_fn("* Verifying global settings")
791     for msg in self.cfg.VerifyConfig():
792       feedback_fn("  - ERROR: %s" % msg)
793
794     vg_name = self.cfg.GetVGName()
795     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
796     nodelist = utils.NiceSort(self.cfg.GetNodeList())
797     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
798     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
799     i_non_redundant = [] # Non redundant instances
800     i_non_a_balanced = [] # Non auto-balanced instances
801     node_volume = {}
802     node_instance = {}
803     node_info = {}
804     instance_cfg = {}
805
806     # FIXME: verify OS list
807     # do local checksums
808     file_names = []
809     file_names.append(constants.SSL_CERT_FILE)
810     file_names.append(constants.CLUSTER_CONF_FILE)
811     local_checksums = utils.FingerprintFiles(file_names)
812
813     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
814     all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
815     all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
816     all_vglist = self.rpc.call_vg_list(nodelist)
817     node_verify_param = {
818       'filelist': file_names,
819       'nodelist': nodelist,
820       'hypervisor': hypervisors,
821       'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
822                         for node in nodeinfo]
823       }
824     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
825                                            self.cfg.GetClusterName())
826     all_rversion = self.rpc.call_version(nodelist)
827     all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
828                                         self.cfg.GetHypervisorType())
829
830     cluster = self.cfg.GetClusterInfo()
831     for node in nodelist:
832       feedback_fn("* Verifying node %s" % node)
833       result = self._VerifyNode(node, file_names, local_checksums,
834                                 all_vglist[node], all_nvinfo[node],
835                                 all_rversion[node], feedback_fn)
836       bad = bad or result
837
838       # node_volume
839       volumeinfo = all_volumeinfo[node]
840
841       if isinstance(volumeinfo, basestring):
842         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
843                     (node, volumeinfo[-400:].encode('string_escape')))
844         bad = True
845         node_volume[node] = {}
846       elif not isinstance(volumeinfo, dict):
847         feedback_fn("  - ERROR: connection to %s failed" % (node,))
848         bad = True
849         continue
850       else:
851         node_volume[node] = volumeinfo
852
853       # node_instance
854       nodeinstance = all_instanceinfo[node]
855       if type(nodeinstance) != list:
856         feedback_fn("  - ERROR: connection to %s failed" % (node,))
857         bad = True
858         continue
859
860       node_instance[node] = nodeinstance
861
862       # node_info
863       nodeinfo = all_ninfo[node]
864       if not isinstance(nodeinfo, dict):
865         feedback_fn("  - ERROR: connection to %s failed" % (node,))
866         bad = True
867         continue
868
869       try:
870         node_info[node] = {
871           "mfree": int(nodeinfo['memory_free']),
872           "dfree": int(nodeinfo['vg_free']),
873           "pinst": [],
874           "sinst": [],
875           # dictionary holding all instances this node is secondary for,
876           # grouped by their primary node. Each key is a cluster node, and each
877           # value is a list of instances which have the key as primary and the
878           # current node as secondary.  this is handy to calculate N+1 memory
879           # availability if you can only failover from a primary to its
880           # secondary.
881           "sinst-by-pnode": {},
882         }
883       except ValueError:
884         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
885         bad = True
886         continue
887
888     node_vol_should = {}
889
890     for instance in instancelist:
891       feedback_fn("* Verifying instance %s" % instance)
892       inst_config = self.cfg.GetInstanceInfo(instance)
893       result =  self._VerifyInstance(instance, inst_config, node_volume,
894                                      node_instance, feedback_fn)
895       bad = bad or result
896
897       inst_config.MapLVsByNode(node_vol_should)
898
899       instance_cfg[instance] = inst_config
900
901       pnode = inst_config.primary_node
902       if pnode in node_info:
903         node_info[pnode]['pinst'].append(instance)
904       else:
905         feedback_fn("  - ERROR: instance %s, connection to primary node"
906                     " %s failed" % (instance, pnode))
907         bad = True
908
909       # If the instance is non-redundant we cannot survive losing its primary
910       # node, so we are not N+1 compliant. On the other hand we have no disk
911       # templates with more than one secondary so that situation is not well
912       # supported either.
913       # FIXME: does not support file-backed instances
914       if len(inst_config.secondary_nodes) == 0:
915         i_non_redundant.append(instance)
916       elif len(inst_config.secondary_nodes) > 1:
917         feedback_fn("  - WARNING: multiple secondaries for instance %s"
918                     % instance)
919
920       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
921         i_non_a_balanced.append(instance)
922
923       for snode in inst_config.secondary_nodes:
924         if snode in node_info:
925           node_info[snode]['sinst'].append(instance)
926           if pnode not in node_info[snode]['sinst-by-pnode']:
927             node_info[snode]['sinst-by-pnode'][pnode] = []
928           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
929         else:
930           feedback_fn("  - ERROR: instance %s, connection to secondary node"
931                       " %s failed" % (instance, snode))
932
933     feedback_fn("* Verifying orphan volumes")
934     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
935                                        feedback_fn)
936     bad = bad or result
937
938     feedback_fn("* Verifying remaining instances")
939     result = self._VerifyOrphanInstances(instancelist, node_instance,
940                                          feedback_fn)
941     bad = bad or result
942
943     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
944       feedback_fn("* Verifying N+1 Memory redundancy")
945       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
946       bad = bad or result
947
948     feedback_fn("* Other Notes")
949     if i_non_redundant:
950       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
951                   % len(i_non_redundant))
952
953     if i_non_a_balanced:
954       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
955                   % len(i_non_a_balanced))
956
957     return not bad
958
959   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
960     """Analize the post-hooks' result
961
962     This method analyses the hook result, handles it, and sends some
963     nicely-formatted feedback back to the user.
964
965     @param phase: one of L{constants.HOOKS_PHASE_POST} or
966         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
967     @param hooks_results: the results of the multi-node hooks rpc call
968     @param feedback_fn: function used send feedback back to the caller
969     @param lu_result: previous Exec result
970     @return: the new Exec result, based on the previous result
971         and hook results
972
973     """
974     # We only really run POST phase hooks, and are only interested in
975     # their results
976     if phase == constants.HOOKS_PHASE_POST:
977       # Used to change hooks' output to proper indentation
978       indent_re = re.compile('^', re.M)
979       feedback_fn("* Hooks Results")
980       if not hooks_results:
981         feedback_fn("  - ERROR: general communication failure")
982         lu_result = 1
983       else:
984         for node_name in hooks_results:
985           show_node_header = True
986           res = hooks_results[node_name]
987           if res is False or not isinstance(res, list):
988             feedback_fn("    Communication failure")
989             lu_result = 1
990             continue
991           for script, hkr, output in res:
992             if hkr == constants.HKR_FAIL:
993               # The node header is only shown once, if there are
994               # failing hooks on that node
995               if show_node_header:
996                 feedback_fn("  Node %s:" % node_name)
997                 show_node_header = False
998               feedback_fn("    ERROR: Script %s failed, output:" % script)
999               output = indent_re.sub('      ', output)
1000               feedback_fn("%s" % output)
1001               lu_result = 1
1002
1003       return lu_result
1004
1005
1006 class LUVerifyDisks(NoHooksLU):
1007   """Verifies the cluster disks status.
1008
1009   """
1010   _OP_REQP = []
1011   REQ_BGL = False
1012
1013   def ExpandNames(self):
1014     self.needed_locks = {
1015       locking.LEVEL_NODE: locking.ALL_SET,
1016       locking.LEVEL_INSTANCE: locking.ALL_SET,
1017     }
1018     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1019
1020   def CheckPrereq(self):
1021     """Check prerequisites.
1022
1023     This has no prerequisites.
1024
1025     """
1026     pass
1027
1028   def Exec(self, feedback_fn):
1029     """Verify integrity of cluster disks.
1030
1031     """
1032     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1033
1034     vg_name = self.cfg.GetVGName()
1035     nodes = utils.NiceSort(self.cfg.GetNodeList())
1036     instances = [self.cfg.GetInstanceInfo(name)
1037                  for name in self.cfg.GetInstanceList()]
1038
1039     nv_dict = {}
1040     for inst in instances:
1041       inst_lvs = {}
1042       if (inst.status != "up" or
1043           inst.disk_template not in constants.DTS_NET_MIRROR):
1044         continue
1045       inst.MapLVsByNode(inst_lvs)
1046       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1047       for node, vol_list in inst_lvs.iteritems():
1048         for vol in vol_list:
1049           nv_dict[(node, vol)] = inst
1050
1051     if not nv_dict:
1052       return result
1053
1054     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1055
1056     to_act = set()
1057     for node in nodes:
1058       # node_volume
1059       lvs = node_lvs[node]
1060
1061       if isinstance(lvs, basestring):
1062         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1063         res_nlvm[node] = lvs
1064       elif not isinstance(lvs, dict):
1065         logging.warning("Connection to node %s failed or invalid data"
1066                         " returned", node)
1067         res_nodes.append(node)
1068         continue
1069
1070       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1071         inst = nv_dict.pop((node, lv_name), None)
1072         if (not lv_online and inst is not None
1073             and inst.name not in res_instances):
1074           res_instances.append(inst.name)
1075
1076     # any leftover items in nv_dict are missing LVs, let's arrange the
1077     # data better
1078     for key, inst in nv_dict.iteritems():
1079       if inst.name not in res_missing:
1080         res_missing[inst.name] = []
1081       res_missing[inst.name].append(key)
1082
1083     return result
1084
1085
1086 class LURenameCluster(LogicalUnit):
1087   """Rename the cluster.
1088
1089   """
1090   HPATH = "cluster-rename"
1091   HTYPE = constants.HTYPE_CLUSTER
1092   _OP_REQP = ["name"]
1093
1094   def BuildHooksEnv(self):
1095     """Build hooks env.
1096
1097     """
1098     env = {
1099       "OP_TARGET": self.cfg.GetClusterName(),
1100       "NEW_NAME": self.op.name,
1101       }
1102     mn = self.cfg.GetMasterNode()
1103     return env, [mn], [mn]
1104
1105   def CheckPrereq(self):
1106     """Verify that the passed name is a valid one.
1107
1108     """
1109     hostname = utils.HostInfo(self.op.name)
1110
1111     new_name = hostname.name
1112     self.ip = new_ip = hostname.ip
1113     old_name = self.cfg.GetClusterName()
1114     old_ip = self.cfg.GetMasterIP()
1115     if new_name == old_name and new_ip == old_ip:
1116       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1117                                  " cluster has changed")
1118     if new_ip != old_ip:
1119       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1120         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1121                                    " reachable on the network. Aborting." %
1122                                    new_ip)
1123
1124     self.op.name = new_name
1125
1126   def Exec(self, feedback_fn):
1127     """Rename the cluster.
1128
1129     """
1130     clustername = self.op.name
1131     ip = self.ip
1132
1133     # shutdown the master IP
1134     master = self.cfg.GetMasterNode()
1135     if not self.rpc.call_node_stop_master(master, False):
1136       raise errors.OpExecError("Could not disable the master role")
1137
1138     try:
1139       # modify the sstore
1140       # TODO: sstore
1141       ss.SetKey(ss.SS_MASTER_IP, ip)
1142       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1143
1144       # Distribute updated ss config to all nodes
1145       myself = self.cfg.GetNodeInfo(master)
1146       dist_nodes = self.cfg.GetNodeList()
1147       if myself.name in dist_nodes:
1148         dist_nodes.remove(myself.name)
1149
1150       logging.debug("Copying updated ssconf data to all nodes")
1151       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1152         fname = ss.KeyToFilename(keyname)
1153         result = self.rpc.call_upload_file(dist_nodes, fname)
1154         for to_node in dist_nodes:
1155           if not result[to_node]:
1156             self.LogWarning("Copy of file %s to node %s failed",
1157                             fname, to_node)
1158     finally:
1159       if not self.rpc.call_node_start_master(master, False):
1160         self.LogWarning("Could not re-enable the master role on"
1161                         " the master, please restart manually.")
1162
1163
1164 def _RecursiveCheckIfLVMBased(disk):
1165   """Check if the given disk or its children are lvm-based.
1166
1167   @type disk: L{objects.Disk}
1168   @param disk: the disk to check
1169   @rtype: booleean
1170   @return: boolean indicating whether a LD_LV dev_type was found or not
1171
1172   """
1173   if disk.children:
1174     for chdisk in disk.children:
1175       if _RecursiveCheckIfLVMBased(chdisk):
1176         return True
1177   return disk.dev_type == constants.LD_LV
1178
1179
1180 class LUSetClusterParams(LogicalUnit):
1181   """Change the parameters of the cluster.
1182
1183   """
1184   HPATH = "cluster-modify"
1185   HTYPE = constants.HTYPE_CLUSTER
1186   _OP_REQP = []
1187   REQ_BGL = False
1188
1189   def ExpandNames(self):
1190     # FIXME: in the future maybe other cluster params won't require checking on
1191     # all nodes to be modified.
1192     self.needed_locks = {
1193       locking.LEVEL_NODE: locking.ALL_SET,
1194     }
1195     self.share_locks[locking.LEVEL_NODE] = 1
1196
1197   def BuildHooksEnv(self):
1198     """Build hooks env.
1199
1200     """
1201     env = {
1202       "OP_TARGET": self.cfg.GetClusterName(),
1203       "NEW_VG_NAME": self.op.vg_name,
1204       }
1205     mn = self.cfg.GetMasterNode()
1206     return env, [mn], [mn]
1207
1208   def CheckPrereq(self):
1209     """Check prerequisites.
1210
1211     This checks whether the given params don't conflict and
1212     if the given volume group is valid.
1213
1214     """
1215     # FIXME: This only works because there is only one parameter that can be
1216     # changed or removed.
1217     if self.op.vg_name is not None and not self.op.vg_name:
1218       instances = self.cfg.GetAllInstancesInfo().values()
1219       for inst in instances:
1220         for disk in inst.disks:
1221           if _RecursiveCheckIfLVMBased(disk):
1222             raise errors.OpPrereqError("Cannot disable lvm storage while"
1223                                        " lvm-based instances exist")
1224
1225     node_list = self.acquired_locks[locking.LEVEL_NODE]
1226
1227     # if vg_name not None, checks given volume group on all nodes
1228     if self.op.vg_name:
1229       vglist = self.rpc.call_vg_list(node_list)
1230       for node in node_list:
1231         vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1232                                               constants.MIN_VG_SIZE)
1233         if vgstatus:
1234           raise errors.OpPrereqError("Error on node '%s': %s" %
1235                                      (node, vgstatus))
1236
1237     self.cluster = cluster = self.cfg.GetClusterInfo()
1238     # beparams changes do not need validation (we can't validate?),
1239     # but we still process here
1240     if self.op.beparams:
1241       self.new_beparams = cluster.FillDict(
1242         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1243
1244     # hypervisor list/parameters
1245     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1246     if self.op.hvparams:
1247       if not isinstance(self.op.hvparams, dict):
1248         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1249       for hv_name, hv_dict in self.op.hvparams.items():
1250         if hv_name not in self.new_hvparams:
1251           self.new_hvparams[hv_name] = hv_dict
1252         else:
1253           self.new_hvparams[hv_name].update(hv_dict)
1254
1255     if self.op.enabled_hypervisors is not None:
1256       self.hv_list = self.op.enabled_hypervisors
1257     else:
1258       self.hv_list = cluster.enabled_hypervisors
1259
1260     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1261       # either the enabled list has changed, or the parameters have, validate
1262       for hv_name, hv_params in self.new_hvparams.items():
1263         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1264             (self.op.enabled_hypervisors and
1265              hv_name in self.op.enabled_hypervisors)):
1266           # either this is a new hypervisor, or its parameters have changed
1267           hv_class = hypervisor.GetHypervisor(hv_name)
1268           hv_class.CheckParameterSyntax(hv_params)
1269           _CheckHVParams(self, node_list, hv_name, hv_params)
1270
1271   def Exec(self, feedback_fn):
1272     """Change the parameters of the cluster.
1273
1274     """
1275     if self.op.vg_name is not None:
1276       if self.op.vg_name != self.cfg.GetVGName():
1277         self.cfg.SetVGName(self.op.vg_name)
1278       else:
1279         feedback_fn("Cluster LVM configuration already in desired"
1280                     " state, not changing")
1281     if self.op.hvparams:
1282       self.cluster.hvparams = self.new_hvparams
1283     if self.op.enabled_hypervisors is not None:
1284       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1285     if self.op.beparams:
1286       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1287     self.cfg.Update(self.cluster)
1288
1289
1290 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1291   """Sleep and poll for an instance's disk to sync.
1292
1293   """
1294   if not instance.disks:
1295     return True
1296
1297   if not oneshot:
1298     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1299
1300   node = instance.primary_node
1301
1302   for dev in instance.disks:
1303     lu.cfg.SetDiskID(dev, node)
1304
1305   retries = 0
1306   while True:
1307     max_time = 0
1308     done = True
1309     cumul_degraded = False
1310     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1311     if not rstats:
1312       lu.LogWarning("Can't get any data from node %s", node)
1313       retries += 1
1314       if retries >= 10:
1315         raise errors.RemoteError("Can't contact node %s for mirror data,"
1316                                  " aborting." % node)
1317       time.sleep(6)
1318       continue
1319     retries = 0
1320     for i in range(len(rstats)):
1321       mstat = rstats[i]
1322       if mstat is None:
1323         lu.LogWarning("Can't compute data for node %s/%s",
1324                            node, instance.disks[i].iv_name)
1325         continue
1326       # we ignore the ldisk parameter
1327       perc_done, est_time, is_degraded, _ = mstat
1328       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1329       if perc_done is not None:
1330         done = False
1331         if est_time is not None:
1332           rem_time = "%d estimated seconds remaining" % est_time
1333           max_time = est_time
1334         else:
1335           rem_time = "no time estimate"
1336         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1337                         (instance.disks[i].iv_name, perc_done, rem_time))
1338     if done or oneshot:
1339       break
1340
1341     time.sleep(min(60, max_time))
1342
1343   if done:
1344     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1345   return not cumul_degraded
1346
1347
1348 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1349   """Check that mirrors are not degraded.
1350
1351   The ldisk parameter, if True, will change the test from the
1352   is_degraded attribute (which represents overall non-ok status for
1353   the device(s)) to the ldisk (representing the local storage status).
1354
1355   """
1356   lu.cfg.SetDiskID(dev, node)
1357   if ldisk:
1358     idx = 6
1359   else:
1360     idx = 5
1361
1362   result = True
1363   if on_primary or dev.AssembleOnSecondary():
1364     rstats = lu.rpc.call_blockdev_find(node, dev)
1365     if not rstats:
1366       logging.warning("Node %s: disk degraded, not found or node down", node)
1367       result = False
1368     else:
1369       result = result and (not rstats[idx])
1370   if dev.children:
1371     for child in dev.children:
1372       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1373
1374   return result
1375
1376
1377 class LUDiagnoseOS(NoHooksLU):
1378   """Logical unit for OS diagnose/query.
1379
1380   """
1381   _OP_REQP = ["output_fields", "names"]
1382   REQ_BGL = False
1383   _FIELDS_STATIC = utils.FieldSet()
1384   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1385
1386   def ExpandNames(self):
1387     if self.op.names:
1388       raise errors.OpPrereqError("Selective OS query not supported")
1389
1390     _CheckOutputFields(static=self._FIELDS_STATIC,
1391                        dynamic=self._FIELDS_DYNAMIC,
1392                        selected=self.op.output_fields)
1393
1394     # Lock all nodes, in shared mode
1395     self.needed_locks = {}
1396     self.share_locks[locking.LEVEL_NODE] = 1
1397     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1398
1399   def CheckPrereq(self):
1400     """Check prerequisites.
1401
1402     """
1403
1404   @staticmethod
1405   def _DiagnoseByOS(node_list, rlist):
1406     """Remaps a per-node return list into an a per-os per-node dictionary
1407
1408     @param node_list: a list with the names of all nodes
1409     @param rlist: a map with node names as keys and OS objects as values
1410
1411     @rtype: dict
1412     @returns: a dictionary with osnames as keys and as value another map, with
1413         nodes as keys and list of OS objects as values, eg::
1414
1415           {"debian-etch": {"node1": [<object>,...],
1416                            "node2": [<object>,]}
1417           }
1418
1419     """
1420     all_os = {}
1421     for node_name, nr in rlist.iteritems():
1422       if not nr:
1423         continue
1424       for os_obj in nr:
1425         if os_obj.name not in all_os:
1426           # build a list of nodes for this os containing empty lists
1427           # for each node in node_list
1428           all_os[os_obj.name] = {}
1429           for nname in node_list:
1430             all_os[os_obj.name][nname] = []
1431         all_os[os_obj.name][node_name].append(os_obj)
1432     return all_os
1433
1434   def Exec(self, feedback_fn):
1435     """Compute the list of OSes.
1436
1437     """
1438     node_list = self.acquired_locks[locking.LEVEL_NODE]
1439     node_data = self.rpc.call_os_diagnose(node_list)
1440     if node_data == False:
1441       raise errors.OpExecError("Can't gather the list of OSes")
1442     pol = self._DiagnoseByOS(node_list, node_data)
1443     output = []
1444     for os_name, os_data in pol.iteritems():
1445       row = []
1446       for field in self.op.output_fields:
1447         if field == "name":
1448           val = os_name
1449         elif field == "valid":
1450           val = utils.all([osl and osl[0] for osl in os_data.values()])
1451         elif field == "node_status":
1452           val = {}
1453           for node_name, nos_list in os_data.iteritems():
1454             val[node_name] = [(v.status, v.path) for v in nos_list]
1455         else:
1456           raise errors.ParameterError(field)
1457         row.append(val)
1458       output.append(row)
1459
1460     return output
1461
1462
1463 class LURemoveNode(LogicalUnit):
1464   """Logical unit for removing a node.
1465
1466   """
1467   HPATH = "node-remove"
1468   HTYPE = constants.HTYPE_NODE
1469   _OP_REQP = ["node_name"]
1470
1471   def BuildHooksEnv(self):
1472     """Build hooks env.
1473
1474     This doesn't run on the target node in the pre phase as a failed
1475     node would then be impossible to remove.
1476
1477     """
1478     env = {
1479       "OP_TARGET": self.op.node_name,
1480       "NODE_NAME": self.op.node_name,
1481       }
1482     all_nodes = self.cfg.GetNodeList()
1483     all_nodes.remove(self.op.node_name)
1484     return env, all_nodes, all_nodes
1485
1486   def CheckPrereq(self):
1487     """Check prerequisites.
1488
1489     This checks:
1490      - the node exists in the configuration
1491      - it does not have primary or secondary instances
1492      - it's not the master
1493
1494     Any errors are signalled by raising errors.OpPrereqError.
1495
1496     """
1497     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1498     if node is None:
1499       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1500
1501     instance_list = self.cfg.GetInstanceList()
1502
1503     masternode = self.cfg.GetMasterNode()
1504     if node.name == masternode:
1505       raise errors.OpPrereqError("Node is the master node,"
1506                                  " you need to failover first.")
1507
1508     for instance_name in instance_list:
1509       instance = self.cfg.GetInstanceInfo(instance_name)
1510       if node.name == instance.primary_node:
1511         raise errors.OpPrereqError("Instance %s still running on the node,"
1512                                    " please remove first." % instance_name)
1513       if node.name in instance.secondary_nodes:
1514         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1515                                    " please remove first." % instance_name)
1516     self.op.node_name = node.name
1517     self.node = node
1518
1519   def Exec(self, feedback_fn):
1520     """Removes the node from the cluster.
1521
1522     """
1523     node = self.node
1524     logging.info("Stopping the node daemon and removing configs from node %s",
1525                  node.name)
1526
1527     self.context.RemoveNode(node.name)
1528
1529     self.rpc.call_node_leave_cluster(node.name)
1530
1531
1532 class LUQueryNodes(NoHooksLU):
1533   """Logical unit for querying nodes.
1534
1535   """
1536   _OP_REQP = ["output_fields", "names"]
1537   REQ_BGL = False
1538   _FIELDS_DYNAMIC = utils.FieldSet(
1539     "dtotal", "dfree",
1540     "mtotal", "mnode", "mfree",
1541     "bootid",
1542     "ctotal",
1543     )
1544
1545   _FIELDS_STATIC = utils.FieldSet(
1546     "name", "pinst_cnt", "sinst_cnt",
1547     "pinst_list", "sinst_list",
1548     "pip", "sip", "tags",
1549     "serial_no",
1550     )
1551
1552   def ExpandNames(self):
1553     _CheckOutputFields(static=self._FIELDS_STATIC,
1554                        dynamic=self._FIELDS_DYNAMIC,
1555                        selected=self.op.output_fields)
1556
1557     self.needed_locks = {}
1558     self.share_locks[locking.LEVEL_NODE] = 1
1559
1560     if self.op.names:
1561       self.wanted = _GetWantedNodes(self, self.op.names)
1562     else:
1563       self.wanted = locking.ALL_SET
1564
1565     self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1566     if self.do_locking:
1567       # if we don't request only static fields, we need to lock the nodes
1568       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1569
1570
1571   def CheckPrereq(self):
1572     """Check prerequisites.
1573
1574     """
1575     # The validation of the node list is done in the _GetWantedNodes,
1576     # if non empty, and if empty, there's no validation to do
1577     pass
1578
1579   def Exec(self, feedback_fn):
1580     """Computes the list of nodes and their attributes.
1581
1582     """
1583     all_info = self.cfg.GetAllNodesInfo()
1584     if self.do_locking:
1585       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1586     elif self.wanted != locking.ALL_SET:
1587       nodenames = self.wanted
1588       missing = set(nodenames).difference(all_info.keys())
1589       if missing:
1590         raise errors.OpExecError(
1591           "Some nodes were removed before retrieving their data: %s" % missing)
1592     else:
1593       nodenames = all_info.keys()
1594
1595     nodenames = utils.NiceSort(nodenames)
1596     nodelist = [all_info[name] for name in nodenames]
1597
1598     # begin data gathering
1599
1600     if self.do_locking:
1601       live_data = {}
1602       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1603                                           self.cfg.GetHypervisorType())
1604       for name in nodenames:
1605         nodeinfo = node_data.get(name, None)
1606         if nodeinfo:
1607           live_data[name] = {
1608             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1609             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1610             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1611             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1612             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1613             "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1614             "bootid": nodeinfo['bootid'],
1615             }
1616         else:
1617           live_data[name] = {}
1618     else:
1619       live_data = dict.fromkeys(nodenames, {})
1620
1621     node_to_primary = dict([(name, set()) for name in nodenames])
1622     node_to_secondary = dict([(name, set()) for name in nodenames])
1623
1624     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1625                              "sinst_cnt", "sinst_list"))
1626     if inst_fields & frozenset(self.op.output_fields):
1627       instancelist = self.cfg.GetInstanceList()
1628
1629       for instance_name in instancelist:
1630         inst = self.cfg.GetInstanceInfo(instance_name)
1631         if inst.primary_node in node_to_primary:
1632           node_to_primary[inst.primary_node].add(inst.name)
1633         for secnode in inst.secondary_nodes:
1634           if secnode in node_to_secondary:
1635             node_to_secondary[secnode].add(inst.name)
1636
1637     # end data gathering
1638
1639     output = []
1640     for node in nodelist:
1641       node_output = []
1642       for field in self.op.output_fields:
1643         if field == "name":
1644           val = node.name
1645         elif field == "pinst_list":
1646           val = list(node_to_primary[node.name])
1647         elif field == "sinst_list":
1648           val = list(node_to_secondary[node.name])
1649         elif field == "pinst_cnt":
1650           val = len(node_to_primary[node.name])
1651         elif field == "sinst_cnt":
1652           val = len(node_to_secondary[node.name])
1653         elif field == "pip":
1654           val = node.primary_ip
1655         elif field == "sip":
1656           val = node.secondary_ip
1657         elif field == "tags":
1658           val = list(node.GetTags())
1659         elif field == "serial_no":
1660           val = node.serial_no
1661         elif self._FIELDS_DYNAMIC.Matches(field):
1662           val = live_data[node.name].get(field, None)
1663         else:
1664           raise errors.ParameterError(field)
1665         node_output.append(val)
1666       output.append(node_output)
1667
1668     return output
1669
1670
1671 class LUQueryNodeVolumes(NoHooksLU):
1672   """Logical unit for getting volumes on node(s).
1673
1674   """
1675   _OP_REQP = ["nodes", "output_fields"]
1676   REQ_BGL = False
1677   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1678   _FIELDS_STATIC = utils.FieldSet("node")
1679
1680   def ExpandNames(self):
1681     _CheckOutputFields(static=self._FIELDS_STATIC,
1682                        dynamic=self._FIELDS_DYNAMIC,
1683                        selected=self.op.output_fields)
1684
1685     self.needed_locks = {}
1686     self.share_locks[locking.LEVEL_NODE] = 1
1687     if not self.op.nodes:
1688       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1689     else:
1690       self.needed_locks[locking.LEVEL_NODE] = \
1691         _GetWantedNodes(self, self.op.nodes)
1692
1693   def CheckPrereq(self):
1694     """Check prerequisites.
1695
1696     This checks that the fields required are valid output fields.
1697
1698     """
1699     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1700
1701   def Exec(self, feedback_fn):
1702     """Computes the list of nodes and their attributes.
1703
1704     """
1705     nodenames = self.nodes
1706     volumes = self.rpc.call_node_volumes(nodenames)
1707
1708     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1709              in self.cfg.GetInstanceList()]
1710
1711     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1712
1713     output = []
1714     for node in nodenames:
1715       if node not in volumes or not volumes[node]:
1716         continue
1717
1718       node_vols = volumes[node][:]
1719       node_vols.sort(key=lambda vol: vol['dev'])
1720
1721       for vol in node_vols:
1722         node_output = []
1723         for field in self.op.output_fields:
1724           if field == "node":
1725             val = node
1726           elif field == "phys":
1727             val = vol['dev']
1728           elif field == "vg":
1729             val = vol['vg']
1730           elif field == "name":
1731             val = vol['name']
1732           elif field == "size":
1733             val = int(float(vol['size']))
1734           elif field == "instance":
1735             for inst in ilist:
1736               if node not in lv_by_node[inst]:
1737                 continue
1738               if vol['name'] in lv_by_node[inst][node]:
1739                 val = inst.name
1740                 break
1741             else:
1742               val = '-'
1743           else:
1744             raise errors.ParameterError(field)
1745           node_output.append(str(val))
1746
1747         output.append(node_output)
1748
1749     return output
1750
1751
1752 class LUAddNode(LogicalUnit):
1753   """Logical unit for adding node to the cluster.
1754
1755   """
1756   HPATH = "node-add"
1757   HTYPE = constants.HTYPE_NODE
1758   _OP_REQP = ["node_name"]
1759
1760   def BuildHooksEnv(self):
1761     """Build hooks env.
1762
1763     This will run on all nodes before, and on all nodes + the new node after.
1764
1765     """
1766     env = {
1767       "OP_TARGET": self.op.node_name,
1768       "NODE_NAME": self.op.node_name,
1769       "NODE_PIP": self.op.primary_ip,
1770       "NODE_SIP": self.op.secondary_ip,
1771       }
1772     nodes_0 = self.cfg.GetNodeList()
1773     nodes_1 = nodes_0 + [self.op.node_name, ]
1774     return env, nodes_0, nodes_1
1775
1776   def CheckPrereq(self):
1777     """Check prerequisites.
1778
1779     This checks:
1780      - the new node is not already in the config
1781      - it is resolvable
1782      - its parameters (single/dual homed) matches the cluster
1783
1784     Any errors are signalled by raising errors.OpPrereqError.
1785
1786     """
1787     node_name = self.op.node_name
1788     cfg = self.cfg
1789
1790     dns_data = utils.HostInfo(node_name)
1791
1792     node = dns_data.name
1793     primary_ip = self.op.primary_ip = dns_data.ip
1794     secondary_ip = getattr(self.op, "secondary_ip", None)
1795     if secondary_ip is None:
1796       secondary_ip = primary_ip
1797     if not utils.IsValidIP(secondary_ip):
1798       raise errors.OpPrereqError("Invalid secondary IP given")
1799     self.op.secondary_ip = secondary_ip
1800
1801     node_list = cfg.GetNodeList()
1802     if not self.op.readd and node in node_list:
1803       raise errors.OpPrereqError("Node %s is already in the configuration" %
1804                                  node)
1805     elif self.op.readd and node not in node_list:
1806       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1807
1808     for existing_node_name in node_list:
1809       existing_node = cfg.GetNodeInfo(existing_node_name)
1810
1811       if self.op.readd and node == existing_node_name:
1812         if (existing_node.primary_ip != primary_ip or
1813             existing_node.secondary_ip != secondary_ip):
1814           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1815                                      " address configuration as before")
1816         continue
1817
1818       if (existing_node.primary_ip == primary_ip or
1819           existing_node.secondary_ip == primary_ip or
1820           existing_node.primary_ip == secondary_ip or
1821           existing_node.secondary_ip == secondary_ip):
1822         raise errors.OpPrereqError("New node ip address(es) conflict with"
1823                                    " existing node %s" % existing_node.name)
1824
1825     # check that the type of the node (single versus dual homed) is the
1826     # same as for the master
1827     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1828     master_singlehomed = myself.secondary_ip == myself.primary_ip
1829     newbie_singlehomed = secondary_ip == primary_ip
1830     if master_singlehomed != newbie_singlehomed:
1831       if master_singlehomed:
1832         raise errors.OpPrereqError("The master has no private ip but the"
1833                                    " new node has one")
1834       else:
1835         raise errors.OpPrereqError("The master has a private ip but the"
1836                                    " new node doesn't have one")
1837
1838     # checks reachablity
1839     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1840       raise errors.OpPrereqError("Node not reachable by ping")
1841
1842     if not newbie_singlehomed:
1843       # check reachability from my secondary ip to newbie's secondary ip
1844       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1845                            source=myself.secondary_ip):
1846         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1847                                    " based ping to noded port")
1848
1849     self.new_node = objects.Node(name=node,
1850                                  primary_ip=primary_ip,
1851                                  secondary_ip=secondary_ip)
1852
1853   def Exec(self, feedback_fn):
1854     """Adds the new node to the cluster.
1855
1856     """
1857     new_node = self.new_node
1858     node = new_node.name
1859
1860     # check connectivity
1861     result = self.rpc.call_version([node])[node]
1862     if result:
1863       if constants.PROTOCOL_VERSION == result:
1864         logging.info("Communication to node %s fine, sw version %s match",
1865                      node, result)
1866       else:
1867         raise errors.OpExecError("Version mismatch master version %s,"
1868                                  " node version %s" %
1869                                  (constants.PROTOCOL_VERSION, result))
1870     else:
1871       raise errors.OpExecError("Cannot get version from the new node")
1872
1873     # setup ssh on node
1874     logging.info("Copy ssh key to node %s", node)
1875     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1876     keyarray = []
1877     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1878                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1879                 priv_key, pub_key]
1880
1881     for i in keyfiles:
1882       f = open(i, 'r')
1883       try:
1884         keyarray.append(f.read())
1885       finally:
1886         f.close()
1887
1888     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
1889                                     keyarray[2],
1890                                     keyarray[3], keyarray[4], keyarray[5])
1891
1892     if not result:
1893       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1894
1895     # Add node to our /etc/hosts, and add key to known_hosts
1896     utils.AddHostToEtcHosts(new_node.name)
1897
1898     if new_node.secondary_ip != new_node.primary_ip:
1899       if not self.rpc.call_node_has_ip_address(new_node.name,
1900                                                new_node.secondary_ip):
1901         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1902                                  " you gave (%s). Please fix and re-run this"
1903                                  " command." % new_node.secondary_ip)
1904
1905     node_verify_list = [self.cfg.GetMasterNode()]
1906     node_verify_param = {
1907       'nodelist': [node],
1908       # TODO: do a node-net-test as well?
1909     }
1910
1911     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
1912                                        self.cfg.GetClusterName())
1913     for verifier in node_verify_list:
1914       if not result[verifier]:
1915         raise errors.OpExecError("Cannot communicate with %s's node daemon"
1916                                  " for remote verification" % verifier)
1917       if result[verifier]['nodelist']:
1918         for failed in result[verifier]['nodelist']:
1919           feedback_fn("ssh/hostname verification failed %s -> %s" %
1920                       (verifier, result[verifier]['nodelist'][failed]))
1921         raise errors.OpExecError("ssh/hostname verification failed.")
1922
1923     # Distribute updated /etc/hosts and known_hosts to all nodes,
1924     # including the node just added
1925     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
1926     dist_nodes = self.cfg.GetNodeList()
1927     if not self.op.readd:
1928       dist_nodes.append(node)
1929     if myself.name in dist_nodes:
1930       dist_nodes.remove(myself.name)
1931
1932     logging.debug("Copying hosts and known_hosts to all nodes")
1933     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1934       result = self.rpc.call_upload_file(dist_nodes, fname)
1935       for to_node in dist_nodes:
1936         if not result[to_node]:
1937           logging.error("Copy of file %s to node %s failed", fname, to_node)
1938
1939     to_copy = []
1940     if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
1941       to_copy.append(constants.VNC_PASSWORD_FILE)
1942     for fname in to_copy:
1943       result = self.rpc.call_upload_file([node], fname)
1944       if not result[node]:
1945         logging.error("Could not copy file %s to node %s", fname, node)
1946
1947     if self.op.readd:
1948       self.context.ReaddNode(new_node)
1949     else:
1950       self.context.AddNode(new_node)
1951
1952
1953 class LUQueryClusterInfo(NoHooksLU):
1954   """Query cluster configuration.
1955
1956   """
1957   _OP_REQP = []
1958   REQ_BGL = False
1959
1960   def ExpandNames(self):
1961     self.needed_locks = {}
1962
1963   def CheckPrereq(self):
1964     """No prerequsites needed for this LU.
1965
1966     """
1967     pass
1968
1969   def Exec(self, feedback_fn):
1970     """Return cluster config.
1971
1972     """
1973     cluster = self.cfg.GetClusterInfo()
1974     result = {
1975       "software_version": constants.RELEASE_VERSION,
1976       "protocol_version": constants.PROTOCOL_VERSION,
1977       "config_version": constants.CONFIG_VERSION,
1978       "os_api_version": constants.OS_API_VERSION,
1979       "export_version": constants.EXPORT_VERSION,
1980       "architecture": (platform.architecture()[0], platform.machine()),
1981       "name": cluster.cluster_name,
1982       "master": cluster.master_node,
1983       "default_hypervisor": cluster.default_hypervisor,
1984       "enabled_hypervisors": cluster.enabled_hypervisors,
1985       "hvparams": cluster.hvparams,
1986       "beparams": cluster.beparams,
1987       }
1988
1989     return result
1990
1991
1992 class LUQueryConfigValues(NoHooksLU):
1993   """Return configuration values.
1994
1995   """
1996   _OP_REQP = []
1997   REQ_BGL = False
1998   _FIELDS_DYNAMIC = utils.FieldSet()
1999   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2000
2001   def ExpandNames(self):
2002     self.needed_locks = {}
2003
2004     _CheckOutputFields(static=self._FIELDS_STATIC,
2005                        dynamic=self._FIELDS_DYNAMIC,
2006                        selected=self.op.output_fields)
2007
2008   def CheckPrereq(self):
2009     """No prerequisites.
2010
2011     """
2012     pass
2013
2014   def Exec(self, feedback_fn):
2015     """Dump a representation of the cluster config to the standard output.
2016
2017     """
2018     values = []
2019     for field in self.op.output_fields:
2020       if field == "cluster_name":
2021         entry = self.cfg.GetClusterName()
2022       elif field == "master_node":
2023         entry = self.cfg.GetMasterNode()
2024       elif field == "drain_flag":
2025         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2026       else:
2027         raise errors.ParameterError(field)
2028       values.append(entry)
2029     return values
2030
2031
2032 class LUActivateInstanceDisks(NoHooksLU):
2033   """Bring up an instance's disks.
2034
2035   """
2036   _OP_REQP = ["instance_name"]
2037   REQ_BGL = False
2038
2039   def ExpandNames(self):
2040     self._ExpandAndLockInstance()
2041     self.needed_locks[locking.LEVEL_NODE] = []
2042     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2043
2044   def DeclareLocks(self, level):
2045     if level == locking.LEVEL_NODE:
2046       self._LockInstancesNodes()
2047
2048   def CheckPrereq(self):
2049     """Check prerequisites.
2050
2051     This checks that the instance is in the cluster.
2052
2053     """
2054     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2055     assert self.instance is not None, \
2056       "Cannot retrieve locked instance %s" % self.op.instance_name
2057
2058   def Exec(self, feedback_fn):
2059     """Activate the disks.
2060
2061     """
2062     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2063     if not disks_ok:
2064       raise errors.OpExecError("Cannot activate block devices")
2065
2066     return disks_info
2067
2068
2069 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2070   """Prepare the block devices for an instance.
2071
2072   This sets up the block devices on all nodes.
2073
2074   @type lu: L{LogicalUnit}
2075   @param lu: the logical unit on whose behalf we execute
2076   @type instance: L{objects.Instance}
2077   @param instance: the instance for whose disks we assemble
2078   @type ignore_secondaries: boolean
2079   @param ignore_secondaries: if true, errors on secondary nodes
2080       won't result in an error return from the function
2081   @return: False if the operation failed, otherwise a list of
2082       (host, instance_visible_name, node_visible_name)
2083       with the mapping from node devices to instance devices
2084
2085   """
2086   device_info = []
2087   disks_ok = True
2088   iname = instance.name
2089   # With the two passes mechanism we try to reduce the window of
2090   # opportunity for the race condition of switching DRBD to primary
2091   # before handshaking occured, but we do not eliminate it
2092
2093   # The proper fix would be to wait (with some limits) until the
2094   # connection has been made and drbd transitions from WFConnection
2095   # into any other network-connected state (Connected, SyncTarget,
2096   # SyncSource, etc.)
2097
2098   # 1st pass, assemble on all nodes in secondary mode
2099   for inst_disk in instance.disks:
2100     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2101       lu.cfg.SetDiskID(node_disk, node)
2102       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2103       if not result:
2104         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2105                            " (is_primary=False, pass=1)",
2106                            inst_disk.iv_name, node)
2107         if not ignore_secondaries:
2108           disks_ok = False
2109
2110   # FIXME: race condition on drbd migration to primary
2111
2112   # 2nd pass, do only the primary node
2113   for inst_disk in instance.disks:
2114     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2115       if node != instance.primary_node:
2116         continue
2117       lu.cfg.SetDiskID(node_disk, node)
2118       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2119       if not result:
2120         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2121                            " (is_primary=True, pass=2)",
2122                            inst_disk.iv_name, node)
2123         disks_ok = False
2124     device_info.append((instance.primary_node, inst_disk.iv_name, result))
2125
2126   # leave the disks configured for the primary node
2127   # this is a workaround that would be fixed better by
2128   # improving the logical/physical id handling
2129   for disk in instance.disks:
2130     lu.cfg.SetDiskID(disk, instance.primary_node)
2131
2132   return disks_ok, device_info
2133
2134
2135 def _StartInstanceDisks(lu, instance, force):
2136   """Start the disks of an instance.
2137
2138   """
2139   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2140                                            ignore_secondaries=force)
2141   if not disks_ok:
2142     _ShutdownInstanceDisks(lu, instance)
2143     if force is not None and not force:
2144       lu.proc.LogWarning("", hint="If the message above refers to a"
2145                          " secondary node,"
2146                          " you can retry the operation using '--force'.")
2147     raise errors.OpExecError("Disk consistency error")
2148
2149
2150 class LUDeactivateInstanceDisks(NoHooksLU):
2151   """Shutdown an instance's disks.
2152
2153   """
2154   _OP_REQP = ["instance_name"]
2155   REQ_BGL = False
2156
2157   def ExpandNames(self):
2158     self._ExpandAndLockInstance()
2159     self.needed_locks[locking.LEVEL_NODE] = []
2160     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2161
2162   def DeclareLocks(self, level):
2163     if level == locking.LEVEL_NODE:
2164       self._LockInstancesNodes()
2165
2166   def CheckPrereq(self):
2167     """Check prerequisites.
2168
2169     This checks that the instance is in the cluster.
2170
2171     """
2172     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2173     assert self.instance is not None, \
2174       "Cannot retrieve locked instance %s" % self.op.instance_name
2175
2176   def Exec(self, feedback_fn):
2177     """Deactivate the disks
2178
2179     """
2180     instance = self.instance
2181     _SafeShutdownInstanceDisks(self, instance)
2182
2183
2184 def _SafeShutdownInstanceDisks(lu, instance):
2185   """Shutdown block devices of an instance.
2186
2187   This function checks if an instance is running, before calling
2188   _ShutdownInstanceDisks.
2189
2190   """
2191   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2192                                       [instance.hypervisor])
2193   ins_l = ins_l[instance.primary_node]
2194   if not type(ins_l) is list:
2195     raise errors.OpExecError("Can't contact node '%s'" %
2196                              instance.primary_node)
2197
2198   if instance.name in ins_l:
2199     raise errors.OpExecError("Instance is running, can't shutdown"
2200                              " block devices.")
2201
2202   _ShutdownInstanceDisks(lu, instance)
2203
2204
2205 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2206   """Shutdown block devices of an instance.
2207
2208   This does the shutdown on all nodes of the instance.
2209
2210   If the ignore_primary is false, errors on the primary node are
2211   ignored.
2212
2213   """
2214   result = True
2215   for disk in instance.disks:
2216     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2217       lu.cfg.SetDiskID(top_disk, node)
2218       if not lu.rpc.call_blockdev_shutdown(node, top_disk):
2219         logging.error("Could not shutdown block device %s on node %s",
2220                       disk.iv_name, node)
2221         if not ignore_primary or node != instance.primary_node:
2222           result = False
2223   return result
2224
2225
2226 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2227   """Checks if a node has enough free memory.
2228
2229   This function check if a given node has the needed amount of free
2230   memory. In case the node has less memory or we cannot get the
2231   information from the node, this function raise an OpPrereqError
2232   exception.
2233
2234   @type lu: C{LogicalUnit}
2235   @param lu: a logical unit from which we get configuration data
2236   @type node: C{str}
2237   @param node: the node to check
2238   @type reason: C{str}
2239   @param reason: string to use in the error message
2240   @type requested: C{int}
2241   @param requested: the amount of memory in MiB to check for
2242   @type hypervisor: C{str}
2243   @param hypervisor: the hypervisor to ask for memory stats
2244   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2245       we cannot check the node
2246
2247   """
2248   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2249   if not nodeinfo or not isinstance(nodeinfo, dict):
2250     raise errors.OpPrereqError("Could not contact node %s for resource"
2251                              " information" % (node,))
2252
2253   free_mem = nodeinfo[node].get('memory_free')
2254   if not isinstance(free_mem, int):
2255     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2256                              " was '%s'" % (node, free_mem))
2257   if requested > free_mem:
2258     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2259                              " needed %s MiB, available %s MiB" %
2260                              (node, reason, requested, free_mem))
2261
2262
2263 class LUStartupInstance(LogicalUnit):
2264   """Starts an instance.
2265
2266   """
2267   HPATH = "instance-start"
2268   HTYPE = constants.HTYPE_INSTANCE
2269   _OP_REQP = ["instance_name", "force"]
2270   REQ_BGL = False
2271
2272   def ExpandNames(self):
2273     self._ExpandAndLockInstance()
2274
2275   def BuildHooksEnv(self):
2276     """Build hooks env.
2277
2278     This runs on master, primary and secondary nodes of the instance.
2279
2280     """
2281     env = {
2282       "FORCE": self.op.force,
2283       }
2284     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2285     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2286           list(self.instance.secondary_nodes))
2287     return env, nl, nl
2288
2289   def CheckPrereq(self):
2290     """Check prerequisites.
2291
2292     This checks that the instance is in the cluster.
2293
2294     """
2295     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2296     assert self.instance is not None, \
2297       "Cannot retrieve locked instance %s" % self.op.instance_name
2298
2299     bep = self.cfg.GetClusterInfo().FillBE(instance)
2300     # check bridges existance
2301     _CheckInstanceBridgesExist(self, instance)
2302
2303     _CheckNodeFreeMemory(self, instance.primary_node,
2304                          "starting instance %s" % instance.name,
2305                          bep[constants.BE_MEMORY], instance.hypervisor)
2306
2307   def Exec(self, feedback_fn):
2308     """Start the instance.
2309
2310     """
2311     instance = self.instance
2312     force = self.op.force
2313     extra_args = getattr(self.op, "extra_args", "")
2314
2315     self.cfg.MarkInstanceUp(instance.name)
2316
2317     node_current = instance.primary_node
2318
2319     _StartInstanceDisks(self, instance, force)
2320
2321     if not self.rpc.call_instance_start(node_current, instance, extra_args):
2322       _ShutdownInstanceDisks(self, instance)
2323       raise errors.OpExecError("Could not start instance")
2324
2325
2326 class LURebootInstance(LogicalUnit):
2327   """Reboot an instance.
2328
2329   """
2330   HPATH = "instance-reboot"
2331   HTYPE = constants.HTYPE_INSTANCE
2332   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2333   REQ_BGL = False
2334
2335   def ExpandNames(self):
2336     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2337                                    constants.INSTANCE_REBOOT_HARD,
2338                                    constants.INSTANCE_REBOOT_FULL]:
2339       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2340                                   (constants.INSTANCE_REBOOT_SOFT,
2341                                    constants.INSTANCE_REBOOT_HARD,
2342                                    constants.INSTANCE_REBOOT_FULL))
2343     self._ExpandAndLockInstance()
2344
2345   def BuildHooksEnv(self):
2346     """Build hooks env.
2347
2348     This runs on master, primary and secondary nodes of the instance.
2349
2350     """
2351     env = {
2352       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2353       }
2354     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2355     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2356           list(self.instance.secondary_nodes))
2357     return env, nl, nl
2358
2359   def CheckPrereq(self):
2360     """Check prerequisites.
2361
2362     This checks that the instance is in the cluster.
2363
2364     """
2365     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2366     assert self.instance is not None, \
2367       "Cannot retrieve locked instance %s" % self.op.instance_name
2368
2369     # check bridges existance
2370     _CheckInstanceBridgesExist(self, instance)
2371
2372   def Exec(self, feedback_fn):
2373     """Reboot the instance.
2374
2375     """
2376     instance = self.instance
2377     ignore_secondaries = self.op.ignore_secondaries
2378     reboot_type = self.op.reboot_type
2379     extra_args = getattr(self.op, "extra_args", "")
2380
2381     node_current = instance.primary_node
2382
2383     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2384                        constants.INSTANCE_REBOOT_HARD]:
2385       if not self.rpc.call_instance_reboot(node_current, instance,
2386                                            reboot_type, extra_args):
2387         raise errors.OpExecError("Could not reboot instance")
2388     else:
2389       if not self.rpc.call_instance_shutdown(node_current, instance):
2390         raise errors.OpExecError("could not shutdown instance for full reboot")
2391       _ShutdownInstanceDisks(self, instance)
2392       _StartInstanceDisks(self, instance, ignore_secondaries)
2393       if not self.rpc.call_instance_start(node_current, instance, extra_args):
2394         _ShutdownInstanceDisks(self, instance)
2395         raise errors.OpExecError("Could not start instance for full reboot")
2396
2397     self.cfg.MarkInstanceUp(instance.name)
2398
2399
2400 class LUShutdownInstance(LogicalUnit):
2401   """Shutdown an instance.
2402
2403   """
2404   HPATH = "instance-stop"
2405   HTYPE = constants.HTYPE_INSTANCE
2406   _OP_REQP = ["instance_name"]
2407   REQ_BGL = False
2408
2409   def ExpandNames(self):
2410     self._ExpandAndLockInstance()
2411
2412   def BuildHooksEnv(self):
2413     """Build hooks env.
2414
2415     This runs on master, primary and secondary nodes of the instance.
2416
2417     """
2418     env = _BuildInstanceHookEnvByObject(self, self.instance)
2419     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2420           list(self.instance.secondary_nodes))
2421     return env, nl, nl
2422
2423   def CheckPrereq(self):
2424     """Check prerequisites.
2425
2426     This checks that the instance is in the cluster.
2427
2428     """
2429     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2430     assert self.instance is not None, \
2431       "Cannot retrieve locked instance %s" % self.op.instance_name
2432
2433   def Exec(self, feedback_fn):
2434     """Shutdown the instance.
2435
2436     """
2437     instance = self.instance
2438     node_current = instance.primary_node
2439     self.cfg.MarkInstanceDown(instance.name)
2440     if not self.rpc.call_instance_shutdown(node_current, instance):
2441       self.proc.LogWarning("Could not shutdown instance")
2442
2443     _ShutdownInstanceDisks(self, instance)
2444
2445
2446 class LUReinstallInstance(LogicalUnit):
2447   """Reinstall an instance.
2448
2449   """
2450   HPATH = "instance-reinstall"
2451   HTYPE = constants.HTYPE_INSTANCE
2452   _OP_REQP = ["instance_name"]
2453   REQ_BGL = False
2454
2455   def ExpandNames(self):
2456     self._ExpandAndLockInstance()
2457
2458   def BuildHooksEnv(self):
2459     """Build hooks env.
2460
2461     This runs on master, primary and secondary nodes of the instance.
2462
2463     """
2464     env = _BuildInstanceHookEnvByObject(self, self.instance)
2465     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2466           list(self.instance.secondary_nodes))
2467     return env, nl, nl
2468
2469   def CheckPrereq(self):
2470     """Check prerequisites.
2471
2472     This checks that the instance is in the cluster and is not running.
2473
2474     """
2475     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2476     assert instance is not None, \
2477       "Cannot retrieve locked instance %s" % self.op.instance_name
2478
2479     if instance.disk_template == constants.DT_DISKLESS:
2480       raise errors.OpPrereqError("Instance '%s' has no disks" %
2481                                  self.op.instance_name)
2482     if instance.status != "down":
2483       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2484                                  self.op.instance_name)
2485     remote_info = self.rpc.call_instance_info(instance.primary_node,
2486                                               instance.name,
2487                                               instance.hypervisor)
2488     if remote_info:
2489       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2490                                  (self.op.instance_name,
2491                                   instance.primary_node))
2492
2493     self.op.os_type = getattr(self.op, "os_type", None)
2494     if self.op.os_type is not None:
2495       # OS verification
2496       pnode = self.cfg.GetNodeInfo(
2497         self.cfg.ExpandNodeName(instance.primary_node))
2498       if pnode is None:
2499         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2500                                    self.op.pnode)
2501       os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
2502       if not os_obj:
2503         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2504                                    " primary node"  % self.op.os_type)
2505
2506     self.instance = instance
2507
2508   def Exec(self, feedback_fn):
2509     """Reinstall the instance.
2510
2511     """
2512     inst = self.instance
2513
2514     if self.op.os_type is not None:
2515       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2516       inst.os = self.op.os_type
2517       self.cfg.Update(inst)
2518
2519     _StartInstanceDisks(self, inst, None)
2520     try:
2521       feedback_fn("Running the instance OS create scripts...")
2522       if not self.rpc.call_instance_os_add(inst.primary_node, inst):
2523         raise errors.OpExecError("Could not install OS for instance %s"
2524                                  " on node %s" %
2525                                  (inst.name, inst.primary_node))
2526     finally:
2527       _ShutdownInstanceDisks(self, inst)
2528
2529
2530 class LURenameInstance(LogicalUnit):
2531   """Rename an instance.
2532
2533   """
2534   HPATH = "instance-rename"
2535   HTYPE = constants.HTYPE_INSTANCE
2536   _OP_REQP = ["instance_name", "new_name"]
2537
2538   def BuildHooksEnv(self):
2539     """Build hooks env.
2540
2541     This runs on master, primary and secondary nodes of the instance.
2542
2543     """
2544     env = _BuildInstanceHookEnvByObject(self, self.instance)
2545     env["INSTANCE_NEW_NAME"] = self.op.new_name
2546     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2547           list(self.instance.secondary_nodes))
2548     return env, nl, nl
2549
2550   def CheckPrereq(self):
2551     """Check prerequisites.
2552
2553     This checks that the instance is in the cluster and is not running.
2554
2555     """
2556     instance = self.cfg.GetInstanceInfo(
2557       self.cfg.ExpandInstanceName(self.op.instance_name))
2558     if instance is None:
2559       raise errors.OpPrereqError("Instance '%s' not known" %
2560                                  self.op.instance_name)
2561     if instance.status != "down":
2562       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2563                                  self.op.instance_name)
2564     remote_info = self.rpc.call_instance_info(instance.primary_node,
2565                                               instance.name,
2566                                               instance.hypervisor)
2567     if remote_info:
2568       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2569                                  (self.op.instance_name,
2570                                   instance.primary_node))
2571     self.instance = instance
2572
2573     # new name verification
2574     name_info = utils.HostInfo(self.op.new_name)
2575
2576     self.op.new_name = new_name = name_info.name
2577     instance_list = self.cfg.GetInstanceList()
2578     if new_name in instance_list:
2579       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2580                                  new_name)
2581
2582     if not getattr(self.op, "ignore_ip", False):
2583       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2584         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2585                                    (name_info.ip, new_name))
2586
2587
2588   def Exec(self, feedback_fn):
2589     """Reinstall the instance.
2590
2591     """
2592     inst = self.instance
2593     old_name = inst.name
2594
2595     if inst.disk_template == constants.DT_FILE:
2596       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2597
2598     self.cfg.RenameInstance(inst.name, self.op.new_name)
2599     # Change the instance lock. This is definitely safe while we hold the BGL
2600     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
2601     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2602
2603     # re-read the instance from the configuration after rename
2604     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2605
2606     if inst.disk_template == constants.DT_FILE:
2607       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2608       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2609                                                      old_file_storage_dir,
2610                                                      new_file_storage_dir)
2611
2612       if not result:
2613         raise errors.OpExecError("Could not connect to node '%s' to rename"
2614                                  " directory '%s' to '%s' (but the instance"
2615                                  " has been renamed in Ganeti)" % (
2616                                  inst.primary_node, old_file_storage_dir,
2617                                  new_file_storage_dir))
2618
2619       if not result[0]:
2620         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2621                                  " (but the instance has been renamed in"
2622                                  " Ganeti)" % (old_file_storage_dir,
2623                                                new_file_storage_dir))
2624
2625     _StartInstanceDisks(self, inst, None)
2626     try:
2627       if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
2628                                                old_name):
2629         msg = ("Could not run OS rename script for instance %s on node %s"
2630                " (but the instance has been renamed in Ganeti)" %
2631                (inst.name, inst.primary_node))
2632         self.proc.LogWarning(msg)
2633     finally:
2634       _ShutdownInstanceDisks(self, inst)
2635
2636
2637 class LURemoveInstance(LogicalUnit):
2638   """Remove an instance.
2639
2640   """
2641   HPATH = "instance-remove"
2642   HTYPE = constants.HTYPE_INSTANCE
2643   _OP_REQP = ["instance_name", "ignore_failures"]
2644   REQ_BGL = False
2645
2646   def ExpandNames(self):
2647     self._ExpandAndLockInstance()
2648     self.needed_locks[locking.LEVEL_NODE] = []
2649     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2650
2651   def DeclareLocks(self, level):
2652     if level == locking.LEVEL_NODE:
2653       self._LockInstancesNodes()
2654
2655   def BuildHooksEnv(self):
2656     """Build hooks env.
2657
2658     This runs on master, primary and secondary nodes of the instance.
2659
2660     """
2661     env = _BuildInstanceHookEnvByObject(self, self.instance)
2662     nl = [self.cfg.GetMasterNode()]
2663     return env, nl, nl
2664
2665   def CheckPrereq(self):
2666     """Check prerequisites.
2667
2668     This checks that the instance is in the cluster.
2669
2670     """
2671     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2672     assert self.instance is not None, \
2673       "Cannot retrieve locked instance %s" % self.op.instance_name
2674
2675   def Exec(self, feedback_fn):
2676     """Remove the instance.
2677
2678     """
2679     instance = self.instance
2680     logging.info("Shutting down instance %s on node %s",
2681                  instance.name, instance.primary_node)
2682
2683     if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
2684       if self.op.ignore_failures:
2685         feedback_fn("Warning: can't shutdown instance")
2686       else:
2687         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2688                                  (instance.name, instance.primary_node))
2689
2690     logging.info("Removing block devices for instance %s", instance.name)
2691
2692     if not _RemoveDisks(self, instance):
2693       if self.op.ignore_failures:
2694         feedback_fn("Warning: can't remove instance's disks")
2695       else:
2696         raise errors.OpExecError("Can't remove instance's disks")
2697
2698     logging.info("Removing instance %s out of cluster config", instance.name)
2699
2700     self.cfg.RemoveInstance(instance.name)
2701     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2702
2703
2704 class LUQueryInstances(NoHooksLU):
2705   """Logical unit for querying instances.
2706
2707   """
2708   _OP_REQP = ["output_fields", "names"]
2709   REQ_BGL = False
2710   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
2711                                     "admin_state", "admin_ram",
2712                                     "disk_template", "ip", "mac", "bridge",
2713                                     "sda_size", "sdb_size", "vcpus", "tags",
2714                                     "network_port", "beparams",
2715                                     "(disk).(size)/([0-9]+)",
2716                                     "(disk).(sizes)",
2717                                     "(nic).(mac|ip|bridge)/([0-9]+)",
2718                                     "(nic).(macs|ips|bridges)",
2719                                     "(disk|nic).(count)",
2720                                     "serial_no", "hypervisor", "hvparams",] +
2721                                   ["hv/%s" % name
2722                                    for name in constants.HVS_PARAMETERS] +
2723                                   ["be/%s" % name
2724                                    for name in constants.BES_PARAMETERS])
2725   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
2726
2727
2728   def ExpandNames(self):
2729     _CheckOutputFields(static=self._FIELDS_STATIC,
2730                        dynamic=self._FIELDS_DYNAMIC,
2731                        selected=self.op.output_fields)
2732
2733     self.needed_locks = {}
2734     self.share_locks[locking.LEVEL_INSTANCE] = 1
2735     self.share_locks[locking.LEVEL_NODE] = 1
2736
2737     if self.op.names:
2738       self.wanted = _GetWantedInstances(self, self.op.names)
2739     else:
2740       self.wanted = locking.ALL_SET
2741
2742     self.do_locking = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2743     if self.do_locking:
2744       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2745       self.needed_locks[locking.LEVEL_NODE] = []
2746       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2747
2748   def DeclareLocks(self, level):
2749     if level == locking.LEVEL_NODE and self.do_locking:
2750       self._LockInstancesNodes()
2751
2752   def CheckPrereq(self):
2753     """Check prerequisites.
2754
2755     """
2756     pass
2757
2758   def Exec(self, feedback_fn):
2759     """Computes the list of nodes and their attributes.
2760
2761     """
2762     all_info = self.cfg.GetAllInstancesInfo()
2763     if self.do_locking:
2764       instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2765     elif self.wanted != locking.ALL_SET:
2766       instance_names = self.wanted
2767       missing = set(instance_names).difference(all_info.keys())
2768       if missing:
2769         raise errors.OpExecError(
2770           "Some instances were removed before retrieving their data: %s"
2771           % missing)
2772     else:
2773       instance_names = all_info.keys()
2774
2775     instance_names = utils.NiceSort(instance_names)
2776     instance_list = [all_info[iname] for iname in instance_names]
2777
2778     # begin data gathering
2779
2780     nodes = frozenset([inst.primary_node for inst in instance_list])
2781     hv_list = list(set([inst.hypervisor for inst in instance_list]))
2782
2783     bad_nodes = []
2784     if self.do_locking:
2785       live_data = {}
2786       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2787       for name in nodes:
2788         result = node_data[name]
2789         if result:
2790           live_data.update(result)
2791         elif result == False:
2792           bad_nodes.append(name)
2793         # else no instance is alive
2794     else:
2795       live_data = dict([(name, {}) for name in instance_names])
2796
2797     # end data gathering
2798
2799     HVPREFIX = "hv/"
2800     BEPREFIX = "be/"
2801     output = []
2802     for instance in instance_list:
2803       iout = []
2804       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
2805       i_be = self.cfg.GetClusterInfo().FillBE(instance)
2806       for field in self.op.output_fields:
2807         st_match = self._FIELDS_STATIC.Matches(field)
2808         if field == "name":
2809           val = instance.name
2810         elif field == "os":
2811           val = instance.os
2812         elif field == "pnode":
2813           val = instance.primary_node
2814         elif field == "snodes":
2815           val = list(instance.secondary_nodes)
2816         elif field == "admin_state":
2817           val = (instance.status != "down")
2818         elif field == "oper_state":
2819           if instance.primary_node in bad_nodes:
2820             val = None
2821           else:
2822             val = bool(live_data.get(instance.name))
2823         elif field == "status":
2824           if instance.primary_node in bad_nodes:
2825             val = "ERROR_nodedown"
2826           else:
2827             running = bool(live_data.get(instance.name))
2828             if running:
2829               if instance.status != "down":
2830                 val = "running"
2831               else:
2832                 val = "ERROR_up"
2833             else:
2834               if instance.status != "down":
2835                 val = "ERROR_down"
2836               else:
2837                 val = "ADMIN_down"
2838         elif field == "oper_ram":
2839           if instance.primary_node in bad_nodes:
2840             val = None
2841           elif instance.name in live_data:
2842             val = live_data[instance.name].get("memory", "?")
2843           else:
2844             val = "-"
2845         elif field == "disk_template":
2846           val = instance.disk_template
2847         elif field == "ip":
2848           val = instance.nics[0].ip
2849         elif field == "bridge":
2850           val = instance.nics[0].bridge
2851         elif field == "mac":
2852           val = instance.nics[0].mac
2853         elif field == "sda_size" or field == "sdb_size":
2854           idx = ord(field[2]) - ord('a')
2855           try:
2856             val = instance.FindDisk(idx).size
2857           except errors.OpPrereqError:
2858             val = None
2859         elif field == "tags":
2860           val = list(instance.GetTags())
2861         elif field == "serial_no":
2862           val = instance.serial_no
2863         elif field == "network_port":
2864           val = instance.network_port
2865         elif field == "hypervisor":
2866           val = instance.hypervisor
2867         elif field == "hvparams":
2868           val = i_hv
2869         elif (field.startswith(HVPREFIX) and
2870               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
2871           val = i_hv.get(field[len(HVPREFIX):], None)
2872         elif field == "beparams":
2873           val = i_be
2874         elif (field.startswith(BEPREFIX) and
2875               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
2876           val = i_be.get(field[len(BEPREFIX):], None)
2877         elif st_match and st_match.groups():
2878           # matches a variable list
2879           st_groups = st_match.groups()
2880           if st_groups and st_groups[0] == "disk":
2881             if st_groups[1] == "count":
2882               val = len(instance.disks)
2883             elif st_groups[1] == "sizes":
2884               val = [disk.size for disk in instance.disks]
2885             elif st_groups[1] == "size":
2886               try:
2887                 val = instance.FindDisk(st_groups[2]).size
2888               except errors.OpPrereqError:
2889                 val = None
2890             else:
2891               assert False, "Unhandled disk parameter"
2892           elif st_groups[0] == "nic":
2893             if st_groups[1] == "count":
2894               val = len(instance.nics)
2895             elif st_groups[1] == "macs":
2896               val = [nic.mac for nic in instance.nics]
2897             elif st_groups[1] == "ips":
2898               val = [nic.ip for nic in instance.nics]
2899             elif st_groups[1] == "bridges":
2900               val = [nic.bridge for nic in instance.nics]
2901             else:
2902               # index-based item
2903               nic_idx = int(st_groups[2])
2904               if nic_idx >= len(instance.nics):
2905                 val = None
2906               else:
2907                 if st_groups[1] == "mac":
2908                   val = instance.nics[nic_idx].mac
2909                 elif st_groups[1] == "ip":
2910                   val = instance.nics[nic_idx].ip
2911                 elif st_groups[1] == "bridge":
2912                   val = instance.nics[nic_idx].bridge
2913                 else:
2914                   assert False, "Unhandled NIC parameter"
2915           else:
2916             assert False, "Unhandled variable parameter"
2917         else:
2918           raise errors.ParameterError(field)
2919         iout.append(val)
2920       output.append(iout)
2921
2922     return output
2923
2924
2925 class LUFailoverInstance(LogicalUnit):
2926   """Failover an instance.
2927
2928   """
2929   HPATH = "instance-failover"
2930   HTYPE = constants.HTYPE_INSTANCE
2931   _OP_REQP = ["instance_name", "ignore_consistency"]
2932   REQ_BGL = False
2933
2934   def ExpandNames(self):
2935     self._ExpandAndLockInstance()
2936     self.needed_locks[locking.LEVEL_NODE] = []
2937     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2938
2939   def DeclareLocks(self, level):
2940     if level == locking.LEVEL_NODE:
2941       self._LockInstancesNodes()
2942
2943   def BuildHooksEnv(self):
2944     """Build hooks env.
2945
2946     This runs on master, primary and secondary nodes of the instance.
2947
2948     """
2949     env = {
2950       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2951       }
2952     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2953     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
2954     return env, nl, nl
2955
2956   def CheckPrereq(self):
2957     """Check prerequisites.
2958
2959     This checks that the instance is in the cluster.
2960
2961     """
2962     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2963     assert self.instance is not None, \
2964       "Cannot retrieve locked instance %s" % self.op.instance_name
2965
2966     bep = self.cfg.GetClusterInfo().FillBE(instance)
2967     if instance.disk_template not in constants.DTS_NET_MIRROR:
2968       raise errors.OpPrereqError("Instance's disk layout is not"
2969                                  " network mirrored, cannot failover.")
2970
2971     secondary_nodes = instance.secondary_nodes
2972     if not secondary_nodes:
2973       raise errors.ProgrammerError("no secondary node but using "
2974                                    "a mirrored disk template")
2975
2976     target_node = secondary_nodes[0]
2977     # check memory requirements on the secondary node
2978     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
2979                          instance.name, bep[constants.BE_MEMORY],
2980                          instance.hypervisor)
2981
2982     # check bridge existance
2983     brlist = [nic.bridge for nic in instance.nics]
2984     if not self.rpc.call_bridges_exist(target_node, brlist):
2985       raise errors.OpPrereqError("One or more target bridges %s does not"
2986                                  " exist on destination node '%s'" %
2987                                  (brlist, target_node))
2988
2989   def Exec(self, feedback_fn):
2990     """Failover an instance.
2991
2992     The failover is done by shutting it down on its present node and
2993     starting it on the secondary.
2994
2995     """
2996     instance = self.instance
2997
2998     source_node = instance.primary_node
2999     target_node = instance.secondary_nodes[0]
3000
3001     feedback_fn("* checking disk consistency between source and target")
3002     for dev in instance.disks:
3003       # for drbd, these are drbd over lvm
3004       if not _CheckDiskConsistency(self, dev, target_node, False):
3005         if instance.status == "up" and not self.op.ignore_consistency:
3006           raise errors.OpExecError("Disk %s is degraded on target node,"
3007                                    " aborting failover." % dev.iv_name)
3008
3009     feedback_fn("* shutting down instance on source node")
3010     logging.info("Shutting down instance %s on node %s",
3011                  instance.name, source_node)
3012
3013     if not self.rpc.call_instance_shutdown(source_node, instance):
3014       if self.op.ignore_consistency:
3015         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3016                              " Proceeding"
3017                              " anyway. Please make sure node %s is down",
3018                              instance.name, source_node, source_node)
3019       else:
3020         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
3021                                  (instance.name, source_node))
3022
3023     feedback_fn("* deactivating the instance's disks on source node")
3024     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3025       raise errors.OpExecError("Can't shut down the instance's disks.")
3026
3027     instance.primary_node = target_node
3028     # distribute new instance config to the other nodes
3029     self.cfg.Update(instance)
3030
3031     # Only start the instance if it's marked as up
3032     if instance.status == "up":
3033       feedback_fn("* activating the instance's disks on target node")
3034       logging.info("Starting instance %s on node %s",
3035                    instance.name, target_node)
3036
3037       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3038                                                ignore_secondaries=True)
3039       if not disks_ok:
3040         _ShutdownInstanceDisks(self, instance)
3041         raise errors.OpExecError("Can't activate the instance's disks")
3042
3043       feedback_fn("* starting the instance on the target node")
3044       if not self.rpc.call_instance_start(target_node, instance, None):
3045         _ShutdownInstanceDisks(self, instance)
3046         raise errors.OpExecError("Could not start instance %s on node %s." %
3047                                  (instance.name, target_node))
3048
3049
3050 def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
3051   """Create a tree of block devices on the primary node.
3052
3053   This always creates all devices.
3054
3055   """
3056   if device.children:
3057     for child in device.children:
3058       if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
3059         return False
3060
3061   lu.cfg.SetDiskID(device, node)
3062   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3063                                        instance.name, True, info)
3064   if not new_id:
3065     return False
3066   if device.physical_id is None:
3067     device.physical_id = new_id
3068   return True
3069
3070
3071 def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
3072   """Create a tree of block devices on a secondary node.
3073
3074   If this device type has to be created on secondaries, create it and
3075   all its children.
3076
3077   If not, just recurse to children keeping the same 'force' value.
3078
3079   """
3080   if device.CreateOnSecondary():
3081     force = True
3082   if device.children:
3083     for child in device.children:
3084       if not _CreateBlockDevOnSecondary(lu, node, instance,
3085                                         child, force, info):
3086         return False
3087
3088   if not force:
3089     return True
3090   lu.cfg.SetDiskID(device, node)
3091   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3092                                        instance.name, False, info)
3093   if not new_id:
3094     return False
3095   if device.physical_id is None:
3096     device.physical_id = new_id
3097   return True
3098
3099
3100 def _GenerateUniqueNames(lu, exts):
3101   """Generate a suitable LV name.
3102
3103   This will generate a logical volume name for the given instance.
3104
3105   """
3106   results = []
3107   for val in exts:
3108     new_id = lu.cfg.GenerateUniqueID()
3109     results.append("%s%s" % (new_id, val))
3110   return results
3111
3112
3113 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3114                          p_minor, s_minor):
3115   """Generate a drbd8 device complete with its children.
3116
3117   """
3118   port = lu.cfg.AllocatePort()
3119   vgname = lu.cfg.GetVGName()
3120   shared_secret = lu.cfg.GenerateDRBDSecret()
3121   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3122                           logical_id=(vgname, names[0]))
3123   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3124                           logical_id=(vgname, names[1]))
3125   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3126                           logical_id=(primary, secondary, port,
3127                                       p_minor, s_minor,
3128                                       shared_secret),
3129                           children=[dev_data, dev_meta],
3130                           iv_name=iv_name)
3131   return drbd_dev
3132
3133
3134 def _GenerateDiskTemplate(lu, template_name,
3135                           instance_name, primary_node,
3136                           secondary_nodes, disk_info,
3137                           file_storage_dir, file_driver,
3138                           base_index):
3139   """Generate the entire disk layout for a given template type.
3140
3141   """
3142   #TODO: compute space requirements
3143
3144   vgname = lu.cfg.GetVGName()
3145   disk_count = len(disk_info)
3146   disks = []
3147   if template_name == constants.DT_DISKLESS:
3148     pass
3149   elif template_name == constants.DT_PLAIN:
3150     if len(secondary_nodes) != 0:
3151       raise errors.ProgrammerError("Wrong template configuration")
3152
3153     names = _GenerateUniqueNames(lu, [".disk%d" % i
3154                                       for i in range(disk_count)])
3155     for idx, disk in enumerate(disk_info):
3156       disk_index = idx + base_index
3157       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
3158                               logical_id=(vgname, names[idx]),
3159                               iv_name="disk/%d" % disk_index)
3160       disks.append(disk_dev)
3161   elif template_name == constants.DT_DRBD8:
3162     if len(secondary_nodes) != 1:
3163       raise errors.ProgrammerError("Wrong template configuration")
3164     remote_node = secondary_nodes[0]
3165     minors = lu.cfg.AllocateDRBDMinor(
3166       [primary_node, remote_node] * len(disk_info), instance_name)
3167
3168     names = _GenerateUniqueNames(lu,
3169                                  [".disk%d_%s" % (i, s)
3170                                   for i in range(disk_count)
3171                                   for s in ("data", "meta")
3172                                   ])
3173     for idx, disk in enumerate(disk_info):
3174       disk_index = idx + base_index
3175       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3176                                       disk["size"], names[idx*2:idx*2+2],
3177                                       "disk/%d" % disk_index,
3178                                       minors[idx*2], minors[idx*2+1])
3179       disks.append(disk_dev)
3180   elif template_name == constants.DT_FILE:
3181     if len(secondary_nodes) != 0:
3182       raise errors.ProgrammerError("Wrong template configuration")
3183
3184     for idx, disk in enumerate(disk_info):
3185       disk_index = idx + base_index
3186       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
3187                               iv_name="disk/%d" % disk_index,
3188                               logical_id=(file_driver,
3189                                           "%s/disk%d" % (file_storage_dir,
3190                                                          idx)))
3191       disks.append(disk_dev)
3192   else:
3193     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3194   return disks
3195
3196
3197 def _GetInstanceInfoText(instance):
3198   """Compute that text that should be added to the disk's metadata.
3199
3200   """
3201   return "originstname+%s" % instance.name
3202
3203
3204 def _CreateDisks(lu, instance):
3205   """Create all disks for an instance.
3206
3207   This abstracts away some work from AddInstance.
3208
3209   @type lu: L{LogicalUnit}
3210   @param lu: the logical unit on whose behalf we execute
3211   @type instance: L{objects.Instance}
3212   @param instance: the instance whose disks we should create
3213   @rtype: boolean
3214   @return: the success of the creation
3215
3216   """
3217   info = _GetInstanceInfoText(instance)
3218
3219   if instance.disk_template == constants.DT_FILE:
3220     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3221     result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3222                                                  file_storage_dir)
3223
3224     if not result:
3225       logging.error("Could not connect to node '%s'", instance.primary_node)
3226       return False
3227
3228     if not result[0]:
3229       logging.error("Failed to create directory '%s'", file_storage_dir)
3230       return False
3231
3232   # Note: this needs to be kept in sync with adding of disks in
3233   # LUSetInstanceParams
3234   for device in instance.disks:
3235     logging.info("Creating volume %s for instance %s",
3236                  device.iv_name, instance.name)
3237     #HARDCODE
3238     for secondary_node in instance.secondary_nodes:
3239       if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3240                                         device, False, info):
3241         logging.error("Failed to create volume %s (%s) on secondary node %s!",
3242                       device.iv_name, device, secondary_node)
3243         return False
3244     #HARDCODE
3245     if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3246                                     instance, device, info):
3247       logging.error("Failed to create volume %s on primary!", device.iv_name)
3248       return False
3249
3250   return True
3251
3252
3253 def _RemoveDisks(lu, instance):
3254   """Remove all disks for an instance.
3255
3256   This abstracts away some work from `AddInstance()` and
3257   `RemoveInstance()`. Note that in case some of the devices couldn't
3258   be removed, the removal will continue with the other ones (compare
3259   with `_CreateDisks()`).
3260
3261   @type lu: L{LogicalUnit}
3262   @param lu: the logical unit on whose behalf we execute
3263   @type instance: L{objects.Instance}
3264   @param instance: the instance whose disks we should remove
3265   @rtype: boolean
3266   @return: the success of the removal
3267
3268   """
3269   logging.info("Removing block devices for instance %s", instance.name)
3270
3271   result = True
3272   for device in instance.disks:
3273     for node, disk in device.ComputeNodeTree(instance.primary_node):
3274       lu.cfg.SetDiskID(disk, node)
3275       if not lu.rpc.call_blockdev_remove(node, disk):
3276         lu.proc.LogWarning("Could not remove block device %s on node %s,"
3277                            " continuing anyway", device.iv_name, node)
3278         result = False
3279
3280   if instance.disk_template == constants.DT_FILE:
3281     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3282     if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3283                                                file_storage_dir):
3284       logging.error("Could not remove directory '%s'", file_storage_dir)
3285       result = False
3286
3287   return result
3288
3289
3290 def _ComputeDiskSize(disk_template, disks):
3291   """Compute disk size requirements in the volume group
3292
3293   """
3294   # Required free disk space as a function of disk and swap space
3295   req_size_dict = {
3296     constants.DT_DISKLESS: None,
3297     constants.DT_PLAIN: sum(d["size"] for d in disks),
3298     # 128 MB are added for drbd metadata for each disk
3299     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
3300     constants.DT_FILE: None,
3301   }
3302
3303   if disk_template not in req_size_dict:
3304     raise errors.ProgrammerError("Disk template '%s' size requirement"
3305                                  " is unknown" %  disk_template)
3306
3307   return req_size_dict[disk_template]
3308
3309
3310 def _CheckHVParams(lu, nodenames, hvname, hvparams):
3311   """Hypervisor parameter validation.
3312
3313   This function abstract the hypervisor parameter validation to be
3314   used in both instance create and instance modify.
3315
3316   @type lu: L{LogicalUnit}
3317   @param lu: the logical unit for which we check
3318   @type nodenames: list
3319   @param nodenames: the list of nodes on which we should check
3320   @type hvname: string
3321   @param hvname: the name of the hypervisor we should use
3322   @type hvparams: dict
3323   @param hvparams: the parameters which we need to check
3324   @raise errors.OpPrereqError: if the parameters are not valid
3325
3326   """
3327   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3328                                                   hvname,
3329                                                   hvparams)
3330   for node in nodenames:
3331     info = hvinfo.get(node, None)
3332     if not info or not isinstance(info, (tuple, list)):
3333       raise errors.OpPrereqError("Cannot get current information"
3334                                  " from node '%s' (%s)" % (node, info))
3335     if not info[0]:
3336       raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3337                                  " %s" % info[1])
3338
3339
3340 class LUCreateInstance(LogicalUnit):
3341   """Create an instance.
3342
3343   """
3344   HPATH = "instance-add"
3345   HTYPE = constants.HTYPE_INSTANCE
3346   _OP_REQP = ["instance_name", "disks", "disk_template",
3347               "mode", "start",
3348               "wait_for_sync", "ip_check", "nics",
3349               "hvparams", "beparams"]
3350   REQ_BGL = False
3351
3352   def _ExpandNode(self, node):
3353     """Expands and checks one node name.
3354
3355     """
3356     node_full = self.cfg.ExpandNodeName(node)
3357     if node_full is None:
3358       raise errors.OpPrereqError("Unknown node %s" % node)
3359     return node_full
3360
3361   def ExpandNames(self):
3362     """ExpandNames for CreateInstance.
3363
3364     Figure out the right locks for instance creation.
3365
3366     """
3367     self.needed_locks = {}
3368
3369     # set optional parameters to none if they don't exist
3370     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3371       if not hasattr(self.op, attr):
3372         setattr(self.op, attr, None)
3373
3374     # cheap checks, mostly valid constants given
3375
3376     # verify creation mode
3377     if self.op.mode not in (constants.INSTANCE_CREATE,
3378                             constants.INSTANCE_IMPORT):
3379       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3380                                  self.op.mode)
3381
3382     # disk template and mirror node verification
3383     if self.op.disk_template not in constants.DISK_TEMPLATES:
3384       raise errors.OpPrereqError("Invalid disk template name")
3385
3386     if self.op.hypervisor is None:
3387       self.op.hypervisor = self.cfg.GetHypervisorType()
3388
3389     cluster = self.cfg.GetClusterInfo()
3390     enabled_hvs = cluster.enabled_hypervisors
3391     if self.op.hypervisor not in enabled_hvs:
3392       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3393                                  " cluster (%s)" % (self.op.hypervisor,
3394                                   ",".join(enabled_hvs)))
3395
3396     # check hypervisor parameter syntax (locally)
3397
3398     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3399                                   self.op.hvparams)
3400     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3401     hv_type.CheckParameterSyntax(filled_hvp)
3402
3403     # fill and remember the beparams dict
3404     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3405                                     self.op.beparams)
3406
3407     #### instance parameters check
3408
3409     # instance name verification
3410     hostname1 = utils.HostInfo(self.op.instance_name)
3411     self.op.instance_name = instance_name = hostname1.name
3412
3413     # this is just a preventive check, but someone might still add this
3414     # instance in the meantime, and creation will fail at lock-add time
3415     if instance_name in self.cfg.GetInstanceList():
3416       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3417                                  instance_name)
3418
3419     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3420
3421     # NIC buildup
3422     self.nics = []
3423     for nic in self.op.nics:
3424       # ip validity checks
3425       ip = nic.get("ip", None)
3426       if ip is None or ip.lower() == "none":
3427         nic_ip = None
3428       elif ip.lower() == constants.VALUE_AUTO:
3429         nic_ip = hostname1.ip
3430       else:
3431         if not utils.IsValidIP(ip):
3432           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
3433                                      " like a valid IP" % ip)
3434         nic_ip = ip
3435
3436       # MAC address verification
3437       mac = nic.get("mac", constants.VALUE_AUTO)
3438       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3439         if not utils.IsValidMac(mac.lower()):
3440           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
3441                                      mac)
3442       # bridge verification
3443       bridge = nic.get("bridge", self.cfg.GetDefBridge())
3444       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
3445
3446     # disk checks/pre-build
3447     self.disks = []
3448     for disk in self.op.disks:
3449       mode = disk.get("mode", constants.DISK_RDWR)
3450       if mode not in constants.DISK_ACCESS_SET:
3451         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
3452                                    mode)
3453       size = disk.get("size", None)
3454       if size is None:
3455         raise errors.OpPrereqError("Missing disk size")
3456       try:
3457         size = int(size)
3458       except ValueError:
3459         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
3460       self.disks.append({"size": size, "mode": mode})
3461
3462     # used in CheckPrereq for ip ping check
3463     self.check_ip = hostname1.ip
3464
3465     # file storage checks
3466     if (self.op.file_driver and
3467         not self.op.file_driver in constants.FILE_DRIVER):
3468       raise errors.OpPrereqError("Invalid file driver name '%s'" %
3469                                  self.op.file_driver)
3470
3471     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3472       raise errors.OpPrereqError("File storage directory path not absolute")
3473
3474     ### Node/iallocator related checks
3475     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3476       raise errors.OpPrereqError("One and only one of iallocator and primary"
3477                                  " node must be given")
3478
3479     if self.op.iallocator:
3480       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3481     else:
3482       self.op.pnode = self._ExpandNode(self.op.pnode)
3483       nodelist = [self.op.pnode]
3484       if self.op.snode is not None:
3485         self.op.snode = self._ExpandNode(self.op.snode)
3486         nodelist.append(self.op.snode)
3487       self.needed_locks[locking.LEVEL_NODE] = nodelist
3488
3489     # in case of import lock the source node too
3490     if self.op.mode == constants.INSTANCE_IMPORT:
3491       src_node = getattr(self.op, "src_node", None)
3492       src_path = getattr(self.op, "src_path", None)
3493
3494       if src_node is None or src_path is None:
3495         raise errors.OpPrereqError("Importing an instance requires source"
3496                                    " node and path options")
3497
3498       if not os.path.isabs(src_path):
3499         raise errors.OpPrereqError("The source path must be absolute")
3500
3501       self.op.src_node = src_node = self._ExpandNode(src_node)
3502       if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3503         self.needed_locks[locking.LEVEL_NODE].append(src_node)
3504
3505     else: # INSTANCE_CREATE
3506       if getattr(self.op, "os_type", None) is None:
3507         raise errors.OpPrereqError("No guest OS specified")
3508
3509   def _RunAllocator(self):
3510     """Run the allocator based on input opcode.
3511
3512     """
3513     nics = [n.ToDict() for n in self.nics]
3514     ial = IAllocator(self,
3515                      mode=constants.IALLOCATOR_MODE_ALLOC,
3516                      name=self.op.instance_name,
3517                      disk_template=self.op.disk_template,
3518                      tags=[],
3519                      os=self.op.os_type,
3520                      vcpus=self.be_full[constants.BE_VCPUS],
3521                      mem_size=self.be_full[constants.BE_MEMORY],
3522                      disks=self.disks,
3523                      nics=nics,
3524                      hypervisor=self.op.hypervisor,
3525                      )
3526
3527     ial.Run(self.op.iallocator)
3528
3529     if not ial.success:
3530       raise errors.OpPrereqError("Can't compute nodes using"
3531                                  " iallocator '%s': %s" % (self.op.iallocator,
3532                                                            ial.info))
3533     if len(ial.nodes) != ial.required_nodes:
3534       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3535                                  " of nodes (%s), required %s" %
3536                                  (self.op.iallocator, len(ial.nodes),
3537                                   ial.required_nodes))
3538     self.op.pnode = ial.nodes[0]
3539     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
3540                  self.op.instance_name, self.op.iallocator,
3541                  ", ".join(ial.nodes))
3542     if ial.required_nodes == 2:
3543       self.op.snode = ial.nodes[1]
3544
3545   def BuildHooksEnv(self):
3546     """Build hooks env.
3547
3548     This runs on master, primary and secondary nodes of the instance.
3549
3550     """
3551     env = {
3552       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3553       "INSTANCE_DISK_SIZE": ",".join(str(d["size"]) for d in self.disks),
3554       "INSTANCE_ADD_MODE": self.op.mode,
3555       }
3556     if self.op.mode == constants.INSTANCE_IMPORT:
3557       env["INSTANCE_SRC_NODE"] = self.op.src_node
3558       env["INSTANCE_SRC_PATH"] = self.op.src_path
3559       env["INSTANCE_SRC_IMAGES"] = self.src_images
3560
3561     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3562       primary_node=self.op.pnode,
3563       secondary_nodes=self.secondaries,
3564       status=self.instance_status,
3565       os_type=self.op.os_type,
3566       memory=self.be_full[constants.BE_MEMORY],
3567       vcpus=self.be_full[constants.BE_VCPUS],
3568       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
3569     ))
3570
3571     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3572           self.secondaries)
3573     return env, nl, nl
3574
3575
3576   def CheckPrereq(self):
3577     """Check prerequisites.
3578
3579     """
3580     if (not self.cfg.GetVGName() and
3581         self.op.disk_template not in constants.DTS_NOT_LVM):
3582       raise errors.OpPrereqError("Cluster does not support lvm-based"
3583                                  " instances")
3584
3585
3586     if self.op.mode == constants.INSTANCE_IMPORT:
3587       src_node = self.op.src_node
3588       src_path = self.op.src_path
3589
3590       export_info = self.rpc.call_export_info(src_node, src_path)
3591
3592       if not export_info:
3593         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3594
3595       if not export_info.has_section(constants.INISECT_EXP):
3596         raise errors.ProgrammerError("Corrupted export config")
3597
3598       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3599       if (int(ei_version) != constants.EXPORT_VERSION):
3600         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3601                                    (ei_version, constants.EXPORT_VERSION))
3602
3603       # Check that the new instance doesn't have less disks than the export
3604       instance_disks = len(self.disks)
3605       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
3606       if instance_disks < export_disks:
3607         raise errors.OpPrereqError("Not enough disks to import."
3608                                    " (instance: %d, export: %d)" %
3609                                    (2, export_disks))
3610
3611       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3612       disk_images = []
3613       for idx in range(export_disks):
3614         option = 'disk%d_dump' % idx
3615         if export_info.has_option(constants.INISECT_INS, option):
3616           # FIXME: are the old os-es, disk sizes, etc. useful?
3617           export_name = export_info.get(constants.INISECT_INS, option)
3618           image = os.path.join(src_path, export_name)
3619           disk_images.append(image)
3620         else:
3621           disk_images.append(False)
3622
3623       self.src_images = disk_images
3624
3625       old_name = export_info.get(constants.INISECT_INS, 'name')
3626       # FIXME: int() here could throw a ValueError on broken exports
3627       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
3628       if self.op.instance_name == old_name:
3629         for idx, nic in enumerate(self.nics):
3630           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
3631             nic_mac_ini = 'nic%d_mac' % idx
3632             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
3633
3634     # ip ping checks (we use the same ip that was resolved in ExpandNames)
3635     if self.op.start and not self.op.ip_check:
3636       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3637                                  " adding an instance in start mode")
3638
3639     if self.op.ip_check:
3640       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3641         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3642                                    (self.check_ip, self.op.instance_name))
3643
3644     #### allocator run
3645
3646     if self.op.iallocator is not None:
3647       self._RunAllocator()
3648
3649     #### node related checks
3650
3651     # check primary node
3652     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3653     assert self.pnode is not None, \
3654       "Cannot retrieve locked node %s" % self.op.pnode
3655     self.secondaries = []
3656
3657     # mirror node verification
3658     if self.op.disk_template in constants.DTS_NET_MIRROR:
3659       if self.op.snode is None:
3660         raise errors.OpPrereqError("The networked disk templates need"
3661                                    " a mirror node")
3662       if self.op.snode == pnode.name:
3663         raise errors.OpPrereqError("The secondary node cannot be"
3664                                    " the primary node.")
3665       self.secondaries.append(self.op.snode)
3666
3667     nodenames = [pnode.name] + self.secondaries
3668
3669     req_size = _ComputeDiskSize(self.op.disk_template,
3670                                 self.disks)
3671
3672     # Check lv size requirements
3673     if req_size is not None:
3674       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3675                                          self.op.hypervisor)
3676       for node in nodenames:
3677         info = nodeinfo.get(node, None)
3678         if not info:
3679           raise errors.OpPrereqError("Cannot get current information"
3680                                      " from node '%s'" % node)
3681         vg_free = info.get('vg_free', None)
3682         if not isinstance(vg_free, int):
3683           raise errors.OpPrereqError("Can't compute free disk space on"
3684                                      " node %s" % node)
3685         if req_size > info['vg_free']:
3686           raise errors.OpPrereqError("Not enough disk space on target node %s."
3687                                      " %d MB available, %d MB required" %
3688                                      (node, info['vg_free'], req_size))
3689
3690     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
3691
3692     # os verification
3693     os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
3694     if not os_obj:
3695       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3696                                  " primary node"  % self.op.os_type)
3697
3698     # bridge check on primary node
3699     bridges = [n.bridge for n in self.nics]
3700     if not self.rpc.call_bridges_exist(self.pnode.name, bridges):
3701       raise errors.OpPrereqError("one of the target bridges '%s' does not"
3702                                  " exist on"
3703                                  " destination node '%s'" %
3704                                  (",".join(bridges), pnode.name))
3705
3706     # memory check on primary node
3707     if self.op.start:
3708       _CheckNodeFreeMemory(self, self.pnode.name,
3709                            "creating instance %s" % self.op.instance_name,
3710                            self.be_full[constants.BE_MEMORY],
3711                            self.op.hypervisor)
3712
3713     if self.op.start:
3714       self.instance_status = 'up'
3715     else:
3716       self.instance_status = 'down'
3717
3718   def Exec(self, feedback_fn):
3719     """Create and add the instance to the cluster.
3720
3721     """
3722     instance = self.op.instance_name
3723     pnode_name = self.pnode.name
3724
3725     for nic in self.nics:
3726       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
3727         nic.mac = self.cfg.GenerateMAC()
3728
3729     ht_kind = self.op.hypervisor
3730     if ht_kind in constants.HTS_REQ_PORT:
3731       network_port = self.cfg.AllocatePort()
3732     else:
3733       network_port = None
3734
3735     ##if self.op.vnc_bind_address is None:
3736     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3737
3738     # this is needed because os.path.join does not accept None arguments
3739     if self.op.file_storage_dir is None:
3740       string_file_storage_dir = ""
3741     else:
3742       string_file_storage_dir = self.op.file_storage_dir
3743
3744     # build the full file storage dir path
3745     file_storage_dir = os.path.normpath(os.path.join(
3746                                         self.cfg.GetFileStorageDir(),
3747                                         string_file_storage_dir, instance))
3748
3749
3750     disks = _GenerateDiskTemplate(self,
3751                                   self.op.disk_template,
3752                                   instance, pnode_name,
3753                                   self.secondaries,
3754                                   self.disks,
3755                                   file_storage_dir,
3756                                   self.op.file_driver,
3757                                   0)
3758
3759     iobj = objects.Instance(name=instance, os=self.op.os_type,
3760                             primary_node=pnode_name,
3761                             nics=self.nics, disks=disks,
3762                             disk_template=self.op.disk_template,
3763                             status=self.instance_status,
3764                             network_port=network_port,
3765                             beparams=self.op.beparams,
3766                             hvparams=self.op.hvparams,
3767                             hypervisor=self.op.hypervisor,
3768                             )
3769
3770     feedback_fn("* creating instance disks...")
3771     if not _CreateDisks(self, iobj):
3772       _RemoveDisks(self, iobj)
3773       self.cfg.ReleaseDRBDMinors(instance)
3774       raise errors.OpExecError("Device creation failed, reverting...")
3775
3776     feedback_fn("adding instance %s to cluster config" % instance)
3777
3778     self.cfg.AddInstance(iobj)
3779     # Declare that we don't want to remove the instance lock anymore, as we've
3780     # added the instance to the config
3781     del self.remove_locks[locking.LEVEL_INSTANCE]
3782     # Remove the temp. assignements for the instance's drbds
3783     self.cfg.ReleaseDRBDMinors(instance)
3784     # Unlock all the nodes
3785     self.context.glm.release(locking.LEVEL_NODE)
3786     del self.acquired_locks[locking.LEVEL_NODE]
3787
3788     if self.op.wait_for_sync:
3789       disk_abort = not _WaitForSync(self, iobj)
3790     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3791       # make sure the disks are not degraded (still sync-ing is ok)
3792       time.sleep(15)
3793       feedback_fn("* checking mirrors status")
3794       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
3795     else:
3796       disk_abort = False
3797
3798     if disk_abort:
3799       _RemoveDisks(self, iobj)
3800       self.cfg.RemoveInstance(iobj.name)
3801       # Make sure the instance lock gets removed
3802       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
3803       raise errors.OpExecError("There are some degraded disks for"
3804                                " this instance")
3805
3806     feedback_fn("creating os for instance %s on node %s" %
3807                 (instance, pnode_name))
3808
3809     if iobj.disk_template != constants.DT_DISKLESS:
3810       if self.op.mode == constants.INSTANCE_CREATE:
3811         feedback_fn("* running the instance OS create scripts...")
3812         if not self.rpc.call_instance_os_add(pnode_name, iobj):
3813           raise errors.OpExecError("could not add os for instance %s"
3814                                    " on node %s" %
3815                                    (instance, pnode_name))
3816
3817       elif self.op.mode == constants.INSTANCE_IMPORT:
3818         feedback_fn("* running the instance OS import scripts...")
3819         src_node = self.op.src_node
3820         src_images = self.src_images
3821         cluster_name = self.cfg.GetClusterName()
3822         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
3823                                                          src_node, src_images,
3824                                                          cluster_name)
3825         for idx, result in enumerate(import_result):
3826           if not result:
3827             self.LogWarning("Could not image %s for on instance %s, disk %d,"
3828                             " on node %s" % (src_images[idx], instance, idx,
3829                                              pnode_name))
3830       else:
3831         # also checked in the prereq part
3832         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3833                                      % self.op.mode)
3834
3835     if self.op.start:
3836       logging.info("Starting instance %s on node %s", instance, pnode_name)
3837       feedback_fn("* starting instance...")
3838       if not self.rpc.call_instance_start(pnode_name, iobj, None):
3839         raise errors.OpExecError("Could not start instance")
3840
3841
3842 class LUConnectConsole(NoHooksLU):
3843   """Connect to an instance's console.
3844
3845   This is somewhat special in that it returns the command line that
3846   you need to run on the master node in order to connect to the
3847   console.
3848
3849   """
3850   _OP_REQP = ["instance_name"]
3851   REQ_BGL = False
3852
3853   def ExpandNames(self):
3854     self._ExpandAndLockInstance()
3855
3856   def CheckPrereq(self):
3857     """Check prerequisites.
3858
3859     This checks that the instance is in the cluster.
3860
3861     """
3862     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3863     assert self.instance is not None, \
3864       "Cannot retrieve locked instance %s" % self.op.instance_name
3865
3866   def Exec(self, feedback_fn):
3867     """Connect to the console of an instance
3868
3869     """
3870     instance = self.instance
3871     node = instance.primary_node
3872
3873     node_insts = self.rpc.call_instance_list([node],
3874                                              [instance.hypervisor])[node]
3875     if node_insts is False:
3876       raise errors.OpExecError("Can't connect to node %s." % node)
3877
3878     if instance.name not in node_insts:
3879       raise errors.OpExecError("Instance %s is not running." % instance.name)
3880
3881     logging.debug("Connecting to console of %s on %s", instance.name, node)
3882
3883     hyper = hypervisor.GetHypervisor(instance.hypervisor)
3884     console_cmd = hyper.GetShellCommandForConsole(instance)
3885
3886     # build ssh cmdline
3887     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3888
3889
3890 class LUReplaceDisks(LogicalUnit):
3891   """Replace the disks of an instance.
3892
3893   """
3894   HPATH = "mirrors-replace"
3895   HTYPE = constants.HTYPE_INSTANCE
3896   _OP_REQP = ["instance_name", "mode", "disks"]
3897   REQ_BGL = False
3898
3899   def ExpandNames(self):
3900     self._ExpandAndLockInstance()
3901
3902     if not hasattr(self.op, "remote_node"):
3903       self.op.remote_node = None
3904
3905     ia_name = getattr(self.op, "iallocator", None)
3906     if ia_name is not None:
3907       if self.op.remote_node is not None:
3908         raise errors.OpPrereqError("Give either the iallocator or the new"
3909                                    " secondary, not both")
3910       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3911     elif self.op.remote_node is not None:
3912       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3913       if remote_node is None:
3914         raise errors.OpPrereqError("Node '%s' not known" %
3915                                    self.op.remote_node)
3916       self.op.remote_node = remote_node
3917       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3918       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3919     else:
3920       self.needed_locks[locking.LEVEL_NODE] = []
3921       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3922
3923   def DeclareLocks(self, level):
3924     # If we're not already locking all nodes in the set we have to declare the
3925     # instance's primary/secondary nodes.
3926     if (level == locking.LEVEL_NODE and
3927         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
3928       self._LockInstancesNodes()
3929
3930   def _RunAllocator(self):
3931     """Compute a new secondary node using an IAllocator.
3932
3933     """
3934     ial = IAllocator(self,
3935                      mode=constants.IALLOCATOR_MODE_RELOC,
3936                      name=self.op.instance_name,
3937                      relocate_from=[self.sec_node])
3938
3939     ial.Run(self.op.iallocator)
3940
3941     if not ial.success:
3942       raise errors.OpPrereqError("Can't compute nodes using"
3943                                  " iallocator '%s': %s" % (self.op.iallocator,
3944                                                            ial.info))
3945     if len(ial.nodes) != ial.required_nodes:
3946       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3947                                  " of nodes (%s), required %s" %
3948                                  (len(ial.nodes), ial.required_nodes))
3949     self.op.remote_node = ial.nodes[0]
3950     self.LogInfo("Selected new secondary for the instance: %s",
3951                  self.op.remote_node)
3952
3953   def BuildHooksEnv(self):
3954     """Build hooks env.
3955
3956     This runs on the master, the primary and all the secondaries.
3957
3958     """
3959     env = {
3960       "MODE": self.op.mode,
3961       "NEW_SECONDARY": self.op.remote_node,
3962       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3963       }
3964     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3965     nl = [
3966       self.cfg.GetMasterNode(),
3967       self.instance.primary_node,
3968       ]
3969     if self.op.remote_node is not None:
3970       nl.append(self.op.remote_node)
3971     return env, nl, nl
3972
3973   def CheckPrereq(self):
3974     """Check prerequisites.
3975
3976     This checks that the instance is in the cluster.
3977
3978     """
3979     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3980     assert instance is not None, \
3981       "Cannot retrieve locked instance %s" % self.op.instance_name
3982     self.instance = instance
3983
3984     if instance.disk_template not in constants.DTS_NET_MIRROR:
3985       raise errors.OpPrereqError("Instance's disk layout is not"
3986                                  " network mirrored.")
3987
3988     if len(instance.secondary_nodes) != 1:
3989       raise errors.OpPrereqError("The instance has a strange layout,"
3990                                  " expected one secondary but found %d" %
3991                                  len(instance.secondary_nodes))
3992
3993     self.sec_node = instance.secondary_nodes[0]
3994
3995     ia_name = getattr(self.op, "iallocator", None)
3996     if ia_name is not None:
3997       self._RunAllocator()
3998
3999     remote_node = self.op.remote_node
4000     if remote_node is not None:
4001       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
4002       assert self.remote_node_info is not None, \
4003         "Cannot retrieve locked node %s" % remote_node
4004     else:
4005       self.remote_node_info = None
4006     if remote_node == instance.primary_node:
4007       raise errors.OpPrereqError("The specified node is the primary node of"
4008                                  " the instance.")
4009     elif remote_node == self.sec_node:
4010       if self.op.mode == constants.REPLACE_DISK_SEC:
4011         # this is for DRBD8, where we can't execute the same mode of
4012         # replacement as for drbd7 (no different port allocated)
4013         raise errors.OpPrereqError("Same secondary given, cannot execute"
4014                                    " replacement")
4015     if instance.disk_template == constants.DT_DRBD8:
4016       if (self.op.mode == constants.REPLACE_DISK_ALL and
4017           remote_node is not None):
4018         # switch to replace secondary mode
4019         self.op.mode = constants.REPLACE_DISK_SEC
4020
4021       if self.op.mode == constants.REPLACE_DISK_ALL:
4022         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
4023                                    " secondary disk replacement, not"
4024                                    " both at once")
4025       elif self.op.mode == constants.REPLACE_DISK_PRI:
4026         if remote_node is not None:
4027           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
4028                                      " the secondary while doing a primary"
4029                                      " node disk replacement")
4030         self.tgt_node = instance.primary_node
4031         self.oth_node = instance.secondary_nodes[0]
4032       elif self.op.mode == constants.REPLACE_DISK_SEC:
4033         self.new_node = remote_node # this can be None, in which case
4034                                     # we don't change the secondary
4035         self.tgt_node = instance.secondary_nodes[0]
4036         self.oth_node = instance.primary_node
4037       else:
4038         raise errors.ProgrammerError("Unhandled disk replace mode")
4039
4040     if not self.op.disks:
4041       self.op.disks = range(len(instance.disks))
4042
4043     for disk_idx in self.op.disks:
4044       instance.FindDisk(disk_idx)
4045
4046   def _ExecD8DiskOnly(self, feedback_fn):
4047     """Replace a disk on the primary or secondary for dbrd8.
4048
4049     The algorithm for replace is quite complicated:
4050
4051       1. for each disk to be replaced:
4052
4053         1. create new LVs on the target node with unique names
4054         1. detach old LVs from the drbd device
4055         1. rename old LVs to name_replaced.<time_t>
4056         1. rename new LVs to old LVs
4057         1. attach the new LVs (with the old names now) to the drbd device
4058
4059       1. wait for sync across all devices
4060
4061       1. for each modified disk:
4062
4063         1. remove old LVs (which have the name name_replaces.<time_t>)
4064
4065     Failures are not very well handled.
4066
4067     """
4068     steps_total = 6
4069     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4070     instance = self.instance
4071     iv_names = {}
4072     vgname = self.cfg.GetVGName()
4073     # start of work
4074     cfg = self.cfg
4075     tgt_node = self.tgt_node
4076     oth_node = self.oth_node
4077
4078     # Step: check device activation
4079     self.proc.LogStep(1, steps_total, "check device existence")
4080     info("checking volume groups")
4081     my_vg = cfg.GetVGName()
4082     results = self.rpc.call_vg_list([oth_node, tgt_node])
4083     if not results:
4084       raise errors.OpExecError("Can't list volume groups on the nodes")
4085     for node in oth_node, tgt_node:
4086       res = results.get(node, False)
4087       if not res or my_vg not in res:
4088         raise errors.OpExecError("Volume group '%s' not found on %s" %
4089                                  (my_vg, node))
4090     for idx, dev in enumerate(instance.disks):
4091       if idx not in self.op.disks:
4092         continue
4093       for node in tgt_node, oth_node:
4094         info("checking disk/%d on %s" % (idx, node))
4095         cfg.SetDiskID(dev, node)
4096         if not self.rpc.call_blockdev_find(node, dev):
4097           raise errors.OpExecError("Can't find disk/%d on node %s" %
4098                                    (idx, node))
4099
4100     # Step: check other node consistency
4101     self.proc.LogStep(2, steps_total, "check peer consistency")
4102     for idx, dev in enumerate(instance.disks):
4103       if idx not in self.op.disks:
4104         continue
4105       info("checking disk/%d consistency on %s" % (idx, oth_node))
4106       if not _CheckDiskConsistency(self, dev, oth_node,
4107                                    oth_node==instance.primary_node):
4108         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
4109                                  " to replace disks on this node (%s)" %
4110                                  (oth_node, tgt_node))
4111
4112     # Step: create new storage
4113     self.proc.LogStep(3, steps_total, "allocate new storage")
4114     for idx, dev in enumerate(instance.disks):
4115       if idx not in self.op.disks:
4116         continue
4117       size = dev.size
4118       cfg.SetDiskID(dev, tgt_node)
4119       lv_names = [".disk%d_%s" % (idx, suf)
4120                   for suf in ["data", "meta"]]
4121       names = _GenerateUniqueNames(self, lv_names)
4122       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4123                              logical_id=(vgname, names[0]))
4124       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4125                              logical_id=(vgname, names[1]))
4126       new_lvs = [lv_data, lv_meta]
4127       old_lvs = dev.children
4128       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
4129       info("creating new local storage on %s for %s" %
4130            (tgt_node, dev.iv_name))
4131       # since we *always* want to create this LV, we use the
4132       # _Create...OnPrimary (which forces the creation), even if we
4133       # are talking about the secondary node
4134       for new_lv in new_lvs:
4135         if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
4136                                         _GetInstanceInfoText(instance)):
4137           raise errors.OpExecError("Failed to create new LV named '%s' on"
4138                                    " node '%s'" %
4139                                    (new_lv.logical_id[1], tgt_node))
4140
4141     # Step: for each lv, detach+rename*2+attach
4142     self.proc.LogStep(4, steps_total, "change drbd configuration")
4143     for dev, old_lvs, new_lvs in iv_names.itervalues():
4144       info("detaching %s drbd from local storage" % dev.iv_name)
4145       if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
4146         raise errors.OpExecError("Can't detach drbd from local storage on node"
4147                                  " %s for device %s" % (tgt_node, dev.iv_name))
4148       #dev.children = []
4149       #cfg.Update(instance)
4150
4151       # ok, we created the new LVs, so now we know we have the needed
4152       # storage; as such, we proceed on the target node to rename
4153       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4154       # using the assumption that logical_id == physical_id (which in
4155       # turn is the unique_id on that node)
4156
4157       # FIXME(iustin): use a better name for the replaced LVs
4158       temp_suffix = int(time.time())
4159       ren_fn = lambda d, suff: (d.physical_id[0],
4160                                 d.physical_id[1] + "_replaced-%s" % suff)
4161       # build the rename list based on what LVs exist on the node
4162       rlist = []
4163       for to_ren in old_lvs:
4164         find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4165         if find_res is not None: # device exists
4166           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4167
4168       info("renaming the old LVs on the target node")
4169       if not self.rpc.call_blockdev_rename(tgt_node, rlist):
4170         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4171       # now we rename the new LVs to the old LVs
4172       info("renaming the new LVs on the target node")
4173       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4174       if not self.rpc.call_blockdev_rename(tgt_node, rlist):
4175         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4176
4177       for old, new in zip(old_lvs, new_lvs):
4178         new.logical_id = old.logical_id
4179         cfg.SetDiskID(new, tgt_node)
4180
4181       for disk in old_lvs:
4182         disk.logical_id = ren_fn(disk, temp_suffix)
4183         cfg.SetDiskID(disk, tgt_node)
4184
4185       # now that the new lvs have the old name, we can add them to the device
4186       info("adding new mirror component on %s" % tgt_node)
4187       if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
4188         for new_lv in new_lvs:
4189           if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
4190             warning("Can't rollback device %s", hint="manually cleanup unused"
4191                     " logical volumes")
4192         raise errors.OpExecError("Can't add local storage to drbd")
4193
4194       dev.children = new_lvs
4195       cfg.Update(instance)
4196
4197     # Step: wait for sync
4198
4199     # this can fail as the old devices are degraded and _WaitForSync
4200     # does a combined result over all disks, so we don't check its
4201     # return value
4202     self.proc.LogStep(5, steps_total, "sync devices")
4203     _WaitForSync(self, instance, unlock=True)
4204
4205     # so check manually all the devices
4206     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4207       cfg.SetDiskID(dev, instance.primary_node)
4208       is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
4209       if is_degr:
4210         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4211
4212     # Step: remove old storage
4213     self.proc.LogStep(6, steps_total, "removing old storage")
4214     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4215       info("remove logical volumes for %s" % name)
4216       for lv in old_lvs:
4217         cfg.SetDiskID(lv, tgt_node)
4218         if not self.rpc.call_blockdev_remove(tgt_node, lv):
4219           warning("Can't remove old LV", hint="manually remove unused LVs")
4220           continue
4221
4222   def _ExecD8Secondary(self, feedback_fn):
4223     """Replace the secondary node for drbd8.
4224
4225     The algorithm for replace is quite complicated:
4226       - for all disks of the instance:
4227         - create new LVs on the new node with same names
4228         - shutdown the drbd device on the old secondary
4229         - disconnect the drbd network on the primary
4230         - create the drbd device on the new secondary
4231         - network attach the drbd on the primary, using an artifice:
4232           the drbd code for Attach() will connect to the network if it
4233           finds a device which is connected to the good local disks but
4234           not network enabled
4235       - wait for sync across all devices
4236       - remove all disks from the old secondary
4237
4238     Failures are not very well handled.
4239
4240     """
4241     steps_total = 6
4242     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4243     instance = self.instance
4244     iv_names = {}
4245     vgname = self.cfg.GetVGName()
4246     # start of work
4247     cfg = self.cfg
4248     old_node = self.tgt_node
4249     new_node = self.new_node
4250     pri_node = instance.primary_node
4251
4252     # Step: check device activation
4253     self.proc.LogStep(1, steps_total, "check device existence")
4254     info("checking volume groups")
4255     my_vg = cfg.GetVGName()
4256     results = self.rpc.call_vg_list([pri_node, new_node])
4257     if not results:
4258       raise errors.OpExecError("Can't list volume groups on the nodes")
4259     for node in pri_node, new_node:
4260       res = results.get(node, False)
4261       if not res or my_vg not in res:
4262         raise errors.OpExecError("Volume group '%s' not found on %s" %
4263                                  (my_vg, node))
4264     for idx, dev in enumerate(instance.disks):
4265       if idx not in self.op.disks:
4266         continue
4267       info("checking disk/%d on %s" % (idx, pri_node))
4268       cfg.SetDiskID(dev, pri_node)
4269       if not self.rpc.call_blockdev_find(pri_node, dev):
4270         raise errors.OpExecError("Can't find disk/%d on node %s" %
4271                                  (idx, pri_node))
4272
4273     # Step: check other node consistency
4274     self.proc.LogStep(2, steps_total, "check peer consistency")
4275     for idx, dev in enumerate(instance.disks):
4276       if idx not in self.op.disks:
4277         continue
4278       info("checking disk/%d consistency on %s" % (idx, pri_node))
4279       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4280         raise errors.OpExecError("Primary node (%s) has degraded storage,"
4281                                  " unsafe to replace the secondary" %
4282                                  pri_node)
4283
4284     # Step: create new storage
4285     self.proc.LogStep(3, steps_total, "allocate new storage")
4286     for idx, dev in enumerate(instance.disks):
4287       size = dev.size
4288       info("adding new local storage on %s for disk/%d" %
4289            (new_node, idx))
4290       # since we *always* want to create this LV, we use the
4291       # _Create...OnPrimary (which forces the creation), even if we
4292       # are talking about the secondary node
4293       for new_lv in dev.children:
4294         if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4295                                         _GetInstanceInfoText(instance)):
4296           raise errors.OpExecError("Failed to create new LV named '%s' on"
4297                                    " node '%s'" %
4298                                    (new_lv.logical_id[1], new_node))
4299
4300     # Step 4: dbrd minors and drbd setups changes
4301     # after this, we must manually remove the drbd minors on both the
4302     # error and the success paths
4303     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4304                                    instance.name)
4305     logging.debug("Allocated minors %s" % (minors,))
4306     self.proc.LogStep(4, steps_total, "changing drbd configuration")
4307     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
4308       size = dev.size
4309       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
4310       # create new devices on new_node
4311       if pri_node == dev.logical_id[0]:
4312         new_logical_id = (pri_node, new_node,
4313                           dev.logical_id[2], dev.logical_id[3], new_minor,
4314                           dev.logical_id[5])
4315       else:
4316         new_logical_id = (new_node, pri_node,
4317                           dev.logical_id[2], new_minor, dev.logical_id[4],
4318                           dev.logical_id[5])
4319       iv_names[idx] = (dev, dev.children, new_logical_id)
4320       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4321                     new_logical_id)
4322       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4323                               logical_id=new_logical_id,
4324                               children=dev.children)
4325       if not _CreateBlockDevOnSecondary(self, new_node, instance,
4326                                         new_drbd, False,
4327                                         _GetInstanceInfoText(instance)):
4328         self.cfg.ReleaseDRBDMinors(instance.name)
4329         raise errors.OpExecError("Failed to create new DRBD on"
4330                                  " node '%s'" % new_node)
4331
4332     for idx, dev in enumerate(instance.disks):
4333       # we have new devices, shutdown the drbd on the old secondary
4334       info("shutting down drbd for disk/%d on old node" % idx)
4335       cfg.SetDiskID(dev, old_node)
4336       if not self.rpc.call_blockdev_shutdown(old_node, dev):
4337         warning("Failed to shutdown drbd for disk/%d on old node" % idx,
4338                 hint="Please cleanup this device manually as soon as possible")
4339
4340     info("detaching primary drbds from the network (=> standalone)")
4341     done = 0
4342     for idx, dev in enumerate(instance.disks):
4343       cfg.SetDiskID(dev, pri_node)
4344       # set the network part of the physical (unique in bdev terms) id
4345       # to None, meaning detach from network
4346       dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4347       # and 'find' the device, which will 'fix' it to match the
4348       # standalone state
4349       if self.rpc.call_blockdev_find(pri_node, dev):
4350         done += 1
4351       else:
4352         warning("Failed to detach drbd disk/%d from network, unusual case" %
4353                 idx)
4354
4355     if not done:
4356       # no detaches succeeded (very unlikely)
4357       self.cfg.ReleaseDRBDMinors(instance.name)
4358       raise errors.OpExecError("Can't detach at least one DRBD from old node")
4359
4360     # if we managed to detach at least one, we update all the disks of
4361     # the instance to point to the new secondary
4362     info("updating instance configuration")
4363     for dev, _, new_logical_id in iv_names.itervalues():
4364       dev.logical_id = new_logical_id
4365       cfg.SetDiskID(dev, pri_node)
4366     cfg.Update(instance)
4367     # we can remove now the temp minors as now the new values are
4368     # written to the config file (and therefore stable)
4369     self.cfg.ReleaseDRBDMinors(instance.name)
4370
4371     # and now perform the drbd attach
4372     info("attaching primary drbds to new secondary (standalone => connected)")
4373     failures = []
4374     for idx, dev in enumerate(instance.disks):
4375       info("attaching primary drbd for disk/%d to new secondary node" % idx)
4376       # since the attach is smart, it's enough to 'find' the device,
4377       # it will automatically activate the network, if the physical_id
4378       # is correct
4379       cfg.SetDiskID(dev, pri_node)
4380       logging.debug("Disk to attach: %s", dev)
4381       if not self.rpc.call_blockdev_find(pri_node, dev):
4382         warning("can't attach drbd disk/%d to new secondary!" % idx,
4383                 "please do a gnt-instance info to see the status of disks")
4384
4385     # this can fail as the old devices are degraded and _WaitForSync
4386     # does a combined result over all disks, so we don't check its
4387     # return value
4388     self.proc.LogStep(5, steps_total, "sync devices")
4389     _WaitForSync(self, instance, unlock=True)
4390
4391     # so check manually all the devices
4392     for idx, (dev, old_lvs, _) in iv_names.iteritems():
4393       cfg.SetDiskID(dev, pri_node)
4394       is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
4395       if is_degr:
4396         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
4397
4398     self.proc.LogStep(6, steps_total, "removing old storage")
4399     for idx, (dev, old_lvs, _) in iv_names.iteritems():
4400       info("remove logical volumes for disk/%d" % idx)
4401       for lv in old_lvs:
4402         cfg.SetDiskID(lv, old_node)
4403         if not self.rpc.call_blockdev_remove(old_node, lv):
4404           warning("Can't remove LV on old secondary",
4405                   hint="Cleanup stale volumes by hand")
4406
4407   def Exec(self, feedback_fn):
4408     """Execute disk replacement.
4409
4410     This dispatches the disk replacement to the appropriate handler.
4411
4412     """
4413     instance = self.instance
4414
4415     # Activate the instance disks if we're replacing them on a down instance
4416     if instance.status == "down":
4417       _StartInstanceDisks(self, instance, True)
4418
4419     if instance.disk_template == constants.DT_DRBD8:
4420       if self.op.remote_node is None:
4421         fn = self._ExecD8DiskOnly
4422       else:
4423         fn = self._ExecD8Secondary
4424     else:
4425       raise errors.ProgrammerError("Unhandled disk replacement case")
4426
4427     ret = fn(feedback_fn)
4428
4429     # Deactivate the instance disks if we're replacing them on a down instance
4430     if instance.status == "down":
4431       _SafeShutdownInstanceDisks(self, instance)
4432
4433     return ret
4434
4435
4436 class LUGrowDisk(LogicalUnit):
4437   """Grow a disk of an instance.
4438
4439   """
4440   HPATH = "disk-grow"
4441   HTYPE = constants.HTYPE_INSTANCE
4442   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4443   REQ_BGL = False
4444
4445   def ExpandNames(self):
4446     self._ExpandAndLockInstance()
4447     self.needed_locks[locking.LEVEL_NODE] = []
4448     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4449
4450   def DeclareLocks(self, level):
4451     if level == locking.LEVEL_NODE:
4452       self._LockInstancesNodes()
4453
4454   def BuildHooksEnv(self):
4455     """Build hooks env.
4456
4457     This runs on the master, the primary and all the secondaries.
4458
4459     """
4460     env = {
4461       "DISK": self.op.disk,
4462       "AMOUNT": self.op.amount,
4463       }
4464     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4465     nl = [
4466       self.cfg.GetMasterNode(),
4467       self.instance.primary_node,
4468       ]
4469     return env, nl, nl
4470
4471   def CheckPrereq(self):
4472     """Check prerequisites.
4473
4474     This checks that the instance is in the cluster.
4475
4476     """
4477     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4478     assert instance is not None, \
4479       "Cannot retrieve locked instance %s" % self.op.instance_name
4480
4481     self.instance = instance
4482
4483     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4484       raise errors.OpPrereqError("Instance's disk layout does not support"
4485                                  " growing.")
4486
4487     self.disk = instance.FindDisk(self.op.disk)
4488
4489     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4490     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4491                                        instance.hypervisor)
4492     for node in nodenames:
4493       info = nodeinfo.get(node, None)
4494       if not info:
4495         raise errors.OpPrereqError("Cannot get current information"
4496                                    " from node '%s'" % node)
4497       vg_free = info.get('vg_free', None)
4498       if not isinstance(vg_free, int):
4499         raise errors.OpPrereqError("Can't compute free disk space on"
4500                                    " node %s" % node)
4501       if self.op.amount > info['vg_free']:
4502         raise errors.OpPrereqError("Not enough disk space on target node %s:"
4503                                    " %d MiB available, %d MiB required" %
4504                                    (node, info['vg_free'], self.op.amount))
4505
4506   def Exec(self, feedback_fn):
4507     """Execute disk grow.
4508
4509     """
4510     instance = self.instance
4511     disk = self.disk
4512     for node in (instance.secondary_nodes + (instance.primary_node,)):
4513       self.cfg.SetDiskID(disk, node)
4514       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
4515       if (not result or not isinstance(result, (list, tuple)) or
4516           len(result) != 2):
4517         raise errors.OpExecError("grow request failed to node %s" % node)
4518       elif not result[0]:
4519         raise errors.OpExecError("grow request failed to node %s: %s" %
4520                                  (node, result[1]))
4521     disk.RecordGrow(self.op.amount)
4522     self.cfg.Update(instance)
4523     if self.op.wait_for_sync:
4524       disk_abort = not _WaitForSync(self, instance)
4525       if disk_abort:
4526         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
4527                              " status.\nPlease check the instance.")
4528
4529
4530 class LUQueryInstanceData(NoHooksLU):
4531   """Query runtime instance data.
4532
4533   """
4534   _OP_REQP = ["instances", "static"]
4535   REQ_BGL = False
4536
4537   def ExpandNames(self):
4538     self.needed_locks = {}
4539     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4540
4541     if not isinstance(self.op.instances, list):
4542       raise errors.OpPrereqError("Invalid argument type 'instances'")
4543
4544     if self.op.instances:
4545       self.wanted_names = []
4546       for name in self.op.instances:
4547         full_name = self.cfg.ExpandInstanceName(name)
4548         if full_name is None:
4549           raise errors.OpPrereqError("Instance '%s' not known" %
4550                                      self.op.instance_name)
4551         self.wanted_names.append(full_name)
4552       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4553     else:
4554       self.wanted_names = None
4555       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4556
4557     self.needed_locks[locking.LEVEL_NODE] = []
4558     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4559
4560   def DeclareLocks(self, level):
4561     if level == locking.LEVEL_NODE:
4562       self._LockInstancesNodes()
4563
4564   def CheckPrereq(self):
4565     """Check prerequisites.
4566
4567     This only checks the optional instance list against the existing names.
4568
4569     """
4570     if self.wanted_names is None:
4571       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4572
4573     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4574                              in self.wanted_names]
4575     return
4576
4577   def _ComputeDiskStatus(self, instance, snode, dev):
4578     """Compute block device status.
4579
4580     """
4581     static = self.op.static
4582     if not static:
4583       self.cfg.SetDiskID(dev, instance.primary_node)
4584       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4585     else:
4586       dev_pstatus = None
4587
4588     if dev.dev_type in constants.LDS_DRBD:
4589       # we change the snode then (otherwise we use the one passed in)
4590       if dev.logical_id[0] == instance.primary_node:
4591         snode = dev.logical_id[1]
4592       else:
4593         snode = dev.logical_id[0]
4594
4595     if snode and not static:
4596       self.cfg.SetDiskID(dev, snode)
4597       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4598     else:
4599       dev_sstatus = None
4600
4601     if dev.children:
4602       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4603                       for child in dev.children]
4604     else:
4605       dev_children = []
4606
4607     data = {
4608       "iv_name": dev.iv_name,
4609       "dev_type": dev.dev_type,
4610       "logical_id": dev.logical_id,
4611       "physical_id": dev.physical_id,
4612       "pstatus": dev_pstatus,
4613       "sstatus": dev_sstatus,
4614       "children": dev_children,
4615       "mode": dev.mode,
4616       }
4617
4618     return data
4619
4620   def Exec(self, feedback_fn):
4621     """Gather and return data"""
4622     result = {}
4623
4624     cluster = self.cfg.GetClusterInfo()
4625
4626     for instance in self.wanted_instances:
4627       if not self.op.static:
4628         remote_info = self.rpc.call_instance_info(instance.primary_node,
4629                                                   instance.name,
4630                                                   instance.hypervisor)
4631         if remote_info and "state" in remote_info:
4632           remote_state = "up"
4633         else:
4634           remote_state = "down"
4635       else:
4636         remote_state = None
4637       if instance.status == "down":
4638         config_state = "down"
4639       else:
4640         config_state = "up"
4641
4642       disks = [self._ComputeDiskStatus(instance, None, device)
4643                for device in instance.disks]
4644
4645       idict = {
4646         "name": instance.name,
4647         "config_state": config_state,
4648         "run_state": remote_state,
4649         "pnode": instance.primary_node,
4650         "snodes": instance.secondary_nodes,
4651         "os": instance.os,
4652         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4653         "disks": disks,
4654         "hypervisor": instance.hypervisor,
4655         "network_port": instance.network_port,
4656         "hv_instance": instance.hvparams,
4657         "hv_actual": cluster.FillHV(instance),
4658         "be_instance": instance.beparams,
4659         "be_actual": cluster.FillBE(instance),
4660         }
4661
4662       result[instance.name] = idict
4663
4664     return result
4665
4666
4667 class LUSetInstanceParams(LogicalUnit):
4668   """Modifies an instances's parameters.
4669
4670   """
4671   HPATH = "instance-modify"
4672   HTYPE = constants.HTYPE_INSTANCE
4673   _OP_REQP = ["instance_name"]
4674   REQ_BGL = False
4675
4676   def CheckArguments(self):
4677     if not hasattr(self.op, 'nics'):
4678       self.op.nics = []
4679     if not hasattr(self.op, 'disks'):
4680       self.op.disks = []
4681     if not hasattr(self.op, 'beparams'):
4682       self.op.beparams = {}
4683     if not hasattr(self.op, 'hvparams'):
4684       self.op.hvparams = {}
4685     self.op.force = getattr(self.op, "force", False)
4686     if not (self.op.nics or self.op.disks or
4687             self.op.hvparams or self.op.beparams):
4688       raise errors.OpPrereqError("No changes submitted")
4689
4690     for item in (constants.BE_MEMORY, constants.BE_VCPUS):
4691       val = self.op.beparams.get(item, None)
4692       if val is not None:
4693         try:
4694           val = int(val)
4695         except ValueError, err:
4696           raise errors.OpPrereqError("Invalid %s size: %s" % (item, str(err)))
4697         self.op.beparams[item] = val
4698     # Disk validation
4699     disk_addremove = 0
4700     for disk_op, disk_dict in self.op.disks:
4701       if disk_op == constants.DDM_REMOVE:
4702         disk_addremove += 1
4703         continue
4704       elif disk_op == constants.DDM_ADD:
4705         disk_addremove += 1
4706       else:
4707         if not isinstance(disk_op, int):
4708           raise errors.OpPrereqError("Invalid disk index")
4709       if disk_op == constants.DDM_ADD:
4710         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
4711         if mode not in (constants.DISK_RDONLY, constants.DISK_RDWR):
4712           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
4713         size = disk_dict.get('size', None)
4714         if size is None:
4715           raise errors.OpPrereqError("Required disk parameter size missing")
4716         try:
4717           size = int(size)
4718         except ValueError, err:
4719           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
4720                                      str(err))
4721         disk_dict['size'] = size
4722       else:
4723         # modification of disk
4724         if 'size' in disk_dict:
4725           raise errors.OpPrereqError("Disk size change not possible, use"
4726                                      " grow-disk")
4727
4728     if disk_addremove > 1:
4729       raise errors.OpPrereqError("Only one disk add or remove operation"
4730                                  " supported at a time")
4731
4732     # NIC validation
4733     nic_addremove = 0
4734     for nic_op, nic_dict in self.op.nics:
4735       if nic_op == constants.DDM_REMOVE:
4736         nic_addremove += 1
4737         continue
4738       elif nic_op == constants.DDM_ADD:
4739         nic_addremove += 1
4740       else:
4741         if not isinstance(nic_op, int):
4742           raise errors.OpPrereqError("Invalid nic index")
4743
4744       # nic_dict should be a dict
4745       nic_ip = nic_dict.get('ip', None)
4746       if nic_ip is not None:
4747         if nic_ip.lower() == "none":
4748           nic_dict['ip'] = None
4749         else:
4750           if not utils.IsValidIP(nic_ip):
4751             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
4752       # we can only check None bridges and assign the default one
4753       nic_bridge = nic_dict.get('bridge', None)
4754       if nic_bridge is None:
4755         nic_dict['bridge'] = self.cfg.GetDefBridge()
4756       # but we can validate MACs
4757       nic_mac = nic_dict.get('mac', None)
4758       if nic_mac is not None:
4759         if self.cfg.IsMacInUse(nic_mac):
4760           raise errors.OpPrereqError("MAC address %s already in use"
4761                                      " in cluster" % nic_mac)
4762         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4763           if not utils.IsValidMac(nic_mac):
4764             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
4765     if nic_addremove > 1:
4766       raise errors.OpPrereqError("Only one NIC add or remove operation"
4767                                  " supported at a time")
4768
4769   def ExpandNames(self):
4770     self._ExpandAndLockInstance()
4771     self.needed_locks[locking.LEVEL_NODE] = []
4772     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4773
4774   def DeclareLocks(self, level):
4775     if level == locking.LEVEL_NODE:
4776       self._LockInstancesNodes()
4777
4778   def BuildHooksEnv(self):
4779     """Build hooks env.
4780
4781     This runs on the master, primary and secondaries.
4782
4783     """
4784     args = dict()
4785     if constants.BE_MEMORY in self.be_new:
4786       args['memory'] = self.be_new[constants.BE_MEMORY]
4787     if constants.BE_VCPUS in self.be_new:
4788       args['vcpus'] = self.be_new[constants.BE_VCPUS]
4789     # FIXME: readd disk/nic changes
4790     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
4791     nl = [self.cfg.GetMasterNode(),
4792           self.instance.primary_node] + list(self.instance.secondary_nodes)
4793     return env, nl, nl
4794
4795   def CheckPrereq(self):
4796     """Check prerequisites.
4797
4798     This only checks the instance list against the existing names.
4799
4800     """
4801     force = self.force = self.op.force
4802
4803     # checking the new params on the primary/secondary nodes
4804
4805     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4806     assert self.instance is not None, \
4807       "Cannot retrieve locked instance %s" % self.op.instance_name
4808     pnode = self.instance.primary_node
4809     nodelist = [pnode]
4810     nodelist.extend(instance.secondary_nodes)
4811
4812     # hvparams processing
4813     if self.op.hvparams:
4814       i_hvdict = copy.deepcopy(instance.hvparams)
4815       for key, val in self.op.hvparams.iteritems():
4816         if val is None:
4817           try:
4818             del i_hvdict[key]
4819           except KeyError:
4820             pass
4821         else:
4822           i_hvdict[key] = val
4823       cluster = self.cfg.GetClusterInfo()
4824       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
4825                                 i_hvdict)
4826       # local check
4827       hypervisor.GetHypervisor(
4828         instance.hypervisor).CheckParameterSyntax(hv_new)
4829       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
4830       self.hv_new = hv_new # the new actual values
4831       self.hv_inst = i_hvdict # the new dict (without defaults)
4832     else:
4833       self.hv_new = self.hv_inst = {}
4834
4835     # beparams processing
4836     if self.op.beparams:
4837       i_bedict = copy.deepcopy(instance.beparams)
4838       for key, val in self.op.beparams.iteritems():
4839         if val is None:
4840           try:
4841             del i_bedict[key]
4842           except KeyError:
4843             pass
4844         else:
4845           i_bedict[key] = val
4846       cluster = self.cfg.GetClusterInfo()
4847       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4848                                 i_bedict)
4849       self.be_new = be_new # the new actual values
4850       self.be_inst = i_bedict # the new dict (without defaults)
4851     else:
4852       self.be_new = self.be_inst = {}
4853
4854     self.warn = []
4855
4856     if constants.BE_MEMORY in self.op.beparams and not self.force:
4857       mem_check_list = [pnode]
4858       if be_new[constants.BE_AUTO_BALANCE]:
4859         # either we changed auto_balance to yes or it was from before
4860         mem_check_list.extend(instance.secondary_nodes)
4861       instance_info = self.rpc.call_instance_info(pnode, instance.name,
4862                                                   instance.hypervisor)
4863       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
4864                                          instance.hypervisor)
4865
4866       if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
4867         # Assume the primary node is unreachable and go ahead
4868         self.warn.append("Can't get info from primary node %s" % pnode)
4869       else:
4870         if instance_info:
4871           current_mem = instance_info['memory']
4872         else:
4873           # Assume instance not running
4874           # (there is a slight race condition here, but it's not very probable,
4875           # and we have no other way to check)
4876           current_mem = 0
4877         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
4878                     nodeinfo[pnode]['memory_free'])
4879         if miss_mem > 0:
4880           raise errors.OpPrereqError("This change will prevent the instance"
4881                                      " from starting, due to %d MB of memory"
4882                                      " missing on its primary node" % miss_mem)
4883
4884       if be_new[constants.BE_AUTO_BALANCE]:
4885         for node in instance.secondary_nodes:
4886           if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
4887             self.warn.append("Can't get info from secondary node %s" % node)
4888           elif be_new[constants.BE_MEMORY] > nodeinfo[node]['memory_free']:
4889             self.warn.append("Not enough memory to failover instance to"
4890                              " secondary node %s" % node)
4891
4892     # NIC processing
4893     for nic_op, nic_dict in self.op.nics:
4894       if nic_op == constants.DDM_REMOVE:
4895         if not instance.nics:
4896           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
4897         continue
4898       if nic_op != constants.DDM_ADD:
4899         # an existing nic
4900         if nic_op < 0 or nic_op >= len(instance.nics):
4901           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
4902                                      " are 0 to %d" %
4903                                      (nic_op, len(instance.nics)))
4904       nic_bridge = nic_dict.get('bridge', None)
4905       if nic_bridge is not None:
4906         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
4907           msg = ("Bridge '%s' doesn't exist on one of"
4908                  " the instance nodes" % nic_bridge)
4909           if self.force:
4910             self.warn.append(msg)
4911           else:
4912             raise errors.OpPrereqError(msg)
4913
4914     # DISK processing
4915     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
4916       raise errors.OpPrereqError("Disk operations not supported for"
4917                                  " diskless instances")
4918     for disk_op, disk_dict in self.op.disks:
4919       if disk_op == constants.DDM_REMOVE:
4920         if len(instance.disks) == 1:
4921           raise errors.OpPrereqError("Cannot remove the last disk of"
4922                                      " an instance")
4923         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
4924         ins_l = ins_l[pnode]
4925         if not type(ins_l) is list:
4926           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
4927         if instance.name in ins_l:
4928           raise errors.OpPrereqError("Instance is running, can't remove"
4929                                      " disks.")
4930
4931       if (disk_op == constants.DDM_ADD and
4932           len(instance.nics) >= constants.MAX_DISKS):
4933         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
4934                                    " add more" % constants.MAX_DISKS)
4935       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
4936         # an existing disk
4937         if disk_op < 0 or disk_op >= len(instance.disks):
4938           raise errors.OpPrereqError("Invalid disk index %s, valid values"
4939                                      " are 0 to %d" %
4940                                      (disk_op, len(instance.disks)))
4941
4942     return
4943
4944   def Exec(self, feedback_fn):
4945     """Modifies an instance.
4946
4947     All parameters take effect only at the next restart of the instance.
4948
4949     """
4950     # Process here the warnings from CheckPrereq, as we don't have a
4951     # feedback_fn there.
4952     for warn in self.warn:
4953       feedback_fn("WARNING: %s" % warn)
4954
4955     result = []
4956     instance = self.instance
4957     # disk changes
4958     for disk_op, disk_dict in self.op.disks:
4959       if disk_op == constants.DDM_REMOVE:
4960         # remove the last disk
4961         device = instance.disks.pop()
4962         device_idx = len(instance.disks)
4963         for node, disk in device.ComputeNodeTree(instance.primary_node):
4964           self.cfg.SetDiskID(disk, node)
4965           if not self.rpc.call_blockdev_remove(node, disk):
4966             self.proc.LogWarning("Could not remove disk/%d on node %s,"
4967                                  " continuing anyway", device_idx, node)
4968         result.append(("disk/%d" % device_idx, "remove"))
4969       elif disk_op == constants.DDM_ADD:
4970         # add a new disk
4971         if instance.disk_template == constants.DT_FILE:
4972           file_driver, file_path = instance.disks[0].logical_id
4973           file_path = os.path.dirname(file_path)
4974         else:
4975           file_driver = file_path = None
4976         disk_idx_base = len(instance.disks)
4977         new_disk = _GenerateDiskTemplate(self,
4978                                          instance.disk_template,
4979                                          instance, instance.primary_node,
4980                                          instance.secondary_nodes,
4981                                          [disk_dict],
4982                                          file_path,
4983                                          file_driver,
4984                                          disk_idx_base)[0]
4985         new_disk.mode = disk_dict['mode']
4986         instance.disks.append(new_disk)
4987         info = _GetInstanceInfoText(instance)
4988
4989         logging.info("Creating volume %s for instance %s",
4990                      new_disk.iv_name, instance.name)
4991         # Note: this needs to be kept in sync with _CreateDisks
4992         #HARDCODE
4993         for secondary_node in instance.secondary_nodes:
4994           if not _CreateBlockDevOnSecondary(self, secondary_node, instance,
4995                                             new_disk, False, info):
4996             self.LogWarning("Failed to create volume %s (%s) on"
4997                             " secondary node %s!",
4998                             new_disk.iv_name, new_disk, secondary_node)
4999         #HARDCODE
5000         if not _CreateBlockDevOnPrimary(self, instance.primary_node,
5001                                         instance, new_disk, info):
5002           self.LogWarning("Failed to create volume %s on primary!",
5003                           new_disk.iv_name)
5004         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
5005                        (new_disk.size, new_disk.mode)))
5006       else:
5007         # change a given disk
5008         instance.disks[disk_op].mode = disk_dict['mode']
5009         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
5010     # NIC changes
5011     for nic_op, nic_dict in self.op.nics:
5012       if nic_op == constants.DDM_REMOVE:
5013         # remove the last nic
5014         del instance.nics[-1]
5015         result.append(("nic.%d" % len(instance.nics), "remove"))
5016       elif nic_op == constants.DDM_ADD:
5017         # add a new nic
5018         if 'mac' not in nic_dict:
5019           mac = constants.VALUE_GENERATE
5020         else:
5021           mac = nic_dict['mac']
5022         if mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5023           mac = self.cfg.GenerateMAC()
5024         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
5025                               bridge=nic_dict.get('bridge', None))
5026         instance.nics.append(new_nic)
5027         result.append(("nic.%d" % (len(instance.nics) - 1),
5028                        "add:mac=%s,ip=%s,bridge=%s" %
5029                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
5030       else:
5031         # change a given nic
5032         for key in 'mac', 'ip', 'bridge':
5033           if key in nic_dict:
5034             setattr(instance.nics[nic_op], key, nic_dict[key])
5035             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
5036
5037     # hvparams changes
5038     if self.op.hvparams:
5039       instance.hvparams = self.hv_new
5040       for key, val in self.op.hvparams.iteritems():
5041         result.append(("hv/%s" % key, val))
5042
5043     # beparams changes
5044     if self.op.beparams:
5045       instance.beparams = self.be_inst
5046       for key, val in self.op.beparams.iteritems():
5047         result.append(("be/%s" % key, val))
5048
5049     self.cfg.Update(instance)
5050
5051     return result
5052
5053
5054 class LUQueryExports(NoHooksLU):
5055   """Query the exports list
5056
5057   """
5058   _OP_REQP = ['nodes']
5059   REQ_BGL = False
5060
5061   def ExpandNames(self):
5062     self.needed_locks = {}
5063     self.share_locks[locking.LEVEL_NODE] = 1
5064     if not self.op.nodes:
5065       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5066     else:
5067       self.needed_locks[locking.LEVEL_NODE] = \
5068         _GetWantedNodes(self, self.op.nodes)
5069
5070   def CheckPrereq(self):
5071     """Check prerequisites.
5072
5073     """
5074     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
5075
5076   def Exec(self, feedback_fn):
5077     """Compute the list of all the exported system images.
5078
5079     @rtype: dict
5080     @return: a dictionary with the structure node->(export-list)
5081         where export-list is a list of the instances exported on
5082         that node.
5083
5084     """
5085     return self.rpc.call_export_list(self.nodes)
5086
5087
5088 class LUExportInstance(LogicalUnit):
5089   """Export an instance to an image in the cluster.
5090
5091   """
5092   HPATH = "instance-export"
5093   HTYPE = constants.HTYPE_INSTANCE
5094   _OP_REQP = ["instance_name", "target_node", "shutdown"]
5095   REQ_BGL = False
5096
5097   def ExpandNames(self):
5098     self._ExpandAndLockInstance()
5099     # FIXME: lock only instance primary and destination node
5100     #
5101     # Sad but true, for now we have do lock all nodes, as we don't know where
5102     # the previous export might be, and and in this LU we search for it and
5103     # remove it from its current node. In the future we could fix this by:
5104     #  - making a tasklet to search (share-lock all), then create the new one,
5105     #    then one to remove, after
5106     #  - removing the removal operation altoghether
5107     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5108
5109   def DeclareLocks(self, level):
5110     """Last minute lock declaration."""
5111     # All nodes are locked anyway, so nothing to do here.
5112
5113   def BuildHooksEnv(self):
5114     """Build hooks env.
5115
5116     This will run on the master, primary node and target node.
5117
5118     """
5119     env = {
5120       "EXPORT_NODE": self.op.target_node,
5121       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
5122       }
5123     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5124     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
5125           self.op.target_node]
5126     return env, nl, nl
5127
5128   def CheckPrereq(self):
5129     """Check prerequisites.
5130
5131     This checks that the instance and node names are valid.
5132
5133     """
5134     instance_name = self.op.instance_name
5135     self.instance = self.cfg.GetInstanceInfo(instance_name)
5136     assert self.instance is not None, \
5137           "Cannot retrieve locked instance %s" % self.op.instance_name
5138
5139     self.dst_node = self.cfg.GetNodeInfo(
5140       self.cfg.ExpandNodeName(self.op.target_node))
5141
5142     assert self.dst_node is not None, \
5143           "Cannot retrieve locked node %s" % self.op.target_node
5144
5145     # instance disk type verification
5146     for disk in self.instance.disks:
5147       if disk.dev_type == constants.LD_FILE:
5148         raise errors.OpPrereqError("Export not supported for instances with"
5149                                    " file-based disks")
5150
5151   def Exec(self, feedback_fn):
5152     """Export an instance to an image in the cluster.
5153
5154     """
5155     instance = self.instance
5156     dst_node = self.dst_node
5157     src_node = instance.primary_node
5158     if self.op.shutdown:
5159       # shutdown the instance, but not the disks
5160       if not self.rpc.call_instance_shutdown(src_node, instance):
5161         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
5162                                  (instance.name, src_node))
5163
5164     vgname = self.cfg.GetVGName()
5165
5166     snap_disks = []
5167
5168     try:
5169       for disk in instance.disks:
5170         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
5171         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
5172
5173         if not new_dev_name:
5174           self.LogWarning("Could not snapshot block device %s on node %s",
5175                           disk.logical_id[1], src_node)
5176           snap_disks.append(False)
5177         else:
5178           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
5179                                  logical_id=(vgname, new_dev_name),
5180                                  physical_id=(vgname, new_dev_name),
5181                                  iv_name=disk.iv_name)
5182           snap_disks.append(new_dev)
5183
5184     finally:
5185       if self.op.shutdown and instance.status == "up":
5186         if not self.rpc.call_instance_start(src_node, instance, None):
5187           _ShutdownInstanceDisks(self, instance)
5188           raise errors.OpExecError("Could not start instance")
5189
5190     # TODO: check for size
5191
5192     cluster_name = self.cfg.GetClusterName()
5193     for idx, dev in enumerate(snap_disks):
5194       if dev:
5195         if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
5196                                              instance, cluster_name, idx):
5197           self.LogWarning("Could not export block device %s from node %s to"
5198                           " node %s", dev.logical_id[1], src_node,
5199                           dst_node.name)
5200         if not self.rpc.call_blockdev_remove(src_node, dev):
5201           self.LogWarning("Could not remove snapshot block device %s from node"
5202                           " %s", dev.logical_id[1], src_node)
5203
5204     if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
5205       self.LogWarning("Could not finalize export for instance %s on node %s",
5206                       instance.name, dst_node.name)
5207
5208     nodelist = self.cfg.GetNodeList()
5209     nodelist.remove(dst_node.name)
5210
5211     # on one-node clusters nodelist will be empty after the removal
5212     # if we proceed the backup would be removed because OpQueryExports
5213     # substitutes an empty list with the full cluster node list.
5214     if nodelist:
5215       exportlist = self.rpc.call_export_list(nodelist)
5216       for node in exportlist:
5217         if instance.name in exportlist[node]:
5218           if not self.rpc.call_export_remove(node, instance.name):
5219             self.LogWarning("Could not remove older export for instance %s"
5220                             " on node %s", instance.name, node)
5221
5222
5223 class LURemoveExport(NoHooksLU):
5224   """Remove exports related to the named instance.
5225
5226   """
5227   _OP_REQP = ["instance_name"]
5228   REQ_BGL = False
5229
5230   def ExpandNames(self):
5231     self.needed_locks = {}
5232     # We need all nodes to be locked in order for RemoveExport to work, but we
5233     # don't need to lock the instance itself, as nothing will happen to it (and
5234     # we can remove exports also for a removed instance)
5235     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5236
5237   def CheckPrereq(self):
5238     """Check prerequisites.
5239     """
5240     pass
5241
5242   def Exec(self, feedback_fn):
5243     """Remove any export.
5244
5245     """
5246     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
5247     # If the instance was not found we'll try with the name that was passed in.
5248     # This will only work if it was an FQDN, though.
5249     fqdn_warn = False
5250     if not instance_name:
5251       fqdn_warn = True
5252       instance_name = self.op.instance_name
5253
5254     exportlist = self.rpc.call_export_list(self.acquired_locks[
5255       locking.LEVEL_NODE])
5256     found = False
5257     for node in exportlist:
5258       if instance_name in exportlist[node]:
5259         found = True
5260         if not self.rpc.call_export_remove(node, instance_name):
5261           logging.error("Could not remove export for instance %s"
5262                         " on node %s", instance_name, node)
5263
5264     if fqdn_warn and not found:
5265       feedback_fn("Export not found. If trying to remove an export belonging"
5266                   " to a deleted instance please use its Fully Qualified"
5267                   " Domain Name.")
5268
5269
5270 class TagsLU(NoHooksLU):
5271   """Generic tags LU.
5272
5273   This is an abstract class which is the parent of all the other tags LUs.
5274
5275   """
5276
5277   def ExpandNames(self):
5278     self.needed_locks = {}
5279     if self.op.kind == constants.TAG_NODE:
5280       name = self.cfg.ExpandNodeName(self.op.name)
5281       if name is None:
5282         raise errors.OpPrereqError("Invalid node name (%s)" %
5283                                    (self.op.name,))
5284       self.op.name = name
5285       self.needed_locks[locking.LEVEL_NODE] = name
5286     elif self.op.kind == constants.TAG_INSTANCE:
5287       name = self.cfg.ExpandInstanceName(self.op.name)
5288       if name is None:
5289         raise errors.OpPrereqError("Invalid instance name (%s)" %
5290                                    (self.op.name,))
5291       self.op.name = name
5292       self.needed_locks[locking.LEVEL_INSTANCE] = name
5293
5294   def CheckPrereq(self):
5295     """Check prerequisites.
5296
5297     """
5298     if self.op.kind == constants.TAG_CLUSTER:
5299       self.target = self.cfg.GetClusterInfo()
5300     elif self.op.kind == constants.TAG_NODE:
5301       self.target = self.cfg.GetNodeInfo(self.op.name)
5302     elif self.op.kind == constants.TAG_INSTANCE:
5303       self.target = self.cfg.GetInstanceInfo(self.op.name)
5304     else:
5305       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5306                                  str(self.op.kind))
5307
5308
5309 class LUGetTags(TagsLU):
5310   """Returns the tags of a given object.
5311
5312   """
5313   _OP_REQP = ["kind", "name"]
5314   REQ_BGL = False
5315
5316   def Exec(self, feedback_fn):
5317     """Returns the tag list.
5318
5319     """
5320     return list(self.target.GetTags())
5321
5322
5323 class LUSearchTags(NoHooksLU):
5324   """Searches the tags for a given pattern.
5325
5326   """
5327   _OP_REQP = ["pattern"]
5328   REQ_BGL = False
5329
5330   def ExpandNames(self):
5331     self.needed_locks = {}
5332
5333   def CheckPrereq(self):
5334     """Check prerequisites.
5335
5336     This checks the pattern passed for validity by compiling it.
5337
5338     """
5339     try:
5340       self.re = re.compile(self.op.pattern)
5341     except re.error, err:
5342       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5343                                  (self.op.pattern, err))
5344
5345   def Exec(self, feedback_fn):
5346     """Returns the tag list.
5347
5348     """
5349     cfg = self.cfg
5350     tgts = [("/cluster", cfg.GetClusterInfo())]
5351     ilist = cfg.GetAllInstancesInfo().values()
5352     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5353     nlist = cfg.GetAllNodesInfo().values()
5354     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5355     results = []
5356     for path, target in tgts:
5357       for tag in target.GetTags():
5358         if self.re.search(tag):
5359           results.append((path, tag))
5360     return results
5361
5362
5363 class LUAddTags(TagsLU):
5364   """Sets a tag on a given object.
5365
5366   """
5367   _OP_REQP = ["kind", "name", "tags"]
5368   REQ_BGL = False
5369
5370   def CheckPrereq(self):
5371     """Check prerequisites.
5372
5373     This checks the type and length of the tag name and value.
5374
5375     """
5376     TagsLU.CheckPrereq(self)
5377     for tag in self.op.tags:
5378       objects.TaggableObject.ValidateTag(tag)
5379
5380   def Exec(self, feedback_fn):
5381     """Sets the tag.
5382
5383     """
5384     try:
5385       for tag in self.op.tags:
5386         self.target.AddTag(tag)
5387     except errors.TagError, err:
5388       raise errors.OpExecError("Error while setting tag: %s" % str(err))
5389     try:
5390       self.cfg.Update(self.target)
5391     except errors.ConfigurationError:
5392       raise errors.OpRetryError("There has been a modification to the"
5393                                 " config file and the operation has been"
5394                                 " aborted. Please retry.")
5395
5396
5397 class LUDelTags(TagsLU):
5398   """Delete a list of tags from a given object.
5399
5400   """
5401   _OP_REQP = ["kind", "name", "tags"]
5402   REQ_BGL = False
5403
5404   def CheckPrereq(self):
5405     """Check prerequisites.
5406
5407     This checks that we have the given tag.
5408
5409     """
5410     TagsLU.CheckPrereq(self)
5411     for tag in self.op.tags:
5412       objects.TaggableObject.ValidateTag(tag)
5413     del_tags = frozenset(self.op.tags)
5414     cur_tags = self.target.GetTags()
5415     if not del_tags <= cur_tags:
5416       diff_tags = del_tags - cur_tags
5417       diff_names = ["'%s'" % tag for tag in diff_tags]
5418       diff_names.sort()
5419       raise errors.OpPrereqError("Tag(s) %s not found" %
5420                                  (",".join(diff_names)))
5421
5422   def Exec(self, feedback_fn):
5423     """Remove the tag from the object.
5424
5425     """
5426     for tag in self.op.tags:
5427       self.target.RemoveTag(tag)
5428     try:
5429       self.cfg.Update(self.target)
5430     except errors.ConfigurationError:
5431       raise errors.OpRetryError("There has been a modification to the"
5432                                 " config file and the operation has been"
5433                                 " aborted. Please retry.")
5434
5435
5436 class LUTestDelay(NoHooksLU):
5437   """Sleep for a specified amount of time.
5438
5439   This LU sleeps on the master and/or nodes for a specified amount of
5440   time.
5441
5442   """
5443   _OP_REQP = ["duration", "on_master", "on_nodes"]
5444   REQ_BGL = False
5445
5446   def ExpandNames(self):
5447     """Expand names and set required locks.
5448
5449     This expands the node list, if any.
5450
5451     """
5452     self.needed_locks = {}
5453     if self.op.on_nodes:
5454       # _GetWantedNodes can be used here, but is not always appropriate to use
5455       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
5456       # more information.
5457       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5458       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5459
5460   def CheckPrereq(self):
5461     """Check prerequisites.
5462
5463     """
5464
5465   def Exec(self, feedback_fn):
5466     """Do the actual sleep.
5467
5468     """
5469     if self.op.on_master:
5470       if not utils.TestDelay(self.op.duration):
5471         raise errors.OpExecError("Error during master delay test")
5472     if self.op.on_nodes:
5473       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5474       if not result:
5475         raise errors.OpExecError("Complete failure from rpc call")
5476       for node, node_result in result.items():
5477         if not node_result:
5478           raise errors.OpExecError("Failure during rpc call to node %s,"
5479                                    " result: %s" % (node, node_result))
5480
5481
5482 class IAllocator(object):
5483   """IAllocator framework.
5484
5485   An IAllocator instance has three sets of attributes:
5486     - cfg that is needed to query the cluster
5487     - input data (all members of the _KEYS class attribute are required)
5488     - four buffer attributes (in|out_data|text), that represent the
5489       input (to the external script) in text and data structure format,
5490       and the output from it, again in two formats
5491     - the result variables from the script (success, info, nodes) for
5492       easy usage
5493
5494   """
5495   _ALLO_KEYS = [
5496     "mem_size", "disks", "disk_template",
5497     "os", "tags", "nics", "vcpus", "hypervisor",
5498     ]
5499   _RELO_KEYS = [
5500     "relocate_from",
5501     ]
5502
5503   def __init__(self, lu, mode, name, **kwargs):
5504     self.lu = lu
5505     # init buffer variables
5506     self.in_text = self.out_text = self.in_data = self.out_data = None
5507     # init all input fields so that pylint is happy
5508     self.mode = mode
5509     self.name = name
5510     self.mem_size = self.disks = self.disk_template = None
5511     self.os = self.tags = self.nics = self.vcpus = None
5512     self.relocate_from = None
5513     # computed fields
5514     self.required_nodes = None
5515     # init result fields
5516     self.success = self.info = self.nodes = None
5517     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5518       keyset = self._ALLO_KEYS
5519     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5520       keyset = self._RELO_KEYS
5521     else:
5522       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5523                                    " IAllocator" % self.mode)
5524     for key in kwargs:
5525       if key not in keyset:
5526         raise errors.ProgrammerError("Invalid input parameter '%s' to"
5527                                      " IAllocator" % key)
5528       setattr(self, key, kwargs[key])
5529     for key in keyset:
5530       if key not in kwargs:
5531         raise errors.ProgrammerError("Missing input parameter '%s' to"
5532                                      " IAllocator" % key)
5533     self._BuildInputData()
5534
5535   def _ComputeClusterData(self):
5536     """Compute the generic allocator input data.
5537
5538     This is the data that is independent of the actual operation.
5539
5540     """
5541     cfg = self.lu.cfg
5542     cluster_info = cfg.GetClusterInfo()
5543     # cluster data
5544     data = {
5545       "version": 1,
5546       "cluster_name": cfg.GetClusterName(),
5547       "cluster_tags": list(cluster_info.GetTags()),
5548       "enable_hypervisors": list(cluster_info.enabled_hypervisors),
5549       # we don't have job IDs
5550       }
5551     iinfo = cfg.GetAllInstancesInfo().values()
5552     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
5553
5554     # node data
5555     node_results = {}
5556     node_list = cfg.GetNodeList()
5557
5558     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5559       hypervisor = self.hypervisor
5560     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5561       hypervisor = cfg.GetInstanceInfo(self.name).hypervisor
5562
5563     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
5564                                            hypervisor)
5565     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
5566                        cluster_info.enabled_hypervisors)
5567     for nname in node_list:
5568       ninfo = cfg.GetNodeInfo(nname)
5569       if nname not in node_data or not isinstance(node_data[nname], dict):
5570         raise errors.OpExecError("Can't get data for node %s" % nname)
5571       remote_info = node_data[nname]
5572       for attr in ['memory_total', 'memory_free', 'memory_dom0',
5573                    'vg_size', 'vg_free', 'cpu_total']:
5574         if attr not in remote_info:
5575           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5576                                    (nname, attr))
5577         try:
5578           remote_info[attr] = int(remote_info[attr])
5579         except ValueError, err:
5580           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5581                                    " %s" % (nname, attr, str(err)))
5582       # compute memory used by primary instances
5583       i_p_mem = i_p_up_mem = 0
5584       for iinfo, beinfo in i_list:
5585         if iinfo.primary_node == nname:
5586           i_p_mem += beinfo[constants.BE_MEMORY]
5587           if iinfo.name not in node_iinfo[nname]:
5588             i_used_mem = 0
5589           else:
5590             i_used_mem = int(node_iinfo[nname][iinfo.name]['memory'])
5591           i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
5592           remote_info['memory_free'] -= max(0, i_mem_diff)
5593
5594           if iinfo.status == "up":
5595             i_p_up_mem += beinfo[constants.BE_MEMORY]
5596
5597       # compute memory used by instances
5598       pnr = {
5599         "tags": list(ninfo.GetTags()),
5600         "total_memory": remote_info['memory_total'],
5601         "reserved_memory": remote_info['memory_dom0'],
5602         "free_memory": remote_info['memory_free'],
5603         "i_pri_memory": i_p_mem,
5604         "i_pri_up_memory": i_p_up_mem,
5605         "total_disk": remote_info['vg_size'],
5606         "free_disk": remote_info['vg_free'],
5607         "primary_ip": ninfo.primary_ip,
5608         "secondary_ip": ninfo.secondary_ip,
5609         "total_cpus": remote_info['cpu_total'],
5610         }
5611       node_results[nname] = pnr
5612     data["nodes"] = node_results
5613
5614     # instance data
5615     instance_data = {}
5616     for iinfo, beinfo in i_list:
5617       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5618                   for n in iinfo.nics]
5619       pir = {
5620         "tags": list(iinfo.GetTags()),
5621         "should_run": iinfo.status == "up",
5622         "vcpus": beinfo[constants.BE_VCPUS],
5623         "memory": beinfo[constants.BE_MEMORY],
5624         "os": iinfo.os,
5625         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5626         "nics": nic_data,
5627         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5628         "disk_template": iinfo.disk_template,
5629         "hypervisor": iinfo.hypervisor,
5630         }
5631       instance_data[iinfo.name] = pir
5632
5633     data["instances"] = instance_data
5634
5635     self.in_data = data
5636
5637   def _AddNewInstance(self):
5638     """Add new instance data to allocator structure.
5639
5640     This in combination with _AllocatorGetClusterData will create the
5641     correct structure needed as input for the allocator.
5642
5643     The checks for the completeness of the opcode must have already been
5644     done.
5645
5646     """
5647     data = self.in_data
5648     if len(self.disks) != 2:
5649       raise errors.OpExecError("Only two-disk configurations supported")
5650
5651     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
5652
5653     if self.disk_template in constants.DTS_NET_MIRROR:
5654       self.required_nodes = 2
5655     else:
5656       self.required_nodes = 1
5657     request = {
5658       "type": "allocate",
5659       "name": self.name,
5660       "disk_template": self.disk_template,
5661       "tags": self.tags,
5662       "os": self.os,
5663       "vcpus": self.vcpus,
5664       "memory": self.mem_size,
5665       "disks": self.disks,
5666       "disk_space_total": disk_space,
5667       "nics": self.nics,
5668       "required_nodes": self.required_nodes,
5669       }
5670     data["request"] = request
5671
5672   def _AddRelocateInstance(self):
5673     """Add relocate instance data to allocator structure.
5674
5675     This in combination with _IAllocatorGetClusterData will create the
5676     correct structure needed as input for the allocator.
5677
5678     The checks for the completeness of the opcode must have already been
5679     done.
5680
5681     """
5682     instance = self.lu.cfg.GetInstanceInfo(self.name)
5683     if instance is None:
5684       raise errors.ProgrammerError("Unknown instance '%s' passed to"
5685                                    " IAllocator" % self.name)
5686
5687     if instance.disk_template not in constants.DTS_NET_MIRROR:
5688       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5689
5690     if len(instance.secondary_nodes) != 1:
5691       raise errors.OpPrereqError("Instance has not exactly one secondary node")
5692
5693     self.required_nodes = 1
5694     disk_sizes = [{'size': disk.size} for disk in instance.disks]
5695     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
5696
5697     request = {
5698       "type": "relocate",
5699       "name": self.name,
5700       "disk_space_total": disk_space,
5701       "required_nodes": self.required_nodes,
5702       "relocate_from": self.relocate_from,
5703       }
5704     self.in_data["request"] = request
5705
5706   def _BuildInputData(self):
5707     """Build input data structures.
5708
5709     """
5710     self._ComputeClusterData()
5711
5712     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5713       self._AddNewInstance()
5714     else:
5715       self._AddRelocateInstance()
5716
5717     self.in_text = serializer.Dump(self.in_data)
5718
5719   def Run(self, name, validate=True, call_fn=None):
5720     """Run an instance allocator and return the results.
5721
5722     """
5723     if call_fn is None:
5724       call_fn = self.lu.rpc.call_iallocator_runner
5725     data = self.in_text
5726
5727     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
5728
5729     if not isinstance(result, (list, tuple)) or len(result) != 4:
5730       raise errors.OpExecError("Invalid result from master iallocator runner")
5731
5732     rcode, stdout, stderr, fail = result
5733
5734     if rcode == constants.IARUN_NOTFOUND:
5735       raise errors.OpExecError("Can't find allocator '%s'" % name)
5736     elif rcode == constants.IARUN_FAILURE:
5737       raise errors.OpExecError("Instance allocator call failed: %s,"
5738                                " output: %s" % (fail, stdout+stderr))
5739     self.out_text = stdout
5740     if validate:
5741       self._ValidateResult()
5742
5743   def _ValidateResult(self):
5744     """Process the allocator results.
5745
5746     This will process and if successful save the result in
5747     self.out_data and the other parameters.
5748
5749     """
5750     try:
5751       rdict = serializer.Load(self.out_text)
5752     except Exception, err:
5753       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5754
5755     if not isinstance(rdict, dict):
5756       raise errors.OpExecError("Can't parse iallocator results: not a dict")
5757
5758     for key in "success", "info", "nodes":
5759       if key not in rdict:
5760         raise errors.OpExecError("Can't parse iallocator results:"
5761                                  " missing key '%s'" % key)
5762       setattr(self, key, rdict[key])
5763
5764     if not isinstance(rdict["nodes"], list):
5765       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5766                                " is not a list")
5767     self.out_data = rdict
5768
5769
5770 class LUTestAllocator(NoHooksLU):
5771   """Run allocator tests.
5772
5773   This LU runs the allocator tests
5774
5775   """
5776   _OP_REQP = ["direction", "mode", "name"]
5777
5778   def CheckPrereq(self):
5779     """Check prerequisites.
5780
5781     This checks the opcode parameters depending on the director and mode test.
5782
5783     """
5784     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5785       for attr in ["name", "mem_size", "disks", "disk_template",
5786                    "os", "tags", "nics", "vcpus"]:
5787         if not hasattr(self.op, attr):
5788           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5789                                      attr)
5790       iname = self.cfg.ExpandInstanceName(self.op.name)
5791       if iname is not None:
5792         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5793                                    iname)
5794       if not isinstance(self.op.nics, list):
5795         raise errors.OpPrereqError("Invalid parameter 'nics'")
5796       for row in self.op.nics:
5797         if (not isinstance(row, dict) or
5798             "mac" not in row or
5799             "ip" not in row or
5800             "bridge" not in row):
5801           raise errors.OpPrereqError("Invalid contents of the"
5802                                      " 'nics' parameter")
5803       if not isinstance(self.op.disks, list):
5804         raise errors.OpPrereqError("Invalid parameter 'disks'")
5805       if len(self.op.disks) != 2:
5806         raise errors.OpPrereqError("Only two-disk configurations supported")
5807       for row in self.op.disks:
5808         if (not isinstance(row, dict) or
5809             "size" not in row or
5810             not isinstance(row["size"], int) or
5811             "mode" not in row or
5812             row["mode"] not in ['r', 'w']):
5813           raise errors.OpPrereqError("Invalid contents of the"
5814                                      " 'disks' parameter")
5815       if self.op.hypervisor is None:
5816         self.op.hypervisor = self.cfg.GetHypervisorType()
5817     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5818       if not hasattr(self.op, "name"):
5819         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5820       fname = self.cfg.ExpandInstanceName(self.op.name)
5821       if fname is None:
5822         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5823                                    self.op.name)
5824       self.op.name = fname
5825       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5826     else:
5827       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5828                                  self.op.mode)
5829
5830     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5831       if not hasattr(self.op, "allocator") or self.op.allocator is None:
5832         raise errors.OpPrereqError("Missing allocator name")
5833     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5834       raise errors.OpPrereqError("Wrong allocator test '%s'" %
5835                                  self.op.direction)
5836
5837   def Exec(self, feedback_fn):
5838     """Run the allocator test.
5839
5840     """
5841     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5842       ial = IAllocator(self,
5843                        mode=self.op.mode,
5844                        name=self.op.name,
5845                        mem_size=self.op.mem_size,
5846                        disks=self.op.disks,
5847                        disk_template=self.op.disk_template,
5848                        os=self.op.os,
5849                        tags=self.op.tags,
5850                        nics=self.op.nics,
5851                        vcpus=self.op.vcpus,
5852                        hypervisor=self.op.hypervisor,
5853                        )
5854     else:
5855       ial = IAllocator(self,
5856                        mode=self.op.mode,
5857                        name=self.op.name,
5858                        relocate_from=list(self.relocate_from),
5859                        )
5860
5861     if self.op.direction == constants.IALLOCATOR_DIR_IN:
5862       result = ial.in_text
5863     else:
5864       ial.Run(self.op.allocator, validate=False)
5865       result = ial.out_text
5866     return result