LURebootInstance: lock only primary when possible
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45
46
47 class LogicalUnit(object):
48   """Logical Unit base class.
49
50   Subclasses must follow these rules:
51     - implement ExpandNames
52     - implement CheckPrereq
53     - implement Exec
54     - implement BuildHooksEnv
55     - redefine HPATH and HTYPE
56     - optionally redefine their run requirements:
57         REQ_MASTER: the LU needs to run on the master node
58         REQ_WSSTORE: the LU needs a writable SimpleStore
59         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60
61   Note that all commands require root permissions.
62
63   """
64   HPATH = None
65   HTYPE = None
66   _OP_REQP = []
67   REQ_MASTER = True
68   REQ_WSSTORE = False
69   REQ_BGL = True
70
71   def __init__(self, processor, op, context, sstore):
72     """Constructor for LogicalUnit.
73
74     This needs to be overriden in derived classes in order to check op
75     validity.
76
77     """
78     self.proc = processor
79     self.op = op
80     self.cfg = context.cfg
81     self.sstore = sstore
82     self.context = context
83     self.needed_locks = None
84     self.acquired_locks = {}
85     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
86     # Used to force good behavior when calling helper functions
87     self.recalculate_locks = {}
88     self.__ssh = None
89
90     for attr_name in self._OP_REQP:
91       attr_val = getattr(op, attr_name, None)
92       if attr_val is None:
93         raise errors.OpPrereqError("Required parameter '%s' missing" %
94                                    attr_name)
95
96     if not self.cfg.IsCluster():
97       raise errors.OpPrereqError("Cluster not initialized yet,"
98                                  " use 'gnt-cluster init' first.")
99     if self.REQ_MASTER:
100       master = sstore.GetMasterNode()
101       if master != utils.HostInfo().name:
102         raise errors.OpPrereqError("Commands must be run on the master"
103                                    " node %s" % master)
104
105   def __GetSSH(self):
106     """Returns the SshRunner object
107
108     """
109     if not self.__ssh:
110       self.__ssh = ssh.SshRunner(self.sstore)
111     return self.__ssh
112
113   ssh = property(fget=__GetSSH)
114
115   def ExpandNames(self):
116     """Expand names for this LU.
117
118     This method is called before starting to execute the opcode, and it should
119     update all the parameters of the opcode to their canonical form (e.g. a
120     short node name must be fully expanded after this method has successfully
121     completed). This way locking, hooks, logging, ecc. can work correctly.
122
123     LUs which implement this method must also populate the self.needed_locks
124     member, as a dict with lock levels as keys, and a list of needed lock names
125     as values. Rules:
126       - Use an empty dict if you don't need any lock
127       - If you don't need any lock at a particular level omit that level
128       - Don't put anything for the BGL level
129       - If you want all locks at a level use locking.ALL_SET as a value
130
131     If you need to share locks (rather than acquire them exclusively) at one
132     level you can modify self.share_locks, setting a true value (usually 1) for
133     that level. By default locks are not shared.
134
135     Examples:
136     # Acquire all nodes and one instance
137     self.needed_locks = {
138       locking.LEVEL_NODE: locking.ALL_SET,
139       locking.LEVEL_INSTANCES: ['instance1.example.tld'],
140     }
141     # Acquire just two nodes
142     self.needed_locks = {
143       locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
144     }
145     # Acquire no locks
146     self.needed_locks = {} # No, you can't leave it to the default value None
147
148     """
149     # The implementation of this method is mandatory only if the new LU is
150     # concurrent, so that old LUs don't need to be changed all at the same
151     # time.
152     if self.REQ_BGL:
153       self.needed_locks = {} # Exclusive LUs don't need locks.
154     else:
155       raise NotImplementedError
156
157   def DeclareLocks(self, level):
158     """Declare LU locking needs for a level
159
160     While most LUs can just declare their locking needs at ExpandNames time,
161     sometimes there's the need to calculate some locks after having acquired
162     the ones before. This function is called just before acquiring locks at a
163     particular level, but after acquiring the ones at lower levels, and permits
164     such calculations. It can be used to modify self.needed_locks, and by
165     default it does nothing.
166
167     This function is only called if you have something already set in
168     self.needed_locks for the level.
169
170     @param level: Locking level which is going to be locked
171     @type level: member of ganeti.locking.LEVELS
172
173     """
174
175   def CheckPrereq(self):
176     """Check prerequisites for this LU.
177
178     This method should check that the prerequisites for the execution
179     of this LU are fulfilled. It can do internode communication, but
180     it should be idempotent - no cluster or system changes are
181     allowed.
182
183     The method should raise errors.OpPrereqError in case something is
184     not fulfilled. Its return value is ignored.
185
186     This method should also update all the parameters of the opcode to
187     their canonical form if it hasn't been done by ExpandNames before.
188
189     """
190     raise NotImplementedError
191
192   def Exec(self, feedback_fn):
193     """Execute the LU.
194
195     This method should implement the actual work. It should raise
196     errors.OpExecError for failures that are somewhat dealt with in
197     code, or expected.
198
199     """
200     raise NotImplementedError
201
202   def BuildHooksEnv(self):
203     """Build hooks environment for this LU.
204
205     This method should return a three-node tuple consisting of: a dict
206     containing the environment that will be used for running the
207     specific hook for this LU, a list of node names on which the hook
208     should run before the execution, and a list of node names on which
209     the hook should run after the execution.
210
211     The keys of the dict must not have 'GANETI_' prefixed as this will
212     be handled in the hooks runner. Also note additional keys will be
213     added by the hooks runner. If the LU doesn't define any
214     environment, an empty dict (and not None) should be returned.
215
216     No nodes should be returned as an empty list (and not None).
217
218     Note that if the HPATH for a LU class is None, this function will
219     not be called.
220
221     """
222     raise NotImplementedError
223
224   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
225     """Notify the LU about the results of its hooks.
226
227     This method is called every time a hooks phase is executed, and notifies
228     the Logical Unit about the hooks' result. The LU can then use it to alter
229     its result based on the hooks.  By default the method does nothing and the
230     previous result is passed back unchanged but any LU can define it if it
231     wants to use the local cluster hook-scripts somehow.
232
233     Args:
234       phase: the hooks phase that has just been run
235       hooks_results: the results of the multi-node hooks rpc call
236       feedback_fn: function to send feedback back to the caller
237       lu_result: the previous result this LU had, or None in the PRE phase.
238
239     """
240     return lu_result
241
242   def _ExpandAndLockInstance(self):
243     """Helper function to expand and lock an instance.
244
245     Many LUs that work on an instance take its name in self.op.instance_name
246     and need to expand it and then declare the expanded name for locking. This
247     function does it, and then updates self.op.instance_name to the expanded
248     name. It also initializes needed_locks as a dict, if this hasn't been done
249     before.
250
251     """
252     if self.needed_locks is None:
253       self.needed_locks = {}
254     else:
255       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
256         "_ExpandAndLockInstance called with instance-level locks set"
257     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
258     if expanded_name is None:
259       raise errors.OpPrereqError("Instance '%s' not known" %
260                                   self.op.instance_name)
261     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
262     self.op.instance_name = expanded_name
263
264   def _LockInstancesNodes(self, primary_only=False):
265     """Helper function to declare instances' nodes for locking.
266
267     This function should be called after locking one or more instances to lock
268     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
269     with all primary or secondary nodes for instances already locked and
270     present in self.needed_locks[locking.LEVEL_INSTANCE].
271
272     It should be called from DeclareLocks, and for safety only works if
273     self.recalculate_locks[locking.LEVEL_NODE] is set.
274
275     In the future it may grow parameters to just lock some instance's nodes, or
276     to just lock primaries or secondary nodes, if needed.
277
278     If should be called in DeclareLocks in a way similar to:
279
280     if level == locking.LEVEL_NODE:
281       self._LockInstancesNodes()
282
283     @type primary_only: boolean
284     @param primary_only: only lock primary nodes of locked instances
285
286     """
287     assert locking.LEVEL_NODE in self.recalculate_locks, \
288       "_LockInstancesNodes helper function called with no nodes to recalculate"
289
290     # TODO: check if we're really been called with the instance locks held
291
292     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
293     # future we might want to have different behaviors depending on the value
294     # of self.recalculate_locks[locking.LEVEL_NODE]
295     wanted_nodes = []
296     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
297       instance = self.context.cfg.GetInstanceInfo(instance_name)
298       wanted_nodes.append(instance.primary_node)
299       if not primary_only:
300         wanted_nodes.extend(instance.secondary_nodes)
301     self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
302
303     del self.recalculate_locks[locking.LEVEL_NODE]
304
305
306 class NoHooksLU(LogicalUnit):
307   """Simple LU which runs no hooks.
308
309   This LU is intended as a parent for other LogicalUnits which will
310   run no hooks, in order to reduce duplicate code.
311
312   """
313   HPATH = None
314   HTYPE = None
315
316
317 def _GetWantedNodes(lu, nodes):
318   """Returns list of checked and expanded node names.
319
320   Args:
321     nodes: List of nodes (strings) or None for all
322
323   """
324   if not isinstance(nodes, list):
325     raise errors.OpPrereqError("Invalid argument type 'nodes'")
326
327   if not nodes:
328     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
329       " non-empty list of nodes whose name is to be expanded.")
330
331   wanted = []
332   for name in nodes:
333     node = lu.cfg.ExpandNodeName(name)
334     if node is None:
335       raise errors.OpPrereqError("No such node name '%s'" % name)
336     wanted.append(node)
337
338   return utils.NiceSort(wanted)
339
340
341 def _GetWantedInstances(lu, instances):
342   """Returns list of checked and expanded instance names.
343
344   Args:
345     instances: List of instances (strings) or None for all
346
347   """
348   if not isinstance(instances, list):
349     raise errors.OpPrereqError("Invalid argument type 'instances'")
350
351   if instances:
352     wanted = []
353
354     for name in instances:
355       instance = lu.cfg.ExpandInstanceName(name)
356       if instance is None:
357         raise errors.OpPrereqError("No such instance name '%s'" % name)
358       wanted.append(instance)
359
360   else:
361     wanted = lu.cfg.GetInstanceList()
362   return utils.NiceSort(wanted)
363
364
365 def _CheckOutputFields(static, dynamic, selected):
366   """Checks whether all selected fields are valid.
367
368   Args:
369     static: Static fields
370     dynamic: Dynamic fields
371
372   """
373   static_fields = frozenset(static)
374   dynamic_fields = frozenset(dynamic)
375
376   all_fields = static_fields | dynamic_fields
377
378   if not all_fields.issuperset(selected):
379     raise errors.OpPrereqError("Unknown output fields selected: %s"
380                                % ",".join(frozenset(selected).
381                                           difference(all_fields)))
382
383
384 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
385                           memory, vcpus, nics):
386   """Builds instance related env variables for hooks from single variables.
387
388   Args:
389     secondary_nodes: List of secondary nodes as strings
390   """
391   env = {
392     "OP_TARGET": name,
393     "INSTANCE_NAME": name,
394     "INSTANCE_PRIMARY": primary_node,
395     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
396     "INSTANCE_OS_TYPE": os_type,
397     "INSTANCE_STATUS": status,
398     "INSTANCE_MEMORY": memory,
399     "INSTANCE_VCPUS": vcpus,
400   }
401
402   if nics:
403     nic_count = len(nics)
404     for idx, (ip, bridge, mac) in enumerate(nics):
405       if ip is None:
406         ip = ""
407       env["INSTANCE_NIC%d_IP" % idx] = ip
408       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
409       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
410   else:
411     nic_count = 0
412
413   env["INSTANCE_NIC_COUNT"] = nic_count
414
415   return env
416
417
418 def _BuildInstanceHookEnvByObject(instance, override=None):
419   """Builds instance related env variables for hooks from an object.
420
421   Args:
422     instance: objects.Instance object of instance
423     override: dict of values to override
424   """
425   args = {
426     'name': instance.name,
427     'primary_node': instance.primary_node,
428     'secondary_nodes': instance.secondary_nodes,
429     'os_type': instance.os,
430     'status': instance.os,
431     'memory': instance.memory,
432     'vcpus': instance.vcpus,
433     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
434   }
435   if override:
436     args.update(override)
437   return _BuildInstanceHookEnv(**args)
438
439
440 def _CheckInstanceBridgesExist(instance):
441   """Check that the brigdes needed by an instance exist.
442
443   """
444   # check bridges existance
445   brlist = [nic.bridge for nic in instance.nics]
446   if not rpc.call_bridges_exist(instance.primary_node, brlist):
447     raise errors.OpPrereqError("one or more target bridges %s does not"
448                                " exist on destination node '%s'" %
449                                (brlist, instance.primary_node))
450
451
452 class LUDestroyCluster(NoHooksLU):
453   """Logical unit for destroying the cluster.
454
455   """
456   _OP_REQP = []
457
458   def CheckPrereq(self):
459     """Check prerequisites.
460
461     This checks whether the cluster is empty.
462
463     Any errors are signalled by raising errors.OpPrereqError.
464
465     """
466     master = self.sstore.GetMasterNode()
467
468     nodelist = self.cfg.GetNodeList()
469     if len(nodelist) != 1 or nodelist[0] != master:
470       raise errors.OpPrereqError("There are still %d node(s) in"
471                                  " this cluster." % (len(nodelist) - 1))
472     instancelist = self.cfg.GetInstanceList()
473     if instancelist:
474       raise errors.OpPrereqError("There are still %d instance(s) in"
475                                  " this cluster." % len(instancelist))
476
477   def Exec(self, feedback_fn):
478     """Destroys the cluster.
479
480     """
481     master = self.sstore.GetMasterNode()
482     if not rpc.call_node_stop_master(master, False):
483       raise errors.OpExecError("Could not disable the master role")
484     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
485     utils.CreateBackup(priv_key)
486     utils.CreateBackup(pub_key)
487     return master
488
489
490 class LUVerifyCluster(LogicalUnit):
491   """Verifies the cluster status.
492
493   """
494   HPATH = "cluster-verify"
495   HTYPE = constants.HTYPE_CLUSTER
496   _OP_REQP = ["skip_checks"]
497
498   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
499                   remote_version, feedback_fn):
500     """Run multiple tests against a node.
501
502     Test list:
503       - compares ganeti version
504       - checks vg existance and size > 20G
505       - checks config file checksum
506       - checks ssh to other nodes
507
508     Args:
509       node: name of the node to check
510       file_list: required list of files
511       local_cksum: dictionary of local files and their checksums
512
513     """
514     # compares ganeti version
515     local_version = constants.PROTOCOL_VERSION
516     if not remote_version:
517       feedback_fn("  - ERROR: connection to %s failed" % (node))
518       return True
519
520     if local_version != remote_version:
521       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
522                       (local_version, node, remote_version))
523       return True
524
525     # checks vg existance and size > 20G
526
527     bad = False
528     if not vglist:
529       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
530                       (node,))
531       bad = True
532     else:
533       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
534                                             constants.MIN_VG_SIZE)
535       if vgstatus:
536         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
537         bad = True
538
539     # checks config file checksum
540     # checks ssh to any
541
542     if 'filelist' not in node_result:
543       bad = True
544       feedback_fn("  - ERROR: node hasn't returned file checksum data")
545     else:
546       remote_cksum = node_result['filelist']
547       for file_name in file_list:
548         if file_name not in remote_cksum:
549           bad = True
550           feedback_fn("  - ERROR: file '%s' missing" % file_name)
551         elif remote_cksum[file_name] != local_cksum[file_name]:
552           bad = True
553           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
554
555     if 'nodelist' not in node_result:
556       bad = True
557       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
558     else:
559       if node_result['nodelist']:
560         bad = True
561         for node in node_result['nodelist']:
562           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
563                           (node, node_result['nodelist'][node]))
564     if 'node-net-test' not in node_result:
565       bad = True
566       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
567     else:
568       if node_result['node-net-test']:
569         bad = True
570         nlist = utils.NiceSort(node_result['node-net-test'].keys())
571         for node in nlist:
572           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
573                           (node, node_result['node-net-test'][node]))
574
575     hyp_result = node_result.get('hypervisor', None)
576     if hyp_result is not None:
577       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
578     return bad
579
580   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
581                       node_instance, feedback_fn):
582     """Verify an instance.
583
584     This function checks to see if the required block devices are
585     available on the instance's node.
586
587     """
588     bad = False
589
590     node_current = instanceconfig.primary_node
591
592     node_vol_should = {}
593     instanceconfig.MapLVsByNode(node_vol_should)
594
595     for node in node_vol_should:
596       for volume in node_vol_should[node]:
597         if node not in node_vol_is or volume not in node_vol_is[node]:
598           feedback_fn("  - ERROR: volume %s missing on node %s" %
599                           (volume, node))
600           bad = True
601
602     if not instanceconfig.status == 'down':
603       if (node_current not in node_instance or
604           not instance in node_instance[node_current]):
605         feedback_fn("  - ERROR: instance %s not running on node %s" %
606                         (instance, node_current))
607         bad = True
608
609     for node in node_instance:
610       if (not node == node_current):
611         if instance in node_instance[node]:
612           feedback_fn("  - ERROR: instance %s should not run on node %s" %
613                           (instance, node))
614           bad = True
615
616     return bad
617
618   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
619     """Verify if there are any unknown volumes in the cluster.
620
621     The .os, .swap and backup volumes are ignored. All other volumes are
622     reported as unknown.
623
624     """
625     bad = False
626
627     for node in node_vol_is:
628       for volume in node_vol_is[node]:
629         if node not in node_vol_should or volume not in node_vol_should[node]:
630           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
631                       (volume, node))
632           bad = True
633     return bad
634
635   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
636     """Verify the list of running instances.
637
638     This checks what instances are running but unknown to the cluster.
639
640     """
641     bad = False
642     for node in node_instance:
643       for runninginstance in node_instance[node]:
644         if runninginstance not in instancelist:
645           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
646                           (runninginstance, node))
647           bad = True
648     return bad
649
650   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
651     """Verify N+1 Memory Resilience.
652
653     Check that if one single node dies we can still start all the instances it
654     was primary for.
655
656     """
657     bad = False
658
659     for node, nodeinfo in node_info.iteritems():
660       # This code checks that every node which is now listed as secondary has
661       # enough memory to host all instances it is supposed to should a single
662       # other node in the cluster fail.
663       # FIXME: not ready for failover to an arbitrary node
664       # FIXME: does not support file-backed instances
665       # WARNING: we currently take into account down instances as well as up
666       # ones, considering that even if they're down someone might want to start
667       # them even in the event of a node failure.
668       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
669         needed_mem = 0
670         for instance in instances:
671           needed_mem += instance_cfg[instance].memory
672         if nodeinfo['mfree'] < needed_mem:
673           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
674                       " failovers should node %s fail" % (node, prinode))
675           bad = True
676     return bad
677
678   def CheckPrereq(self):
679     """Check prerequisites.
680
681     Transform the list of checks we're going to skip into a set and check that
682     all its members are valid.
683
684     """
685     self.skip_set = frozenset(self.op.skip_checks)
686     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
687       raise errors.OpPrereqError("Invalid checks to be skipped specified")
688
689   def BuildHooksEnv(self):
690     """Build hooks env.
691
692     Cluster-Verify hooks just rone in the post phase and their failure makes
693     the output be logged in the verify output and the verification to fail.
694
695     """
696     all_nodes = self.cfg.GetNodeList()
697     # TODO: populate the environment with useful information for verify hooks
698     env = {}
699     return env, [], all_nodes
700
701   def Exec(self, feedback_fn):
702     """Verify integrity of cluster, performing various test on nodes.
703
704     """
705     bad = False
706     feedback_fn("* Verifying global settings")
707     for msg in self.cfg.VerifyConfig():
708       feedback_fn("  - ERROR: %s" % msg)
709
710     vg_name = self.cfg.GetVGName()
711     nodelist = utils.NiceSort(self.cfg.GetNodeList())
712     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
713     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
714     i_non_redundant = [] # Non redundant instances
715     node_volume = {}
716     node_instance = {}
717     node_info = {}
718     instance_cfg = {}
719
720     # FIXME: verify OS list
721     # do local checksums
722     file_names = list(self.sstore.GetFileList())
723     file_names.append(constants.SSL_CERT_FILE)
724     file_names.append(constants.CLUSTER_CONF_FILE)
725     local_checksums = utils.FingerprintFiles(file_names)
726
727     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
728     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
729     all_instanceinfo = rpc.call_instance_list(nodelist)
730     all_vglist = rpc.call_vg_list(nodelist)
731     node_verify_param = {
732       'filelist': file_names,
733       'nodelist': nodelist,
734       'hypervisor': None,
735       'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
736                         for node in nodeinfo]
737       }
738     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
739     all_rversion = rpc.call_version(nodelist)
740     all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
741
742     for node in nodelist:
743       feedback_fn("* Verifying node %s" % node)
744       result = self._VerifyNode(node, file_names, local_checksums,
745                                 all_vglist[node], all_nvinfo[node],
746                                 all_rversion[node], feedback_fn)
747       bad = bad or result
748
749       # node_volume
750       volumeinfo = all_volumeinfo[node]
751
752       if isinstance(volumeinfo, basestring):
753         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
754                     (node, volumeinfo[-400:].encode('string_escape')))
755         bad = True
756         node_volume[node] = {}
757       elif not isinstance(volumeinfo, dict):
758         feedback_fn("  - ERROR: connection to %s failed" % (node,))
759         bad = True
760         continue
761       else:
762         node_volume[node] = volumeinfo
763
764       # node_instance
765       nodeinstance = all_instanceinfo[node]
766       if type(nodeinstance) != list:
767         feedback_fn("  - ERROR: connection to %s failed" % (node,))
768         bad = True
769         continue
770
771       node_instance[node] = nodeinstance
772
773       # node_info
774       nodeinfo = all_ninfo[node]
775       if not isinstance(nodeinfo, dict):
776         feedback_fn("  - ERROR: connection to %s failed" % (node,))
777         bad = True
778         continue
779
780       try:
781         node_info[node] = {
782           "mfree": int(nodeinfo['memory_free']),
783           "dfree": int(nodeinfo['vg_free']),
784           "pinst": [],
785           "sinst": [],
786           # dictionary holding all instances this node is secondary for,
787           # grouped by their primary node. Each key is a cluster node, and each
788           # value is a list of instances which have the key as primary and the
789           # current node as secondary.  this is handy to calculate N+1 memory
790           # availability if you can only failover from a primary to its
791           # secondary.
792           "sinst-by-pnode": {},
793         }
794       except ValueError:
795         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
796         bad = True
797         continue
798
799     node_vol_should = {}
800
801     for instance in instancelist:
802       feedback_fn("* Verifying instance %s" % instance)
803       inst_config = self.cfg.GetInstanceInfo(instance)
804       result =  self._VerifyInstance(instance, inst_config, node_volume,
805                                      node_instance, feedback_fn)
806       bad = bad or result
807
808       inst_config.MapLVsByNode(node_vol_should)
809
810       instance_cfg[instance] = inst_config
811
812       pnode = inst_config.primary_node
813       if pnode in node_info:
814         node_info[pnode]['pinst'].append(instance)
815       else:
816         feedback_fn("  - ERROR: instance %s, connection to primary node"
817                     " %s failed" % (instance, pnode))
818         bad = True
819
820       # If the instance is non-redundant we cannot survive losing its primary
821       # node, so we are not N+1 compliant. On the other hand we have no disk
822       # templates with more than one secondary so that situation is not well
823       # supported either.
824       # FIXME: does not support file-backed instances
825       if len(inst_config.secondary_nodes) == 0:
826         i_non_redundant.append(instance)
827       elif len(inst_config.secondary_nodes) > 1:
828         feedback_fn("  - WARNING: multiple secondaries for instance %s"
829                     % instance)
830
831       for snode in inst_config.secondary_nodes:
832         if snode in node_info:
833           node_info[snode]['sinst'].append(instance)
834           if pnode not in node_info[snode]['sinst-by-pnode']:
835             node_info[snode]['sinst-by-pnode'][pnode] = []
836           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
837         else:
838           feedback_fn("  - ERROR: instance %s, connection to secondary node"
839                       " %s failed" % (instance, snode))
840
841     feedback_fn("* Verifying orphan volumes")
842     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
843                                        feedback_fn)
844     bad = bad or result
845
846     feedback_fn("* Verifying remaining instances")
847     result = self._VerifyOrphanInstances(instancelist, node_instance,
848                                          feedback_fn)
849     bad = bad or result
850
851     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
852       feedback_fn("* Verifying N+1 Memory redundancy")
853       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
854       bad = bad or result
855
856     feedback_fn("* Other Notes")
857     if i_non_redundant:
858       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
859                   % len(i_non_redundant))
860
861     return not bad
862
863   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
864     """Analize the post-hooks' result, handle it, and send some
865     nicely-formatted feedback back to the user.
866
867     Args:
868       phase: the hooks phase that has just been run
869       hooks_results: the results of the multi-node hooks rpc call
870       feedback_fn: function to send feedback back to the caller
871       lu_result: previous Exec result
872
873     """
874     # We only really run POST phase hooks, and are only interested in
875     # their results
876     if phase == constants.HOOKS_PHASE_POST:
877       # Used to change hooks' output to proper indentation
878       indent_re = re.compile('^', re.M)
879       feedback_fn("* Hooks Results")
880       if not hooks_results:
881         feedback_fn("  - ERROR: general communication failure")
882         lu_result = 1
883       else:
884         for node_name in hooks_results:
885           show_node_header = True
886           res = hooks_results[node_name]
887           if res is False or not isinstance(res, list):
888             feedback_fn("    Communication failure")
889             lu_result = 1
890             continue
891           for script, hkr, output in res:
892             if hkr == constants.HKR_FAIL:
893               # The node header is only shown once, if there are
894               # failing hooks on that node
895               if show_node_header:
896                 feedback_fn("  Node %s:" % node_name)
897                 show_node_header = False
898               feedback_fn("    ERROR: Script %s failed, output:" % script)
899               output = indent_re.sub('      ', output)
900               feedback_fn("%s" % output)
901               lu_result = 1
902
903       return lu_result
904
905
906 class LUVerifyDisks(NoHooksLU):
907   """Verifies the cluster disks status.
908
909   """
910   _OP_REQP = []
911
912   def CheckPrereq(self):
913     """Check prerequisites.
914
915     This has no prerequisites.
916
917     """
918     pass
919
920   def Exec(self, feedback_fn):
921     """Verify integrity of cluster disks.
922
923     """
924     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
925
926     vg_name = self.cfg.GetVGName()
927     nodes = utils.NiceSort(self.cfg.GetNodeList())
928     instances = [self.cfg.GetInstanceInfo(name)
929                  for name in self.cfg.GetInstanceList()]
930
931     nv_dict = {}
932     for inst in instances:
933       inst_lvs = {}
934       if (inst.status != "up" or
935           inst.disk_template not in constants.DTS_NET_MIRROR):
936         continue
937       inst.MapLVsByNode(inst_lvs)
938       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
939       for node, vol_list in inst_lvs.iteritems():
940         for vol in vol_list:
941           nv_dict[(node, vol)] = inst
942
943     if not nv_dict:
944       return result
945
946     node_lvs = rpc.call_volume_list(nodes, vg_name)
947
948     to_act = set()
949     for node in nodes:
950       # node_volume
951       lvs = node_lvs[node]
952
953       if isinstance(lvs, basestring):
954         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
955         res_nlvm[node] = lvs
956       elif not isinstance(lvs, dict):
957         logger.Info("connection to node %s failed or invalid data returned" %
958                     (node,))
959         res_nodes.append(node)
960         continue
961
962       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
963         inst = nv_dict.pop((node, lv_name), None)
964         if (not lv_online and inst is not None
965             and inst.name not in res_instances):
966           res_instances.append(inst.name)
967
968     # any leftover items in nv_dict are missing LVs, let's arrange the
969     # data better
970     for key, inst in nv_dict.iteritems():
971       if inst.name not in res_missing:
972         res_missing[inst.name] = []
973       res_missing[inst.name].append(key)
974
975     return result
976
977
978 class LURenameCluster(LogicalUnit):
979   """Rename the cluster.
980
981   """
982   HPATH = "cluster-rename"
983   HTYPE = constants.HTYPE_CLUSTER
984   _OP_REQP = ["name"]
985   REQ_WSSTORE = True
986
987   def BuildHooksEnv(self):
988     """Build hooks env.
989
990     """
991     env = {
992       "OP_TARGET": self.sstore.GetClusterName(),
993       "NEW_NAME": self.op.name,
994       }
995     mn = self.sstore.GetMasterNode()
996     return env, [mn], [mn]
997
998   def CheckPrereq(self):
999     """Verify that the passed name is a valid one.
1000
1001     """
1002     hostname = utils.HostInfo(self.op.name)
1003
1004     new_name = hostname.name
1005     self.ip = new_ip = hostname.ip
1006     old_name = self.sstore.GetClusterName()
1007     old_ip = self.sstore.GetMasterIP()
1008     if new_name == old_name and new_ip == old_ip:
1009       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1010                                  " cluster has changed")
1011     if new_ip != old_ip:
1012       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1013         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1014                                    " reachable on the network. Aborting." %
1015                                    new_ip)
1016
1017     self.op.name = new_name
1018
1019   def Exec(self, feedback_fn):
1020     """Rename the cluster.
1021
1022     """
1023     clustername = self.op.name
1024     ip = self.ip
1025     ss = self.sstore
1026
1027     # shutdown the master IP
1028     master = ss.GetMasterNode()
1029     if not rpc.call_node_stop_master(master, False):
1030       raise errors.OpExecError("Could not disable the master role")
1031
1032     try:
1033       # modify the sstore
1034       ss.SetKey(ss.SS_MASTER_IP, ip)
1035       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1036
1037       # Distribute updated ss config to all nodes
1038       myself = self.cfg.GetNodeInfo(master)
1039       dist_nodes = self.cfg.GetNodeList()
1040       if myself.name in dist_nodes:
1041         dist_nodes.remove(myself.name)
1042
1043       logger.Debug("Copying updated ssconf data to all nodes")
1044       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1045         fname = ss.KeyToFilename(keyname)
1046         result = rpc.call_upload_file(dist_nodes, fname)
1047         for to_node in dist_nodes:
1048           if not result[to_node]:
1049             logger.Error("copy of file %s to node %s failed" %
1050                          (fname, to_node))
1051     finally:
1052       if not rpc.call_node_start_master(master, False):
1053         logger.Error("Could not re-enable the master role on the master,"
1054                      " please restart manually.")
1055
1056
1057 def _RecursiveCheckIfLVMBased(disk):
1058   """Check if the given disk or its children are lvm-based.
1059
1060   Args:
1061     disk: ganeti.objects.Disk object
1062
1063   Returns:
1064     boolean indicating whether a LD_LV dev_type was found or not
1065
1066   """
1067   if disk.children:
1068     for chdisk in disk.children:
1069       if _RecursiveCheckIfLVMBased(chdisk):
1070         return True
1071   return disk.dev_type == constants.LD_LV
1072
1073
1074 class LUSetClusterParams(LogicalUnit):
1075   """Change the parameters of the cluster.
1076
1077   """
1078   HPATH = "cluster-modify"
1079   HTYPE = constants.HTYPE_CLUSTER
1080   _OP_REQP = []
1081
1082   def BuildHooksEnv(self):
1083     """Build hooks env.
1084
1085     """
1086     env = {
1087       "OP_TARGET": self.sstore.GetClusterName(),
1088       "NEW_VG_NAME": self.op.vg_name,
1089       }
1090     mn = self.sstore.GetMasterNode()
1091     return env, [mn], [mn]
1092
1093   def CheckPrereq(self):
1094     """Check prerequisites.
1095
1096     This checks whether the given params don't conflict and
1097     if the given volume group is valid.
1098
1099     """
1100     if not self.op.vg_name:
1101       instances = [self.cfg.GetInstanceInfo(name)
1102                    for name in self.cfg.GetInstanceList()]
1103       for inst in instances:
1104         for disk in inst.disks:
1105           if _RecursiveCheckIfLVMBased(disk):
1106             raise errors.OpPrereqError("Cannot disable lvm storage while"
1107                                        " lvm-based instances exist")
1108
1109     # if vg_name not None, checks given volume group on all nodes
1110     if self.op.vg_name:
1111       node_list = self.cfg.GetNodeList()
1112       vglist = rpc.call_vg_list(node_list)
1113       for node in node_list:
1114         vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1115                                               constants.MIN_VG_SIZE)
1116         if vgstatus:
1117           raise errors.OpPrereqError("Error on node '%s': %s" %
1118                                      (node, vgstatus))
1119
1120   def Exec(self, feedback_fn):
1121     """Change the parameters of the cluster.
1122
1123     """
1124     if self.op.vg_name != self.cfg.GetVGName():
1125       self.cfg.SetVGName(self.op.vg_name)
1126     else:
1127       feedback_fn("Cluster LVM configuration already in desired"
1128                   " state, not changing")
1129
1130
1131 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1132   """Sleep and poll for an instance's disk to sync.
1133
1134   """
1135   if not instance.disks:
1136     return True
1137
1138   if not oneshot:
1139     proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1140
1141   node = instance.primary_node
1142
1143   for dev in instance.disks:
1144     cfgw.SetDiskID(dev, node)
1145
1146   retries = 0
1147   while True:
1148     max_time = 0
1149     done = True
1150     cumul_degraded = False
1151     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1152     if not rstats:
1153       proc.LogWarning("Can't get any data from node %s" % node)
1154       retries += 1
1155       if retries >= 10:
1156         raise errors.RemoteError("Can't contact node %s for mirror data,"
1157                                  " aborting." % node)
1158       time.sleep(6)
1159       continue
1160     retries = 0
1161     for i in range(len(rstats)):
1162       mstat = rstats[i]
1163       if mstat is None:
1164         proc.LogWarning("Can't compute data for node %s/%s" %
1165                         (node, instance.disks[i].iv_name))
1166         continue
1167       # we ignore the ldisk parameter
1168       perc_done, est_time, is_degraded, _ = mstat
1169       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1170       if perc_done is not None:
1171         done = False
1172         if est_time is not None:
1173           rem_time = "%d estimated seconds remaining" % est_time
1174           max_time = est_time
1175         else:
1176           rem_time = "no time estimate"
1177         proc.LogInfo("- device %s: %5.2f%% done, %s" %
1178                      (instance.disks[i].iv_name, perc_done, rem_time))
1179     if done or oneshot:
1180       break
1181
1182     time.sleep(min(60, max_time))
1183
1184   if done:
1185     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1186   return not cumul_degraded
1187
1188
1189 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1190   """Check that mirrors are not degraded.
1191
1192   The ldisk parameter, if True, will change the test from the
1193   is_degraded attribute (which represents overall non-ok status for
1194   the device(s)) to the ldisk (representing the local storage status).
1195
1196   """
1197   cfgw.SetDiskID(dev, node)
1198   if ldisk:
1199     idx = 6
1200   else:
1201     idx = 5
1202
1203   result = True
1204   if on_primary or dev.AssembleOnSecondary():
1205     rstats = rpc.call_blockdev_find(node, dev)
1206     if not rstats:
1207       logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1208       result = False
1209     else:
1210       result = result and (not rstats[idx])
1211   if dev.children:
1212     for child in dev.children:
1213       result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1214
1215   return result
1216
1217
1218 class LUDiagnoseOS(NoHooksLU):
1219   """Logical unit for OS diagnose/query.
1220
1221   """
1222   _OP_REQP = ["output_fields", "names"]
1223   REQ_BGL = False
1224
1225   def ExpandNames(self):
1226     if self.op.names:
1227       raise errors.OpPrereqError("Selective OS query not supported")
1228
1229     self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1230     _CheckOutputFields(static=[],
1231                        dynamic=self.dynamic_fields,
1232                        selected=self.op.output_fields)
1233
1234     # Lock all nodes, in shared mode
1235     self.needed_locks = {}
1236     self.share_locks[locking.LEVEL_NODE] = 1
1237     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1238
1239   def CheckPrereq(self):
1240     """Check prerequisites.
1241
1242     """
1243
1244   @staticmethod
1245   def _DiagnoseByOS(node_list, rlist):
1246     """Remaps a per-node return list into an a per-os per-node dictionary
1247
1248       Args:
1249         node_list: a list with the names of all nodes
1250         rlist: a map with node names as keys and OS objects as values
1251
1252       Returns:
1253         map: a map with osnames as keys and as value another map, with
1254              nodes as
1255              keys and list of OS objects as values
1256              e.g. {"debian-etch": {"node1": [<object>,...],
1257                                    "node2": [<object>,]}
1258                   }
1259
1260     """
1261     all_os = {}
1262     for node_name, nr in rlist.iteritems():
1263       if not nr:
1264         continue
1265       for os_obj in nr:
1266         if os_obj.name not in all_os:
1267           # build a list of nodes for this os containing empty lists
1268           # for each node in node_list
1269           all_os[os_obj.name] = {}
1270           for nname in node_list:
1271             all_os[os_obj.name][nname] = []
1272         all_os[os_obj.name][node_name].append(os_obj)
1273     return all_os
1274
1275   def Exec(self, feedback_fn):
1276     """Compute the list of OSes.
1277
1278     """
1279     node_list = self.acquired_locks[locking.LEVEL_NODE]
1280     node_data = rpc.call_os_diagnose(node_list)
1281     if node_data == False:
1282       raise errors.OpExecError("Can't gather the list of OSes")
1283     pol = self._DiagnoseByOS(node_list, node_data)
1284     output = []
1285     for os_name, os_data in pol.iteritems():
1286       row = []
1287       for field in self.op.output_fields:
1288         if field == "name":
1289           val = os_name
1290         elif field == "valid":
1291           val = utils.all([osl and osl[0] for osl in os_data.values()])
1292         elif field == "node_status":
1293           val = {}
1294           for node_name, nos_list in os_data.iteritems():
1295             val[node_name] = [(v.status, v.path) for v in nos_list]
1296         else:
1297           raise errors.ParameterError(field)
1298         row.append(val)
1299       output.append(row)
1300
1301     return output
1302
1303
1304 class LURemoveNode(LogicalUnit):
1305   """Logical unit for removing a node.
1306
1307   """
1308   HPATH = "node-remove"
1309   HTYPE = constants.HTYPE_NODE
1310   _OP_REQP = ["node_name"]
1311
1312   def BuildHooksEnv(self):
1313     """Build hooks env.
1314
1315     This doesn't run on the target node in the pre phase as a failed
1316     node would then be impossible to remove.
1317
1318     """
1319     env = {
1320       "OP_TARGET": self.op.node_name,
1321       "NODE_NAME": self.op.node_name,
1322       }
1323     all_nodes = self.cfg.GetNodeList()
1324     all_nodes.remove(self.op.node_name)
1325     return env, all_nodes, all_nodes
1326
1327   def CheckPrereq(self):
1328     """Check prerequisites.
1329
1330     This checks:
1331      - the node exists in the configuration
1332      - it does not have primary or secondary instances
1333      - it's not the master
1334
1335     Any errors are signalled by raising errors.OpPrereqError.
1336
1337     """
1338     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1339     if node is None:
1340       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1341
1342     instance_list = self.cfg.GetInstanceList()
1343
1344     masternode = self.sstore.GetMasterNode()
1345     if node.name == masternode:
1346       raise errors.OpPrereqError("Node is the master node,"
1347                                  " you need to failover first.")
1348
1349     for instance_name in instance_list:
1350       instance = self.cfg.GetInstanceInfo(instance_name)
1351       if node.name == instance.primary_node:
1352         raise errors.OpPrereqError("Instance %s still running on the node,"
1353                                    " please remove first." % instance_name)
1354       if node.name in instance.secondary_nodes:
1355         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1356                                    " please remove first." % instance_name)
1357     self.op.node_name = node.name
1358     self.node = node
1359
1360   def Exec(self, feedback_fn):
1361     """Removes the node from the cluster.
1362
1363     """
1364     node = self.node
1365     logger.Info("stopping the node daemon and removing configs from node %s" %
1366                 node.name)
1367
1368     self.context.RemoveNode(node.name)
1369
1370     rpc.call_node_leave_cluster(node.name)
1371
1372
1373 class LUQueryNodes(NoHooksLU):
1374   """Logical unit for querying nodes.
1375
1376   """
1377   _OP_REQP = ["output_fields", "names"]
1378   REQ_BGL = False
1379
1380   def ExpandNames(self):
1381     self.dynamic_fields = frozenset([
1382       "dtotal", "dfree",
1383       "mtotal", "mnode", "mfree",
1384       "bootid",
1385       "ctotal",
1386       ])
1387
1388     _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1389                                "pinst_list", "sinst_list",
1390                                "pip", "sip", "tags"],
1391                        dynamic=self.dynamic_fields,
1392                        selected=self.op.output_fields)
1393
1394     self.needed_locks = {}
1395     self.share_locks[locking.LEVEL_NODE] = 1
1396     # TODO: we could lock nodes only if the user asked for dynamic fields. For
1397     # that we need atomic ways to get info for a group of nodes from the
1398     # config, though.
1399     if not self.op.names:
1400       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1401     else:
1402       self.needed_locks[locking.LEVEL_NODE] = \
1403         _GetWantedNodes(self, self.op.names)
1404
1405   def CheckPrereq(self):
1406     """Check prerequisites.
1407
1408     """
1409     # This of course is valid only if we locked the nodes
1410     self.wanted = self.acquired_locks[locking.LEVEL_NODE]
1411
1412   def Exec(self, feedback_fn):
1413     """Computes the list of nodes and their attributes.
1414
1415     """
1416     nodenames = self.wanted
1417     nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1418
1419     # begin data gathering
1420
1421     if self.dynamic_fields.intersection(self.op.output_fields):
1422       live_data = {}
1423       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1424       for name in nodenames:
1425         nodeinfo = node_data.get(name, None)
1426         if nodeinfo:
1427           live_data[name] = {
1428             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1429             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1430             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1431             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1432             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1433             "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1434             "bootid": nodeinfo['bootid'],
1435             }
1436         else:
1437           live_data[name] = {}
1438     else:
1439       live_data = dict.fromkeys(nodenames, {})
1440
1441     node_to_primary = dict([(name, set()) for name in nodenames])
1442     node_to_secondary = dict([(name, set()) for name in nodenames])
1443
1444     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1445                              "sinst_cnt", "sinst_list"))
1446     if inst_fields & frozenset(self.op.output_fields):
1447       instancelist = self.cfg.GetInstanceList()
1448
1449       for instance_name in instancelist:
1450         inst = self.cfg.GetInstanceInfo(instance_name)
1451         if inst.primary_node in node_to_primary:
1452           node_to_primary[inst.primary_node].add(inst.name)
1453         for secnode in inst.secondary_nodes:
1454           if secnode in node_to_secondary:
1455             node_to_secondary[secnode].add(inst.name)
1456
1457     # end data gathering
1458
1459     output = []
1460     for node in nodelist:
1461       node_output = []
1462       for field in self.op.output_fields:
1463         if field == "name":
1464           val = node.name
1465         elif field == "pinst_list":
1466           val = list(node_to_primary[node.name])
1467         elif field == "sinst_list":
1468           val = list(node_to_secondary[node.name])
1469         elif field == "pinst_cnt":
1470           val = len(node_to_primary[node.name])
1471         elif field == "sinst_cnt":
1472           val = len(node_to_secondary[node.name])
1473         elif field == "pip":
1474           val = node.primary_ip
1475         elif field == "sip":
1476           val = node.secondary_ip
1477         elif field == "tags":
1478           val = list(node.GetTags())
1479         elif field in self.dynamic_fields:
1480           val = live_data[node.name].get(field, None)
1481         else:
1482           raise errors.ParameterError(field)
1483         node_output.append(val)
1484       output.append(node_output)
1485
1486     return output
1487
1488
1489 class LUQueryNodeVolumes(NoHooksLU):
1490   """Logical unit for getting volumes on node(s).
1491
1492   """
1493   _OP_REQP = ["nodes", "output_fields"]
1494   REQ_BGL = False
1495
1496   def ExpandNames(self):
1497     _CheckOutputFields(static=["node"],
1498                        dynamic=["phys", "vg", "name", "size", "instance"],
1499                        selected=self.op.output_fields)
1500
1501     self.needed_locks = {}
1502     self.share_locks[locking.LEVEL_NODE] = 1
1503     if not self.op.nodes:
1504       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1505     else:
1506       self.needed_locks[locking.LEVEL_NODE] = \
1507         _GetWantedNodes(self, self.op.nodes)
1508
1509   def CheckPrereq(self):
1510     """Check prerequisites.
1511
1512     This checks that the fields required are valid output fields.
1513
1514     """
1515     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1516
1517   def Exec(self, feedback_fn):
1518     """Computes the list of nodes and their attributes.
1519
1520     """
1521     nodenames = self.nodes
1522     volumes = rpc.call_node_volumes(nodenames)
1523
1524     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1525              in self.cfg.GetInstanceList()]
1526
1527     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1528
1529     output = []
1530     for node in nodenames:
1531       if node not in volumes or not volumes[node]:
1532         continue
1533
1534       node_vols = volumes[node][:]
1535       node_vols.sort(key=lambda vol: vol['dev'])
1536
1537       for vol in node_vols:
1538         node_output = []
1539         for field in self.op.output_fields:
1540           if field == "node":
1541             val = node
1542           elif field == "phys":
1543             val = vol['dev']
1544           elif field == "vg":
1545             val = vol['vg']
1546           elif field == "name":
1547             val = vol['name']
1548           elif field == "size":
1549             val = int(float(vol['size']))
1550           elif field == "instance":
1551             for inst in ilist:
1552               if node not in lv_by_node[inst]:
1553                 continue
1554               if vol['name'] in lv_by_node[inst][node]:
1555                 val = inst.name
1556                 break
1557             else:
1558               val = '-'
1559           else:
1560             raise errors.ParameterError(field)
1561           node_output.append(str(val))
1562
1563         output.append(node_output)
1564
1565     return output
1566
1567
1568 class LUAddNode(LogicalUnit):
1569   """Logical unit for adding node to the cluster.
1570
1571   """
1572   HPATH = "node-add"
1573   HTYPE = constants.HTYPE_NODE
1574   _OP_REQP = ["node_name"]
1575
1576   def BuildHooksEnv(self):
1577     """Build hooks env.
1578
1579     This will run on all nodes before, and on all nodes + the new node after.
1580
1581     """
1582     env = {
1583       "OP_TARGET": self.op.node_name,
1584       "NODE_NAME": self.op.node_name,
1585       "NODE_PIP": self.op.primary_ip,
1586       "NODE_SIP": self.op.secondary_ip,
1587       }
1588     nodes_0 = self.cfg.GetNodeList()
1589     nodes_1 = nodes_0 + [self.op.node_name, ]
1590     return env, nodes_0, nodes_1
1591
1592   def CheckPrereq(self):
1593     """Check prerequisites.
1594
1595     This checks:
1596      - the new node is not already in the config
1597      - it is resolvable
1598      - its parameters (single/dual homed) matches the cluster
1599
1600     Any errors are signalled by raising errors.OpPrereqError.
1601
1602     """
1603     node_name = self.op.node_name
1604     cfg = self.cfg
1605
1606     dns_data = utils.HostInfo(node_name)
1607
1608     node = dns_data.name
1609     primary_ip = self.op.primary_ip = dns_data.ip
1610     secondary_ip = getattr(self.op, "secondary_ip", None)
1611     if secondary_ip is None:
1612       secondary_ip = primary_ip
1613     if not utils.IsValidIP(secondary_ip):
1614       raise errors.OpPrereqError("Invalid secondary IP given")
1615     self.op.secondary_ip = secondary_ip
1616
1617     node_list = cfg.GetNodeList()
1618     if not self.op.readd and node in node_list:
1619       raise errors.OpPrereqError("Node %s is already in the configuration" %
1620                                  node)
1621     elif self.op.readd and node not in node_list:
1622       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1623
1624     for existing_node_name in node_list:
1625       existing_node = cfg.GetNodeInfo(existing_node_name)
1626
1627       if self.op.readd and node == existing_node_name:
1628         if (existing_node.primary_ip != primary_ip or
1629             existing_node.secondary_ip != secondary_ip):
1630           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1631                                      " address configuration as before")
1632         continue
1633
1634       if (existing_node.primary_ip == primary_ip or
1635           existing_node.secondary_ip == primary_ip or
1636           existing_node.primary_ip == secondary_ip or
1637           existing_node.secondary_ip == secondary_ip):
1638         raise errors.OpPrereqError("New node ip address(es) conflict with"
1639                                    " existing node %s" % existing_node.name)
1640
1641     # check that the type of the node (single versus dual homed) is the
1642     # same as for the master
1643     myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1644     master_singlehomed = myself.secondary_ip == myself.primary_ip
1645     newbie_singlehomed = secondary_ip == primary_ip
1646     if master_singlehomed != newbie_singlehomed:
1647       if master_singlehomed:
1648         raise errors.OpPrereqError("The master has no private ip but the"
1649                                    " new node has one")
1650       else:
1651         raise errors.OpPrereqError("The master has a private ip but the"
1652                                    " new node doesn't have one")
1653
1654     # checks reachablity
1655     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1656       raise errors.OpPrereqError("Node not reachable by ping")
1657
1658     if not newbie_singlehomed:
1659       # check reachability from my secondary ip to newbie's secondary ip
1660       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1661                            source=myself.secondary_ip):
1662         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1663                                    " based ping to noded port")
1664
1665     self.new_node = objects.Node(name=node,
1666                                  primary_ip=primary_ip,
1667                                  secondary_ip=secondary_ip)
1668
1669   def Exec(self, feedback_fn):
1670     """Adds the new node to the cluster.
1671
1672     """
1673     new_node = self.new_node
1674     node = new_node.name
1675
1676     # check connectivity
1677     result = rpc.call_version([node])[node]
1678     if result:
1679       if constants.PROTOCOL_VERSION == result:
1680         logger.Info("communication to node %s fine, sw version %s match" %
1681                     (node, result))
1682       else:
1683         raise errors.OpExecError("Version mismatch master version %s,"
1684                                  " node version %s" %
1685                                  (constants.PROTOCOL_VERSION, result))
1686     else:
1687       raise errors.OpExecError("Cannot get version from the new node")
1688
1689     # setup ssh on node
1690     logger.Info("copy ssh key to node %s" % node)
1691     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1692     keyarray = []
1693     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1694                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1695                 priv_key, pub_key]
1696
1697     for i in keyfiles:
1698       f = open(i, 'r')
1699       try:
1700         keyarray.append(f.read())
1701       finally:
1702         f.close()
1703
1704     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1705                                keyarray[3], keyarray[4], keyarray[5])
1706
1707     if not result:
1708       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1709
1710     # Add node to our /etc/hosts, and add key to known_hosts
1711     utils.AddHostToEtcHosts(new_node.name)
1712
1713     if new_node.secondary_ip != new_node.primary_ip:
1714       if not rpc.call_node_tcp_ping(new_node.name,
1715                                     constants.LOCALHOST_IP_ADDRESS,
1716                                     new_node.secondary_ip,
1717                                     constants.DEFAULT_NODED_PORT,
1718                                     10, False):
1719         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1720                                  " you gave (%s). Please fix and re-run this"
1721                                  " command." % new_node.secondary_ip)
1722
1723     node_verify_list = [self.sstore.GetMasterNode()]
1724     node_verify_param = {
1725       'nodelist': [node],
1726       # TODO: do a node-net-test as well?
1727     }
1728
1729     result = rpc.call_node_verify(node_verify_list, node_verify_param)
1730     for verifier in node_verify_list:
1731       if not result[verifier]:
1732         raise errors.OpExecError("Cannot communicate with %s's node daemon"
1733                                  " for remote verification" % verifier)
1734       if result[verifier]['nodelist']:
1735         for failed in result[verifier]['nodelist']:
1736           feedback_fn("ssh/hostname verification failed %s -> %s" %
1737                       (verifier, result[verifier]['nodelist'][failed]))
1738         raise errors.OpExecError("ssh/hostname verification failed.")
1739
1740     # Distribute updated /etc/hosts and known_hosts to all nodes,
1741     # including the node just added
1742     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1743     dist_nodes = self.cfg.GetNodeList()
1744     if not self.op.readd:
1745       dist_nodes.append(node)
1746     if myself.name in dist_nodes:
1747       dist_nodes.remove(myself.name)
1748
1749     logger.Debug("Copying hosts and known_hosts to all nodes")
1750     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1751       result = rpc.call_upload_file(dist_nodes, fname)
1752       for to_node in dist_nodes:
1753         if not result[to_node]:
1754           logger.Error("copy of file %s to node %s failed" %
1755                        (fname, to_node))
1756
1757     to_copy = self.sstore.GetFileList()
1758     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1759       to_copy.append(constants.VNC_PASSWORD_FILE)
1760     for fname in to_copy:
1761       result = rpc.call_upload_file([node], fname)
1762       if not result[node]:
1763         logger.Error("could not copy file %s to node %s" % (fname, node))
1764
1765     if self.op.readd:
1766       self.context.ReaddNode(new_node)
1767     else:
1768       self.context.AddNode(new_node)
1769
1770
1771 class LUQueryClusterInfo(NoHooksLU):
1772   """Query cluster configuration.
1773
1774   """
1775   _OP_REQP = []
1776   REQ_MASTER = False
1777   REQ_BGL = False
1778
1779   def ExpandNames(self):
1780     self.needed_locks = {}
1781
1782   def CheckPrereq(self):
1783     """No prerequsites needed for this LU.
1784
1785     """
1786     pass
1787
1788   def Exec(self, feedback_fn):
1789     """Return cluster config.
1790
1791     """
1792     result = {
1793       "name": self.sstore.GetClusterName(),
1794       "software_version": constants.RELEASE_VERSION,
1795       "protocol_version": constants.PROTOCOL_VERSION,
1796       "config_version": constants.CONFIG_VERSION,
1797       "os_api_version": constants.OS_API_VERSION,
1798       "export_version": constants.EXPORT_VERSION,
1799       "master": self.sstore.GetMasterNode(),
1800       "architecture": (platform.architecture()[0], platform.machine()),
1801       "hypervisor_type": self.sstore.GetHypervisorType(),
1802       }
1803
1804     return result
1805
1806
1807 class LUDumpClusterConfig(NoHooksLU):
1808   """Return a text-representation of the cluster-config.
1809
1810   """
1811   _OP_REQP = []
1812   REQ_BGL = False
1813
1814   def ExpandNames(self):
1815     self.needed_locks = {}
1816
1817   def CheckPrereq(self):
1818     """No prerequisites.
1819
1820     """
1821     pass
1822
1823   def Exec(self, feedback_fn):
1824     """Dump a representation of the cluster config to the standard output.
1825
1826     """
1827     return self.cfg.DumpConfig()
1828
1829
1830 class LUActivateInstanceDisks(NoHooksLU):
1831   """Bring up an instance's disks.
1832
1833   """
1834   _OP_REQP = ["instance_name"]
1835
1836   def CheckPrereq(self):
1837     """Check prerequisites.
1838
1839     This checks that the instance is in the cluster.
1840
1841     """
1842     instance = self.cfg.GetInstanceInfo(
1843       self.cfg.ExpandInstanceName(self.op.instance_name))
1844     if instance is None:
1845       raise errors.OpPrereqError("Instance '%s' not known" %
1846                                  self.op.instance_name)
1847     self.instance = instance
1848
1849
1850   def Exec(self, feedback_fn):
1851     """Activate the disks.
1852
1853     """
1854     disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1855     if not disks_ok:
1856       raise errors.OpExecError("Cannot activate block devices")
1857
1858     return disks_info
1859
1860
1861 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1862   """Prepare the block devices for an instance.
1863
1864   This sets up the block devices on all nodes.
1865
1866   Args:
1867     instance: a ganeti.objects.Instance object
1868     ignore_secondaries: if true, errors on secondary nodes won't result
1869                         in an error return from the function
1870
1871   Returns:
1872     false if the operation failed
1873     list of (host, instance_visible_name, node_visible_name) if the operation
1874          suceeded with the mapping from node devices to instance devices
1875   """
1876   device_info = []
1877   disks_ok = True
1878   iname = instance.name
1879   # With the two passes mechanism we try to reduce the window of
1880   # opportunity for the race condition of switching DRBD to primary
1881   # before handshaking occured, but we do not eliminate it
1882
1883   # The proper fix would be to wait (with some limits) until the
1884   # connection has been made and drbd transitions from WFConnection
1885   # into any other network-connected state (Connected, SyncTarget,
1886   # SyncSource, etc.)
1887
1888   # 1st pass, assemble on all nodes in secondary mode
1889   for inst_disk in instance.disks:
1890     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1891       cfg.SetDiskID(node_disk, node)
1892       result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1893       if not result:
1894         logger.Error("could not prepare block device %s on node %s"
1895                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1896         if not ignore_secondaries:
1897           disks_ok = False
1898
1899   # FIXME: race condition on drbd migration to primary
1900
1901   # 2nd pass, do only the primary node
1902   for inst_disk in instance.disks:
1903     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1904       if node != instance.primary_node:
1905         continue
1906       cfg.SetDiskID(node_disk, node)
1907       result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1908       if not result:
1909         logger.Error("could not prepare block device %s on node %s"
1910                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1911         disks_ok = False
1912     device_info.append((instance.primary_node, inst_disk.iv_name, result))
1913
1914   # leave the disks configured for the primary node
1915   # this is a workaround that would be fixed better by
1916   # improving the logical/physical id handling
1917   for disk in instance.disks:
1918     cfg.SetDiskID(disk, instance.primary_node)
1919
1920   return disks_ok, device_info
1921
1922
1923 def _StartInstanceDisks(cfg, instance, force):
1924   """Start the disks of an instance.
1925
1926   """
1927   disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1928                                            ignore_secondaries=force)
1929   if not disks_ok:
1930     _ShutdownInstanceDisks(instance, cfg)
1931     if force is not None and not force:
1932       logger.Error("If the message above refers to a secondary node,"
1933                    " you can retry the operation using '--force'.")
1934     raise errors.OpExecError("Disk consistency error")
1935
1936
1937 class LUDeactivateInstanceDisks(NoHooksLU):
1938   """Shutdown an instance's disks.
1939
1940   """
1941   _OP_REQP = ["instance_name"]
1942
1943   def CheckPrereq(self):
1944     """Check prerequisites.
1945
1946     This checks that the instance is in the cluster.
1947
1948     """
1949     instance = self.cfg.GetInstanceInfo(
1950       self.cfg.ExpandInstanceName(self.op.instance_name))
1951     if instance is None:
1952       raise errors.OpPrereqError("Instance '%s' not known" %
1953                                  self.op.instance_name)
1954     self.instance = instance
1955
1956   def Exec(self, feedback_fn):
1957     """Deactivate the disks
1958
1959     """
1960     instance = self.instance
1961     ins_l = rpc.call_instance_list([instance.primary_node])
1962     ins_l = ins_l[instance.primary_node]
1963     if not type(ins_l) is list:
1964       raise errors.OpExecError("Can't contact node '%s'" %
1965                                instance.primary_node)
1966
1967     if self.instance.name in ins_l:
1968       raise errors.OpExecError("Instance is running, can't shutdown"
1969                                " block devices.")
1970
1971     _ShutdownInstanceDisks(instance, self.cfg)
1972
1973
1974 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1975   """Shutdown block devices of an instance.
1976
1977   This does the shutdown on all nodes of the instance.
1978
1979   If the ignore_primary is false, errors on the primary node are
1980   ignored.
1981
1982   """
1983   result = True
1984   for disk in instance.disks:
1985     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1986       cfg.SetDiskID(top_disk, node)
1987       if not rpc.call_blockdev_shutdown(node, top_disk):
1988         logger.Error("could not shutdown block device %s on node %s" %
1989                      (disk.iv_name, node))
1990         if not ignore_primary or node != instance.primary_node:
1991           result = False
1992   return result
1993
1994
1995 def _CheckNodeFreeMemory(cfg, node, reason, requested):
1996   """Checks if a node has enough free memory.
1997
1998   This function check if a given node has the needed amount of free
1999   memory. In case the node has less memory or we cannot get the
2000   information from the node, this function raise an OpPrereqError
2001   exception.
2002
2003   Args:
2004     - cfg: a ConfigWriter instance
2005     - node: the node name
2006     - reason: string to use in the error message
2007     - requested: the amount of memory in MiB
2008
2009   """
2010   nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2011   if not nodeinfo or not isinstance(nodeinfo, dict):
2012     raise errors.OpPrereqError("Could not contact node %s for resource"
2013                              " information" % (node,))
2014
2015   free_mem = nodeinfo[node].get('memory_free')
2016   if not isinstance(free_mem, int):
2017     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2018                              " was '%s'" % (node, free_mem))
2019   if requested > free_mem:
2020     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2021                              " needed %s MiB, available %s MiB" %
2022                              (node, reason, requested, free_mem))
2023
2024
2025 class LUStartupInstance(LogicalUnit):
2026   """Starts an instance.
2027
2028   """
2029   HPATH = "instance-start"
2030   HTYPE = constants.HTYPE_INSTANCE
2031   _OP_REQP = ["instance_name", "force"]
2032   REQ_BGL = False
2033
2034   def ExpandNames(self):
2035     self._ExpandAndLockInstance()
2036     self.needed_locks[locking.LEVEL_NODE] = []
2037     self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2038
2039   def DeclareLocks(self, level):
2040     if level == locking.LEVEL_NODE:
2041       self._LockInstancesNodes()
2042
2043   def BuildHooksEnv(self):
2044     """Build hooks env.
2045
2046     This runs on master, primary and secondary nodes of the instance.
2047
2048     """
2049     env = {
2050       "FORCE": self.op.force,
2051       }
2052     env.update(_BuildInstanceHookEnvByObject(self.instance))
2053     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2054           list(self.instance.secondary_nodes))
2055     return env, nl, nl
2056
2057   def CheckPrereq(self):
2058     """Check prerequisites.
2059
2060     This checks that the instance is in the cluster.
2061
2062     """
2063     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2064     assert self.instance is not None, \
2065       "Cannot retrieve locked instance %s" % self.op.instance_name
2066
2067     # check bridges existance
2068     _CheckInstanceBridgesExist(instance)
2069
2070     _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2071                          "starting instance %s" % instance.name,
2072                          instance.memory)
2073
2074   def Exec(self, feedback_fn):
2075     """Start the instance.
2076
2077     """
2078     instance = self.instance
2079     force = self.op.force
2080     extra_args = getattr(self.op, "extra_args", "")
2081
2082     self.cfg.MarkInstanceUp(instance.name)
2083
2084     node_current = instance.primary_node
2085
2086     _StartInstanceDisks(self.cfg, instance, force)
2087
2088     if not rpc.call_instance_start(node_current, instance, extra_args):
2089       _ShutdownInstanceDisks(instance, self.cfg)
2090       raise errors.OpExecError("Could not start instance")
2091
2092
2093 class LURebootInstance(LogicalUnit):
2094   """Reboot an instance.
2095
2096   """
2097   HPATH = "instance-reboot"
2098   HTYPE = constants.HTYPE_INSTANCE
2099   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2100   REQ_BGL = False
2101
2102   def ExpandNames(self):
2103     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2104                                    constants.INSTANCE_REBOOT_HARD,
2105                                    constants.INSTANCE_REBOOT_FULL]:
2106       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2107                                   (constants.INSTANCE_REBOOT_SOFT,
2108                                    constants.INSTANCE_REBOOT_HARD,
2109                                    constants.INSTANCE_REBOOT_FULL))
2110     self._ExpandAndLockInstance()
2111     self.needed_locks[locking.LEVEL_NODE] = []
2112     self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2113
2114   def DeclareLocks(self, level):
2115     if level == locking.LEVEL_NODE:
2116       primary_only = not constants.INSTANCE_REBOOT_FULL
2117       self._LockInstancesNodes(primary_only=primary_only)
2118
2119   def BuildHooksEnv(self):
2120     """Build hooks env.
2121
2122     This runs on master, primary and secondary nodes of the instance.
2123
2124     """
2125     env = {
2126       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2127       }
2128     env.update(_BuildInstanceHookEnvByObject(self.instance))
2129     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2130           list(self.instance.secondary_nodes))
2131     return env, nl, nl
2132
2133   def CheckPrereq(self):
2134     """Check prerequisites.
2135
2136     This checks that the instance is in the cluster.
2137
2138     """
2139     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2140     assert self.instance is not None, \
2141       "Cannot retrieve locked instance %s" % self.op.instance_name
2142
2143     # check bridges existance
2144     _CheckInstanceBridgesExist(instance)
2145
2146   def Exec(self, feedback_fn):
2147     """Reboot the instance.
2148
2149     """
2150     instance = self.instance
2151     ignore_secondaries = self.op.ignore_secondaries
2152     reboot_type = self.op.reboot_type
2153     extra_args = getattr(self.op, "extra_args", "")
2154
2155     node_current = instance.primary_node
2156
2157     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2158                        constants.INSTANCE_REBOOT_HARD]:
2159       if not rpc.call_instance_reboot(node_current, instance,
2160                                       reboot_type, extra_args):
2161         raise errors.OpExecError("Could not reboot instance")
2162     else:
2163       if not rpc.call_instance_shutdown(node_current, instance):
2164         raise errors.OpExecError("could not shutdown instance for full reboot")
2165       _ShutdownInstanceDisks(instance, self.cfg)
2166       _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2167       if not rpc.call_instance_start(node_current, instance, extra_args):
2168         _ShutdownInstanceDisks(instance, self.cfg)
2169         raise errors.OpExecError("Could not start instance for full reboot")
2170
2171     self.cfg.MarkInstanceUp(instance.name)
2172
2173
2174 class LUShutdownInstance(LogicalUnit):
2175   """Shutdown an instance.
2176
2177   """
2178   HPATH = "instance-stop"
2179   HTYPE = constants.HTYPE_INSTANCE
2180   _OP_REQP = ["instance_name"]
2181   REQ_BGL = False
2182
2183   def ExpandNames(self):
2184     self._ExpandAndLockInstance()
2185     self.needed_locks[locking.LEVEL_NODE] = []
2186     self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2187
2188   def DeclareLocks(self, level):
2189     if level == locking.LEVEL_NODE:
2190       self._LockInstancesNodes()
2191
2192   def BuildHooksEnv(self):
2193     """Build hooks env.
2194
2195     This runs on master, primary and secondary nodes of the instance.
2196
2197     """
2198     env = _BuildInstanceHookEnvByObject(self.instance)
2199     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2200           list(self.instance.secondary_nodes))
2201     return env, nl, nl
2202
2203   def CheckPrereq(self):
2204     """Check prerequisites.
2205
2206     This checks that the instance is in the cluster.
2207
2208     """
2209     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2210     assert self.instance is not None, \
2211       "Cannot retrieve locked instance %s" % self.op.instance_name
2212
2213   def Exec(self, feedback_fn):
2214     """Shutdown the instance.
2215
2216     """
2217     instance = self.instance
2218     node_current = instance.primary_node
2219     self.cfg.MarkInstanceDown(instance.name)
2220     if not rpc.call_instance_shutdown(node_current, instance):
2221       logger.Error("could not shutdown instance")
2222
2223     _ShutdownInstanceDisks(instance, self.cfg)
2224
2225
2226 class LUReinstallInstance(LogicalUnit):
2227   """Reinstall an instance.
2228
2229   """
2230   HPATH = "instance-reinstall"
2231   HTYPE = constants.HTYPE_INSTANCE
2232   _OP_REQP = ["instance_name"]
2233   REQ_BGL = False
2234
2235   def ExpandNames(self):
2236     self._ExpandAndLockInstance()
2237     self.needed_locks[locking.LEVEL_NODE] = []
2238     self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2239
2240   def DeclareLocks(self, level):
2241     if level == locking.LEVEL_NODE:
2242       self._LockInstancesNodes()
2243
2244   def BuildHooksEnv(self):
2245     """Build hooks env.
2246
2247     This runs on master, primary and secondary nodes of the instance.
2248
2249     """
2250     env = _BuildInstanceHookEnvByObject(self.instance)
2251     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2252           list(self.instance.secondary_nodes))
2253     return env, nl, nl
2254
2255   def CheckPrereq(self):
2256     """Check prerequisites.
2257
2258     This checks that the instance is in the cluster and is not running.
2259
2260     """
2261     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2262     assert instance is not None, \
2263       "Cannot retrieve locked instance %s" % self.op.instance_name
2264
2265     if instance.disk_template == constants.DT_DISKLESS:
2266       raise errors.OpPrereqError("Instance '%s' has no disks" %
2267                                  self.op.instance_name)
2268     if instance.status != "down":
2269       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2270                                  self.op.instance_name)
2271     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2272     if remote_info:
2273       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2274                                  (self.op.instance_name,
2275                                   instance.primary_node))
2276
2277     self.op.os_type = getattr(self.op, "os_type", None)
2278     if self.op.os_type is not None:
2279       # OS verification
2280       pnode = self.cfg.GetNodeInfo(
2281         self.cfg.ExpandNodeName(instance.primary_node))
2282       if pnode is None:
2283         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2284                                    self.op.pnode)
2285       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2286       if not os_obj:
2287         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2288                                    " primary node"  % self.op.os_type)
2289
2290     self.instance = instance
2291
2292   def Exec(self, feedback_fn):
2293     """Reinstall the instance.
2294
2295     """
2296     inst = self.instance
2297
2298     if self.op.os_type is not None:
2299       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2300       inst.os = self.op.os_type
2301       self.cfg.AddInstance(inst)
2302
2303     _StartInstanceDisks(self.cfg, inst, None)
2304     try:
2305       feedback_fn("Running the instance OS create scripts...")
2306       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2307         raise errors.OpExecError("Could not install OS for instance %s"
2308                                  " on node %s" %
2309                                  (inst.name, inst.primary_node))
2310     finally:
2311       _ShutdownInstanceDisks(inst, self.cfg)
2312
2313
2314 class LURenameInstance(LogicalUnit):
2315   """Rename an instance.
2316
2317   """
2318   HPATH = "instance-rename"
2319   HTYPE = constants.HTYPE_INSTANCE
2320   _OP_REQP = ["instance_name", "new_name"]
2321
2322   def BuildHooksEnv(self):
2323     """Build hooks env.
2324
2325     This runs on master, primary and secondary nodes of the instance.
2326
2327     """
2328     env = _BuildInstanceHookEnvByObject(self.instance)
2329     env["INSTANCE_NEW_NAME"] = self.op.new_name
2330     nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2331           list(self.instance.secondary_nodes))
2332     return env, nl, nl
2333
2334   def CheckPrereq(self):
2335     """Check prerequisites.
2336
2337     This checks that the instance is in the cluster and is not running.
2338
2339     """
2340     instance = self.cfg.GetInstanceInfo(
2341       self.cfg.ExpandInstanceName(self.op.instance_name))
2342     if instance is None:
2343       raise errors.OpPrereqError("Instance '%s' not known" %
2344                                  self.op.instance_name)
2345     if instance.status != "down":
2346       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2347                                  self.op.instance_name)
2348     remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2349     if remote_info:
2350       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2351                                  (self.op.instance_name,
2352                                   instance.primary_node))
2353     self.instance = instance
2354
2355     # new name verification
2356     name_info = utils.HostInfo(self.op.new_name)
2357
2358     self.op.new_name = new_name = name_info.name
2359     instance_list = self.cfg.GetInstanceList()
2360     if new_name in instance_list:
2361       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2362                                  new_name)
2363
2364     if not getattr(self.op, "ignore_ip", False):
2365       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2366         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2367                                    (name_info.ip, new_name))
2368
2369
2370   def Exec(self, feedback_fn):
2371     """Reinstall the instance.
2372
2373     """
2374     inst = self.instance
2375     old_name = inst.name
2376
2377     if inst.disk_template == constants.DT_FILE:
2378       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2379
2380     self.cfg.RenameInstance(inst.name, self.op.new_name)
2381     # Change the instance lock. This is definitely safe while we hold the BGL
2382     self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2383     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2384
2385     # re-read the instance from the configuration after rename
2386     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2387
2388     if inst.disk_template == constants.DT_FILE:
2389       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2390       result = rpc.call_file_storage_dir_rename(inst.primary_node,
2391                                                 old_file_storage_dir,
2392                                                 new_file_storage_dir)
2393
2394       if not result:
2395         raise errors.OpExecError("Could not connect to node '%s' to rename"
2396                                  " directory '%s' to '%s' (but the instance"
2397                                  " has been renamed in Ganeti)" % (
2398                                  inst.primary_node, old_file_storage_dir,
2399                                  new_file_storage_dir))
2400
2401       if not result[0]:
2402         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2403                                  " (but the instance has been renamed in"
2404                                  " Ganeti)" % (old_file_storage_dir,
2405                                                new_file_storage_dir))
2406
2407     _StartInstanceDisks(self.cfg, inst, None)
2408     try:
2409       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2410                                           "sda", "sdb"):
2411         msg = ("Could not run OS rename script for instance %s on node %s"
2412                " (but the instance has been renamed in Ganeti)" %
2413                (inst.name, inst.primary_node))
2414         logger.Error(msg)
2415     finally:
2416       _ShutdownInstanceDisks(inst, self.cfg)
2417
2418
2419 class LURemoveInstance(LogicalUnit):
2420   """Remove an instance.
2421
2422   """
2423   HPATH = "instance-remove"
2424   HTYPE = constants.HTYPE_INSTANCE
2425   _OP_REQP = ["instance_name", "ignore_failures"]
2426
2427   def BuildHooksEnv(self):
2428     """Build hooks env.
2429
2430     This runs on master, primary and secondary nodes of the instance.
2431
2432     """
2433     env = _BuildInstanceHookEnvByObject(self.instance)
2434     nl = [self.sstore.GetMasterNode()]
2435     return env, nl, nl
2436
2437   def CheckPrereq(self):
2438     """Check prerequisites.
2439
2440     This checks that the instance is in the cluster.
2441
2442     """
2443     instance = self.cfg.GetInstanceInfo(
2444       self.cfg.ExpandInstanceName(self.op.instance_name))
2445     if instance is None:
2446       raise errors.OpPrereqError("Instance '%s' not known" %
2447                                  self.op.instance_name)
2448     self.instance = instance
2449
2450   def Exec(self, feedback_fn):
2451     """Remove the instance.
2452
2453     """
2454     instance = self.instance
2455     logger.Info("shutting down instance %s on node %s" %
2456                 (instance.name, instance.primary_node))
2457
2458     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2459       if self.op.ignore_failures:
2460         feedback_fn("Warning: can't shutdown instance")
2461       else:
2462         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2463                                  (instance.name, instance.primary_node))
2464
2465     logger.Info("removing block devices for instance %s" % instance.name)
2466
2467     if not _RemoveDisks(instance, self.cfg):
2468       if self.op.ignore_failures:
2469         feedback_fn("Warning: can't remove instance's disks")
2470       else:
2471         raise errors.OpExecError("Can't remove instance's disks")
2472
2473     logger.Info("removing instance %s out of cluster config" % instance.name)
2474
2475     self.cfg.RemoveInstance(instance.name)
2476     # Remove the new instance from the Ganeti Lock Manager
2477     self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name)
2478
2479
2480 class LUQueryInstances(NoHooksLU):
2481   """Logical unit for querying instances.
2482
2483   """
2484   _OP_REQP = ["output_fields", "names"]
2485   REQ_BGL = False
2486
2487   def ExpandNames(self):
2488     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2489     _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2490                                "admin_state", "admin_ram",
2491                                "disk_template", "ip", "mac", "bridge",
2492                                "sda_size", "sdb_size", "vcpus", "tags",
2493                                "auto_balance",
2494                                "network_port", "kernel_path", "initrd_path",
2495                                "hvm_boot_order", "hvm_acpi", "hvm_pae",
2496                                "hvm_cdrom_image_path", "hvm_nic_type",
2497                                "hvm_disk_type", "vnc_bind_address"],
2498                        dynamic=self.dynamic_fields,
2499                        selected=self.op.output_fields)
2500
2501     self.needed_locks = {}
2502     self.share_locks[locking.LEVEL_INSTANCE] = 1
2503     self.share_locks[locking.LEVEL_NODE] = 1
2504
2505     # TODO: we could lock instances (and nodes) only if the user asked for
2506     # dynamic fields. For that we need atomic ways to get info for a group of
2507     # instances from the config, though.
2508     if not self.op.names:
2509       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
2510     else:
2511       self.needed_locks[locking.LEVEL_INSTANCE] = \
2512         _GetWantedInstances(self, self.op.names)
2513
2514     self.needed_locks[locking.LEVEL_NODE] = []
2515     self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2516
2517   def DeclareLocks(self, level):
2518     # TODO: locking of nodes could be avoided when not querying them
2519     if level == locking.LEVEL_NODE:
2520       self._LockInstancesNodes()
2521
2522   def CheckPrereq(self):
2523     """Check prerequisites.
2524
2525     """
2526     # This of course is valid only if we locked the instances
2527     self.wanted = self.acquired_locks[locking.LEVEL_INSTANCE]
2528
2529   def Exec(self, feedback_fn):
2530     """Computes the list of nodes and their attributes.
2531
2532     """
2533     instance_names = self.wanted
2534     instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2535                      in instance_names]
2536
2537     # begin data gathering
2538
2539     nodes = frozenset([inst.primary_node for inst in instance_list])
2540
2541     bad_nodes = []
2542     if self.dynamic_fields.intersection(self.op.output_fields):
2543       live_data = {}
2544       node_data = rpc.call_all_instances_info(nodes)
2545       for name in nodes:
2546         result = node_data[name]
2547         if result:
2548           live_data.update(result)
2549         elif result == False:
2550           bad_nodes.append(name)
2551         # else no instance is alive
2552     else:
2553       live_data = dict([(name, {}) for name in instance_names])
2554
2555     # end data gathering
2556
2557     output = []
2558     for instance in instance_list:
2559       iout = []
2560       for field in self.op.output_fields:
2561         if field == "name":
2562           val = instance.name
2563         elif field == "os":
2564           val = instance.os
2565         elif field == "pnode":
2566           val = instance.primary_node
2567         elif field == "snodes":
2568           val = list(instance.secondary_nodes)
2569         elif field == "admin_state":
2570           val = (instance.status != "down")
2571         elif field == "oper_state":
2572           if instance.primary_node in bad_nodes:
2573             val = None
2574           else:
2575             val = bool(live_data.get(instance.name))
2576         elif field == "status":
2577           if instance.primary_node in bad_nodes:
2578             val = "ERROR_nodedown"
2579           else:
2580             running = bool(live_data.get(instance.name))
2581             if running:
2582               if instance.status != "down":
2583                 val = "running"
2584               else:
2585                 val = "ERROR_up"
2586             else:
2587               if instance.status != "down":
2588                 val = "ERROR_down"
2589               else:
2590                 val = "ADMIN_down"
2591         elif field == "admin_ram":
2592           val = instance.memory
2593         elif field == "oper_ram":
2594           if instance.primary_node in bad_nodes:
2595             val = None
2596           elif instance.name in live_data:
2597             val = live_data[instance.name].get("memory", "?")
2598           else:
2599             val = "-"
2600         elif field == "disk_template":
2601           val = instance.disk_template
2602         elif field == "ip":
2603           val = instance.nics[0].ip
2604         elif field == "bridge":
2605           val = instance.nics[0].bridge
2606         elif field == "mac":
2607           val = instance.nics[0].mac
2608         elif field == "sda_size" or field == "sdb_size":
2609           disk = instance.FindDisk(field[:3])
2610           if disk is None:
2611             val = None
2612           else:
2613             val = disk.size
2614         elif field == "vcpus":
2615           val = instance.vcpus
2616         elif field == "tags":
2617           val = list(instance.GetTags())
2618         elif field in ("network_port", "kernel_path", "initrd_path",
2619                        "hvm_boot_order", "hvm_acpi", "hvm_pae",
2620                        "hvm_cdrom_image_path", "hvm_nic_type",
2621                        "hvm_disk_type", "vnc_bind_address"):
2622           val = getattr(instance, field, None)
2623           if val is not None:
2624             pass
2625           elif field in ("hvm_nic_type", "hvm_disk_type",
2626                          "kernel_path", "initrd_path"):
2627             val = "default"
2628           else:
2629             val = "-"
2630         else:
2631           raise errors.ParameterError(field)
2632         iout.append(val)
2633       output.append(iout)
2634
2635     return output
2636
2637
2638 class LUFailoverInstance(LogicalUnit):
2639   """Failover an instance.
2640
2641   """
2642   HPATH = "instance-failover"
2643   HTYPE = constants.HTYPE_INSTANCE
2644   _OP_REQP = ["instance_name", "ignore_consistency"]
2645   REQ_BGL = False
2646
2647   def ExpandNames(self):
2648     self._ExpandAndLockInstance()
2649     self.needed_locks[locking.LEVEL_NODE] = []
2650     self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
2651
2652   def DeclareLocks(self, level):
2653     if level == locking.LEVEL_NODE:
2654       self._LockInstancesNodes()
2655
2656   def BuildHooksEnv(self):
2657     """Build hooks env.
2658
2659     This runs on master, primary and secondary nodes of the instance.
2660
2661     """
2662     env = {
2663       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2664       }
2665     env.update(_BuildInstanceHookEnvByObject(self.instance))
2666     nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2667     return env, nl, nl
2668
2669   def CheckPrereq(self):
2670     """Check prerequisites.
2671
2672     This checks that the instance is in the cluster.
2673
2674     """
2675     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2676     assert self.instance is not None, \
2677       "Cannot retrieve locked instance %s" % self.op.instance_name
2678
2679     if instance.disk_template not in constants.DTS_NET_MIRROR:
2680       raise errors.OpPrereqError("Instance's disk layout is not"
2681                                  " network mirrored, cannot failover.")
2682
2683     secondary_nodes = instance.secondary_nodes
2684     if not secondary_nodes:
2685       raise errors.ProgrammerError("no secondary node but using "
2686                                    "a mirrored disk template")
2687
2688     target_node = secondary_nodes[0]
2689     # check memory requirements on the secondary node
2690     _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2691                          instance.name, instance.memory)
2692
2693     # check bridge existance
2694     brlist = [nic.bridge for nic in instance.nics]
2695     if not rpc.call_bridges_exist(target_node, brlist):
2696       raise errors.OpPrereqError("One or more target bridges %s does not"
2697                                  " exist on destination node '%s'" %
2698                                  (brlist, target_node))
2699
2700   def Exec(self, feedback_fn):
2701     """Failover an instance.
2702
2703     The failover is done by shutting it down on its present node and
2704     starting it on the secondary.
2705
2706     """
2707     instance = self.instance
2708
2709     source_node = instance.primary_node
2710     target_node = instance.secondary_nodes[0]
2711
2712     feedback_fn("* checking disk consistency between source and target")
2713     for dev in instance.disks:
2714       # for drbd, these are drbd over lvm
2715       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2716         if instance.status == "up" and not self.op.ignore_consistency:
2717           raise errors.OpExecError("Disk %s is degraded on target node,"
2718                                    " aborting failover." % dev.iv_name)
2719
2720     feedback_fn("* shutting down instance on source node")
2721     logger.Info("Shutting down instance %s on node %s" %
2722                 (instance.name, source_node))
2723
2724     if not rpc.call_instance_shutdown(source_node, instance):
2725       if self.op.ignore_consistency:
2726         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2727                      " anyway. Please make sure node %s is down"  %
2728                      (instance.name, source_node, source_node))
2729       else:
2730         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2731                                  (instance.name, source_node))
2732
2733     feedback_fn("* deactivating the instance's disks on source node")
2734     if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2735       raise errors.OpExecError("Can't shut down the instance's disks.")
2736
2737     instance.primary_node = target_node
2738     # distribute new instance config to the other nodes
2739     self.cfg.Update(instance)
2740
2741     # Only start the instance if it's marked as up
2742     if instance.status == "up":
2743       feedback_fn("* activating the instance's disks on target node")
2744       logger.Info("Starting instance %s on node %s" %
2745                   (instance.name, target_node))
2746
2747       disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2748                                                ignore_secondaries=True)
2749       if not disks_ok:
2750         _ShutdownInstanceDisks(instance, self.cfg)
2751         raise errors.OpExecError("Can't activate the instance's disks")
2752
2753       feedback_fn("* starting the instance on the target node")
2754       if not rpc.call_instance_start(target_node, instance, None):
2755         _ShutdownInstanceDisks(instance, self.cfg)
2756         raise errors.OpExecError("Could not start instance %s on node %s." %
2757                                  (instance.name, target_node))
2758
2759
2760 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2761   """Create a tree of block devices on the primary node.
2762
2763   This always creates all devices.
2764
2765   """
2766   if device.children:
2767     for child in device.children:
2768       if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2769         return False
2770
2771   cfg.SetDiskID(device, node)
2772   new_id = rpc.call_blockdev_create(node, device, device.size,
2773                                     instance.name, True, info)
2774   if not new_id:
2775     return False
2776   if device.physical_id is None:
2777     device.physical_id = new_id
2778   return True
2779
2780
2781 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2782   """Create a tree of block devices on a secondary node.
2783
2784   If this device type has to be created on secondaries, create it and
2785   all its children.
2786
2787   If not, just recurse to children keeping the same 'force' value.
2788
2789   """
2790   if device.CreateOnSecondary():
2791     force = True
2792   if device.children:
2793     for child in device.children:
2794       if not _CreateBlockDevOnSecondary(cfg, node, instance,
2795                                         child, force, info):
2796         return False
2797
2798   if not force:
2799     return True
2800   cfg.SetDiskID(device, node)
2801   new_id = rpc.call_blockdev_create(node, device, device.size,
2802                                     instance.name, False, info)
2803   if not new_id:
2804     return False
2805   if device.physical_id is None:
2806     device.physical_id = new_id
2807   return True
2808
2809
2810 def _GenerateUniqueNames(cfg, exts):
2811   """Generate a suitable LV name.
2812
2813   This will generate a logical volume name for the given instance.
2814
2815   """
2816   results = []
2817   for val in exts:
2818     new_id = cfg.GenerateUniqueID()
2819     results.append("%s%s" % (new_id, val))
2820   return results
2821
2822
2823 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2824   """Generate a drbd8 device complete with its children.
2825
2826   """
2827   port = cfg.AllocatePort()
2828   vgname = cfg.GetVGName()
2829   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2830                           logical_id=(vgname, names[0]))
2831   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2832                           logical_id=(vgname, names[1]))
2833   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2834                           logical_id = (primary, secondary, port),
2835                           children = [dev_data, dev_meta],
2836                           iv_name=iv_name)
2837   return drbd_dev
2838
2839
2840 def _GenerateDiskTemplate(cfg, template_name,
2841                           instance_name, primary_node,
2842                           secondary_nodes, disk_sz, swap_sz,
2843                           file_storage_dir, file_driver):
2844   """Generate the entire disk layout for a given template type.
2845
2846   """
2847   #TODO: compute space requirements
2848
2849   vgname = cfg.GetVGName()
2850   if template_name == constants.DT_DISKLESS:
2851     disks = []
2852   elif template_name == constants.DT_PLAIN:
2853     if len(secondary_nodes) != 0:
2854       raise errors.ProgrammerError("Wrong template configuration")
2855
2856     names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2857     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2858                            logical_id=(vgname, names[0]),
2859                            iv_name = "sda")
2860     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2861                            logical_id=(vgname, names[1]),
2862                            iv_name = "sdb")
2863     disks = [sda_dev, sdb_dev]
2864   elif template_name == constants.DT_DRBD8:
2865     if len(secondary_nodes) != 1:
2866       raise errors.ProgrammerError("Wrong template configuration")
2867     remote_node = secondary_nodes[0]
2868     names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2869                                        ".sdb_data", ".sdb_meta"])
2870     drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2871                                          disk_sz, names[0:2], "sda")
2872     drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2873                                          swap_sz, names[2:4], "sdb")
2874     disks = [drbd_sda_dev, drbd_sdb_dev]
2875   elif template_name == constants.DT_FILE:
2876     if len(secondary_nodes) != 0:
2877       raise errors.ProgrammerError("Wrong template configuration")
2878
2879     file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2880                                 iv_name="sda", logical_id=(file_driver,
2881                                 "%s/sda" % file_storage_dir))
2882     file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2883                                 iv_name="sdb", logical_id=(file_driver,
2884                                 "%s/sdb" % file_storage_dir))
2885     disks = [file_sda_dev, file_sdb_dev]
2886   else:
2887     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2888   return disks
2889
2890
2891 def _GetInstanceInfoText(instance):
2892   """Compute that text that should be added to the disk's metadata.
2893
2894   """
2895   return "originstname+%s" % instance.name
2896
2897
2898 def _CreateDisks(cfg, instance):
2899   """Create all disks for an instance.
2900
2901   This abstracts away some work from AddInstance.
2902
2903   Args:
2904     instance: the instance object
2905
2906   Returns:
2907     True or False showing the success of the creation process
2908
2909   """
2910   info = _GetInstanceInfoText(instance)
2911
2912   if instance.disk_template == constants.DT_FILE:
2913     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2914     result = rpc.call_file_storage_dir_create(instance.primary_node,
2915                                               file_storage_dir)
2916
2917     if not result:
2918       logger.Error("Could not connect to node '%s'" % instance.primary_node)
2919       return False
2920
2921     if not result[0]:
2922       logger.Error("failed to create directory '%s'" % file_storage_dir)
2923       return False
2924
2925   for device in instance.disks:
2926     logger.Info("creating volume %s for instance %s" %
2927                 (device.iv_name, instance.name))
2928     #HARDCODE
2929     for secondary_node in instance.secondary_nodes:
2930       if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2931                                         device, False, info):
2932         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2933                      (device.iv_name, device, secondary_node))
2934         return False
2935     #HARDCODE
2936     if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2937                                     instance, device, info):
2938       logger.Error("failed to create volume %s on primary!" %
2939                    device.iv_name)
2940       return False
2941
2942   return True
2943
2944
2945 def _RemoveDisks(instance, cfg):
2946   """Remove all disks for an instance.
2947
2948   This abstracts away some work from `AddInstance()` and
2949   `RemoveInstance()`. Note that in case some of the devices couldn't
2950   be removed, the removal will continue with the other ones (compare
2951   with `_CreateDisks()`).
2952
2953   Args:
2954     instance: the instance object
2955
2956   Returns:
2957     True or False showing the success of the removal proces
2958
2959   """
2960   logger.Info("removing block devices for instance %s" % instance.name)
2961
2962   result = True
2963   for device in instance.disks:
2964     for node, disk in device.ComputeNodeTree(instance.primary_node):
2965       cfg.SetDiskID(disk, node)
2966       if not rpc.call_blockdev_remove(node, disk):
2967         logger.Error("could not remove block device %s on node %s,"
2968                      " continuing anyway" %
2969                      (device.iv_name, node))
2970         result = False
2971
2972   if instance.disk_template == constants.DT_FILE:
2973     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2974     if not rpc.call_file_storage_dir_remove(instance.primary_node,
2975                                             file_storage_dir):
2976       logger.Error("could not remove directory '%s'" % file_storage_dir)
2977       result = False
2978
2979   return result
2980
2981
2982 def _ComputeDiskSize(disk_template, disk_size, swap_size):
2983   """Compute disk size requirements in the volume group
2984
2985   This is currently hard-coded for the two-drive layout.
2986
2987   """
2988   # Required free disk space as a function of disk and swap space
2989   req_size_dict = {
2990     constants.DT_DISKLESS: None,
2991     constants.DT_PLAIN: disk_size + swap_size,
2992     # 256 MB are added for drbd metadata, 128MB for each drbd device
2993     constants.DT_DRBD8: disk_size + swap_size + 256,
2994     constants.DT_FILE: None,
2995   }
2996
2997   if disk_template not in req_size_dict:
2998     raise errors.ProgrammerError("Disk template '%s' size requirement"
2999                                  " is unknown" %  disk_template)
3000
3001   return req_size_dict[disk_template]
3002
3003
3004 class LUCreateInstance(LogicalUnit):
3005   """Create an instance.
3006
3007   """
3008   HPATH = "instance-add"
3009   HTYPE = constants.HTYPE_INSTANCE
3010   _OP_REQP = ["instance_name", "mem_size", "disk_size",
3011               "disk_template", "swap_size", "mode", "start", "vcpus",
3012               "wait_for_sync", "ip_check", "mac"]
3013
3014   def _RunAllocator(self):
3015     """Run the allocator based on input opcode.
3016
3017     """
3018     disks = [{"size": self.op.disk_size, "mode": "w"},
3019              {"size": self.op.swap_size, "mode": "w"}]
3020     nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3021              "bridge": self.op.bridge}]
3022     ial = IAllocator(self.cfg, self.sstore,
3023                      mode=constants.IALLOCATOR_MODE_ALLOC,
3024                      name=self.op.instance_name,
3025                      disk_template=self.op.disk_template,
3026                      tags=[],
3027                      os=self.op.os_type,
3028                      vcpus=self.op.vcpus,
3029                      mem_size=self.op.mem_size,
3030                      disks=disks,
3031                      nics=nics,
3032                      )
3033
3034     ial.Run(self.op.iallocator)
3035
3036     if not ial.success:
3037       raise errors.OpPrereqError("Can't compute nodes using"
3038                                  " iallocator '%s': %s" % (self.op.iallocator,
3039                                                            ial.info))
3040     if len(ial.nodes) != ial.required_nodes:
3041       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3042                                  " of nodes (%s), required %s" %
3043                                  (len(ial.nodes), ial.required_nodes))
3044     self.op.pnode = ial.nodes[0]
3045     logger.ToStdout("Selected nodes for the instance: %s" %
3046                     (", ".join(ial.nodes),))
3047     logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3048                 (self.op.instance_name, self.op.iallocator, ial.nodes))
3049     if ial.required_nodes == 2:
3050       self.op.snode = ial.nodes[1]
3051
3052   def BuildHooksEnv(self):
3053     """Build hooks env.
3054
3055     This runs on master, primary and secondary nodes of the instance.
3056
3057     """
3058     env = {
3059       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3060       "INSTANCE_DISK_SIZE": self.op.disk_size,
3061       "INSTANCE_SWAP_SIZE": self.op.swap_size,
3062       "INSTANCE_ADD_MODE": self.op.mode,
3063       }
3064     if self.op.mode == constants.INSTANCE_IMPORT:
3065       env["INSTANCE_SRC_NODE"] = self.op.src_node
3066       env["INSTANCE_SRC_PATH"] = self.op.src_path
3067       env["INSTANCE_SRC_IMAGE"] = self.src_image
3068
3069     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3070       primary_node=self.op.pnode,
3071       secondary_nodes=self.secondaries,
3072       status=self.instance_status,
3073       os_type=self.op.os_type,
3074       memory=self.op.mem_size,
3075       vcpus=self.op.vcpus,
3076       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3077     ))
3078
3079     nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3080           self.secondaries)
3081     return env, nl, nl
3082
3083
3084   def CheckPrereq(self):
3085     """Check prerequisites.
3086
3087     """
3088     # set optional parameters to none if they don't exist
3089     for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3090                  "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3091                  "hvm_nic_type", "hvm_disk_type", "vnc_bind_address"]:
3092       if not hasattr(self.op, attr):
3093         setattr(self.op, attr, None)
3094
3095     if self.op.mode not in (constants.INSTANCE_CREATE,
3096                             constants.INSTANCE_IMPORT):
3097       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3098                                  self.op.mode)
3099
3100     if (not self.cfg.GetVGName() and
3101         self.op.disk_template not in constants.DTS_NOT_LVM):
3102       raise errors.OpPrereqError("Cluster does not support lvm-based"
3103                                  " instances")
3104
3105     if self.op.mode == constants.INSTANCE_IMPORT:
3106       src_node = getattr(self.op, "src_node", None)
3107       src_path = getattr(self.op, "src_path", None)
3108       if src_node is None or src_path is None:
3109         raise errors.OpPrereqError("Importing an instance requires source"
3110                                    " node and path options")
3111       src_node_full = self.cfg.ExpandNodeName(src_node)
3112       if src_node_full is None:
3113         raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3114       self.op.src_node = src_node = src_node_full
3115
3116       if not os.path.isabs(src_path):
3117         raise errors.OpPrereqError("The source path must be absolute")
3118
3119       export_info = rpc.call_export_info(src_node, src_path)
3120
3121       if not export_info:
3122         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3123
3124       if not export_info.has_section(constants.INISECT_EXP):
3125         raise errors.ProgrammerError("Corrupted export config")
3126
3127       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3128       if (int(ei_version) != constants.EXPORT_VERSION):
3129         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3130                                    (ei_version, constants.EXPORT_VERSION))
3131
3132       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3133         raise errors.OpPrereqError("Can't import instance with more than"
3134                                    " one data disk")
3135
3136       # FIXME: are the old os-es, disk sizes, etc. useful?
3137       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3138       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3139                                                          'disk0_dump'))
3140       self.src_image = diskimage
3141     else: # INSTANCE_CREATE
3142       if getattr(self.op, "os_type", None) is None:
3143         raise errors.OpPrereqError("No guest OS specified")
3144
3145     #### instance parameters check
3146
3147     # disk template and mirror node verification
3148     if self.op.disk_template not in constants.DISK_TEMPLATES:
3149       raise errors.OpPrereqError("Invalid disk template name")
3150
3151     # instance name verification
3152     hostname1 = utils.HostInfo(self.op.instance_name)
3153
3154     self.op.instance_name = instance_name = hostname1.name
3155     instance_list = self.cfg.GetInstanceList()
3156     if instance_name in instance_list:
3157       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3158                                  instance_name)
3159
3160     # ip validity checks
3161     ip = getattr(self.op, "ip", None)
3162     if ip is None or ip.lower() == "none":
3163       inst_ip = None
3164     elif ip.lower() == "auto":
3165       inst_ip = hostname1.ip
3166     else:
3167       if not utils.IsValidIP(ip):
3168         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3169                                    " like a valid IP" % ip)
3170       inst_ip = ip
3171     self.inst_ip = self.op.ip = inst_ip
3172
3173     if self.op.start and not self.op.ip_check:
3174       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3175                                  " adding an instance in start mode")
3176
3177     if self.op.ip_check:
3178       if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3179         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3180                                    (hostname1.ip, instance_name))
3181
3182     # MAC address verification
3183     if self.op.mac != "auto":
3184       if not utils.IsValidMac(self.op.mac.lower()):
3185         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3186                                    self.op.mac)
3187
3188     # bridge verification
3189     bridge = getattr(self.op, "bridge", None)
3190     if bridge is None:
3191       self.op.bridge = self.cfg.GetDefBridge()
3192     else:
3193       self.op.bridge = bridge
3194
3195     # boot order verification
3196     if self.op.hvm_boot_order is not None:
3197       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3198         raise errors.OpPrereqError("invalid boot order specified,"
3199                                    " must be one or more of [acdn]")
3200     # file storage checks
3201     if (self.op.file_driver and
3202         not self.op.file_driver in constants.FILE_DRIVER):
3203       raise errors.OpPrereqError("Invalid file driver name '%s'" %
3204                                  self.op.file_driver)
3205
3206     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3207       raise errors.OpPrereqError("File storage directory not a relative"
3208                                  " path")
3209     #### allocator run
3210
3211     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3212       raise errors.OpPrereqError("One and only one of iallocator and primary"
3213                                  " node must be given")
3214
3215     if self.op.iallocator is not None:
3216       self._RunAllocator()
3217
3218     #### node related checks
3219
3220     # check primary node
3221     pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3222     if pnode is None:
3223       raise errors.OpPrereqError("Primary node '%s' is unknown" %
3224                                  self.op.pnode)
3225     self.op.pnode = pnode.name
3226     self.pnode = pnode
3227     self.secondaries = []
3228
3229     # mirror node verification
3230     if self.op.disk_template in constants.DTS_NET_MIRROR:
3231       if getattr(self.op, "snode", None) is None:
3232         raise errors.OpPrereqError("The networked disk templates need"
3233                                    " a mirror node")
3234
3235       snode_name = self.cfg.ExpandNodeName(self.op.snode)
3236       if snode_name is None:
3237         raise errors.OpPrereqError("Unknown secondary node '%s'" %
3238                                    self.op.snode)
3239       elif snode_name == pnode.name:
3240         raise errors.OpPrereqError("The secondary node cannot be"
3241                                    " the primary node.")
3242       self.secondaries.append(snode_name)
3243
3244     req_size = _ComputeDiskSize(self.op.disk_template,
3245                                 self.op.disk_size, self.op.swap_size)
3246
3247     # Check lv size requirements
3248     if req_size is not None:
3249       nodenames = [pnode.name] + self.secondaries
3250       nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3251       for node in nodenames:
3252         info = nodeinfo.get(node, None)
3253         if not info:
3254           raise errors.OpPrereqError("Cannot get current information"
3255                                      " from node '%s'" % node)
3256         vg_free = info.get('vg_free', None)
3257         if not isinstance(vg_free, int):
3258           raise errors.OpPrereqError("Can't compute free disk space on"
3259                                      " node %s" % node)
3260         if req_size > info['vg_free']:
3261           raise errors.OpPrereqError("Not enough disk space on target node %s."
3262                                      " %d MB available, %d MB required" %
3263                                      (node, info['vg_free'], req_size))
3264
3265     # os verification
3266     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3267     if not os_obj:
3268       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3269                                  " primary node"  % self.op.os_type)
3270
3271     if self.op.kernel_path == constants.VALUE_NONE:
3272       raise errors.OpPrereqError("Can't set instance kernel to none")
3273
3274
3275     # bridge check on primary node
3276     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3277       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3278                                  " destination node '%s'" %
3279                                  (self.op.bridge, pnode.name))
3280
3281     # memory check on primary node
3282     if self.op.start:
3283       _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3284                            "creating instance %s" % self.op.instance_name,
3285                            self.op.mem_size)
3286
3287     # hvm_cdrom_image_path verification
3288     if self.op.hvm_cdrom_image_path is not None:
3289       if not os.path.isabs(self.op.hvm_cdrom_image_path):
3290         raise errors.OpPrereqError("The path to the HVM CDROM image must"
3291                                    " be an absolute path or None, not %s" %
3292                                    self.op.hvm_cdrom_image_path)
3293       if not os.path.isfile(self.op.hvm_cdrom_image_path):
3294         raise errors.OpPrereqError("The HVM CDROM image must either be a"
3295                                    " regular file or a symlink pointing to"
3296                                    " an existing regular file, not %s" %
3297                                    self.op.hvm_cdrom_image_path)
3298
3299     # vnc_bind_address verification
3300     if self.op.vnc_bind_address is not None:
3301       if not utils.IsValidIP(self.op.vnc_bind_address):
3302         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3303                                    " like a valid IP address" %
3304                                    self.op.vnc_bind_address)
3305
3306     # Xen HVM device type checks
3307     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
3308       if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
3309         raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
3310                                    " hypervisor" % self.op.hvm_nic_type)
3311       if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
3312         raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
3313                                    " hypervisor" % self.op.hvm_disk_type)
3314
3315     if self.op.start:
3316       self.instance_status = 'up'
3317     else:
3318       self.instance_status = 'down'
3319
3320   def Exec(self, feedback_fn):
3321     """Create and add the instance to the cluster.
3322
3323     """
3324     instance = self.op.instance_name
3325     pnode_name = self.pnode.name
3326
3327     if self.op.mac == "auto":
3328       mac_address = self.cfg.GenerateMAC()
3329     else:
3330       mac_address = self.op.mac
3331
3332     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3333     if self.inst_ip is not None:
3334       nic.ip = self.inst_ip
3335
3336     ht_kind = self.sstore.GetHypervisorType()
3337     if ht_kind in constants.HTS_REQ_PORT:
3338       network_port = self.cfg.AllocatePort()
3339     else:
3340       network_port = None
3341
3342     if self.op.vnc_bind_address is None:
3343       self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3344
3345     # this is needed because os.path.join does not accept None arguments
3346     if self.op.file_storage_dir is None:
3347       string_file_storage_dir = ""
3348     else:
3349       string_file_storage_dir = self.op.file_storage_dir
3350
3351     # build the full file storage dir path
3352     file_storage_dir = os.path.normpath(os.path.join(
3353                                         self.sstore.GetFileStorageDir(),
3354                                         string_file_storage_dir, instance))
3355
3356
3357     disks = _GenerateDiskTemplate(self.cfg,
3358                                   self.op.disk_template,
3359                                   instance, pnode_name,
3360                                   self.secondaries, self.op.disk_size,
3361                                   self.op.swap_size,
3362                                   file_storage_dir,
3363                                   self.op.file_driver)
3364
3365     iobj = objects.Instance(name=instance, os=self.op.os_type,
3366                             primary_node=pnode_name,
3367                             memory=self.op.mem_size,
3368                             vcpus=self.op.vcpus,
3369                             nics=[nic], disks=disks,
3370                             disk_template=self.op.disk_template,
3371                             status=self.instance_status,
3372                             network_port=network_port,
3373                             kernel_path=self.op.kernel_path,
3374                             initrd_path=self.op.initrd_path,
3375                             hvm_boot_order=self.op.hvm_boot_order,
3376                             hvm_acpi=self.op.hvm_acpi,
3377                             hvm_pae=self.op.hvm_pae,
3378                             hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3379                             vnc_bind_address=self.op.vnc_bind_address,
3380                             hvm_nic_type=self.op.hvm_nic_type,
3381                             hvm_disk_type=self.op.hvm_disk_type,
3382                             )
3383
3384     feedback_fn("* creating instance disks...")
3385     if not _CreateDisks(self.cfg, iobj):
3386       _RemoveDisks(iobj, self.cfg)
3387       raise errors.OpExecError("Device creation failed, reverting...")
3388
3389     feedback_fn("adding instance %s to cluster config" % instance)
3390
3391     self.cfg.AddInstance(iobj)
3392     # Add the new instance to the Ganeti Lock Manager
3393     self.context.glm.add(locking.LEVEL_INSTANCE, instance)
3394
3395     if self.op.wait_for_sync:
3396       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3397     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3398       # make sure the disks are not degraded (still sync-ing is ok)
3399       time.sleep(15)
3400       feedback_fn("* checking mirrors status")
3401       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3402     else:
3403       disk_abort = False
3404
3405     if disk_abort:
3406       _RemoveDisks(iobj, self.cfg)
3407       self.cfg.RemoveInstance(iobj.name)
3408       # Remove the new instance from the Ganeti Lock Manager
3409       self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name)
3410       raise errors.OpExecError("There are some degraded disks for"
3411                                " this instance")
3412
3413     feedback_fn("creating os for instance %s on node %s" %
3414                 (instance, pnode_name))
3415
3416     if iobj.disk_template != constants.DT_DISKLESS:
3417       if self.op.mode == constants.INSTANCE_CREATE:
3418         feedback_fn("* running the instance OS create scripts...")
3419         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3420           raise errors.OpExecError("could not add os for instance %s"
3421                                    " on node %s" %
3422                                    (instance, pnode_name))
3423
3424       elif self.op.mode == constants.INSTANCE_IMPORT:
3425         feedback_fn("* running the instance OS import scripts...")
3426         src_node = self.op.src_node
3427         src_image = self.src_image
3428         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3429                                                 src_node, src_image):
3430           raise errors.OpExecError("Could not import os for instance"
3431                                    " %s on node %s" %
3432                                    (instance, pnode_name))
3433       else:
3434         # also checked in the prereq part
3435         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3436                                      % self.op.mode)
3437
3438     if self.op.start:
3439       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3440       feedback_fn("* starting instance...")
3441       if not rpc.call_instance_start(pnode_name, iobj, None):
3442         raise errors.OpExecError("Could not start instance")
3443
3444
3445 class LUConnectConsole(NoHooksLU):
3446   """Connect to an instance's console.
3447
3448   This is somewhat special in that it returns the command line that
3449   you need to run on the master node in order to connect to the
3450   console.
3451
3452   """
3453   _OP_REQP = ["instance_name"]
3454   REQ_BGL = False
3455
3456   def ExpandNames(self):
3457     self._ExpandAndLockInstance()
3458
3459   def CheckPrereq(self):
3460     """Check prerequisites.
3461
3462     This checks that the instance is in the cluster.
3463
3464     """
3465     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3466     assert self.instance is not None, \
3467       "Cannot retrieve locked instance %s" % self.op.instance_name
3468
3469   def Exec(self, feedback_fn):
3470     """Connect to the console of an instance
3471
3472     """
3473     instance = self.instance
3474     node = instance.primary_node
3475
3476     node_insts = rpc.call_instance_list([node])[node]
3477     if node_insts is False:
3478       raise errors.OpExecError("Can't connect to node %s." % node)
3479
3480     if instance.name not in node_insts:
3481       raise errors.OpExecError("Instance %s is not running." % instance.name)
3482
3483     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3484
3485     hyper = hypervisor.GetHypervisor()
3486     console_cmd = hyper.GetShellCommandForConsole(instance)
3487
3488     # build ssh cmdline
3489     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3490
3491
3492 class LUReplaceDisks(LogicalUnit):
3493   """Replace the disks of an instance.
3494
3495   """
3496   HPATH = "mirrors-replace"
3497   HTYPE = constants.HTYPE_INSTANCE
3498   _OP_REQP = ["instance_name", "mode", "disks"]
3499
3500   def _RunAllocator(self):
3501     """Compute a new secondary node using an IAllocator.
3502
3503     """
3504     ial = IAllocator(self.cfg, self.sstore,
3505                      mode=constants.IALLOCATOR_MODE_RELOC,
3506                      name=self.op.instance_name,
3507                      relocate_from=[self.sec_node])
3508
3509     ial.Run(self.op.iallocator)
3510
3511     if not ial.success:
3512       raise errors.OpPrereqError("Can't compute nodes using"
3513                                  " iallocator '%s': %s" % (self.op.iallocator,
3514                                                            ial.info))
3515     if len(ial.nodes) != ial.required_nodes:
3516       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3517                                  " of nodes (%s), required %s" %
3518                                  (len(ial.nodes), ial.required_nodes))
3519     self.op.remote_node = ial.nodes[0]
3520     logger.ToStdout("Selected new secondary for the instance: %s" %
3521                     self.op.remote_node)
3522
3523   def BuildHooksEnv(self):
3524     """Build hooks env.
3525
3526     This runs on the master, the primary and all the secondaries.
3527
3528     """
3529     env = {
3530       "MODE": self.op.mode,
3531       "NEW_SECONDARY": self.op.remote_node,
3532       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3533       }
3534     env.update(_BuildInstanceHookEnvByObject(self.instance))
3535     nl = [
3536       self.sstore.GetMasterNode(),
3537       self.instance.primary_node,
3538       ]
3539     if self.op.remote_node is not None:
3540       nl.append(self.op.remote_node)
3541     return env, nl, nl
3542
3543   def CheckPrereq(self):
3544     """Check prerequisites.
3545
3546     This checks that the instance is in the cluster.
3547
3548     """
3549     if not hasattr(self.op, "remote_node"):
3550       self.op.remote_node = None
3551
3552     instance = self.cfg.GetInstanceInfo(
3553       self.cfg.ExpandInstanceName(self.op.instance_name))
3554     if instance is None:
3555       raise errors.OpPrereqError("Instance '%s' not known" %
3556                                  self.op.instance_name)
3557     self.instance = instance
3558     self.op.instance_name = instance.name
3559
3560     if instance.disk_template not in constants.DTS_NET_MIRROR:
3561       raise errors.OpPrereqError("Instance's disk layout is not"
3562                                  " network mirrored.")
3563
3564     if len(instance.secondary_nodes) != 1:
3565       raise errors.OpPrereqError("The instance has a strange layout,"
3566                                  " expected one secondary but found %d" %
3567                                  len(instance.secondary_nodes))
3568
3569     self.sec_node = instance.secondary_nodes[0]
3570
3571     ia_name = getattr(self.op, "iallocator", None)
3572     if ia_name is not None:
3573       if self.op.remote_node is not None:
3574         raise errors.OpPrereqError("Give either the iallocator or the new"
3575                                    " secondary, not both")
3576       self.op.remote_node = self._RunAllocator()
3577
3578     remote_node = self.op.remote_node
3579     if remote_node is not None:
3580       remote_node = self.cfg.ExpandNodeName(remote_node)
3581       if remote_node is None:
3582         raise errors.OpPrereqError("Node '%s' not known" %
3583                                    self.op.remote_node)
3584       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3585     else:
3586       self.remote_node_info = None
3587     if remote_node == instance.primary_node:
3588       raise errors.OpPrereqError("The specified node is the primary node of"
3589                                  " the instance.")
3590     elif remote_node == self.sec_node:
3591       if self.op.mode == constants.REPLACE_DISK_SEC:
3592         # this is for DRBD8, where we can't execute the same mode of
3593         # replacement as for drbd7 (no different port allocated)
3594         raise errors.OpPrereqError("Same secondary given, cannot execute"
3595                                    " replacement")
3596     if instance.disk_template == constants.DT_DRBD8:
3597       if (self.op.mode == constants.REPLACE_DISK_ALL and
3598           remote_node is not None):
3599         # switch to replace secondary mode
3600         self.op.mode = constants.REPLACE_DISK_SEC
3601
3602       if self.op.mode == constants.REPLACE_DISK_ALL:
3603         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3604                                    " secondary disk replacement, not"
3605                                    " both at once")
3606       elif self.op.mode == constants.REPLACE_DISK_PRI:
3607         if remote_node is not None:
3608           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3609                                      " the secondary while doing a primary"
3610                                      " node disk replacement")
3611         self.tgt_node = instance.primary_node
3612         self.oth_node = instance.secondary_nodes[0]
3613       elif self.op.mode == constants.REPLACE_DISK_SEC:
3614         self.new_node = remote_node # this can be None, in which case
3615                                     # we don't change the secondary
3616         self.tgt_node = instance.secondary_nodes[0]
3617         self.oth_node = instance.primary_node
3618       else:
3619         raise errors.ProgrammerError("Unhandled disk replace mode")
3620
3621     for name in self.op.disks:
3622       if instance.FindDisk(name) is None:
3623         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3624                                    (name, instance.name))
3625     self.op.remote_node = remote_node
3626
3627   def _ExecD8DiskOnly(self, feedback_fn):
3628     """Replace a disk on the primary or secondary for dbrd8.
3629
3630     The algorithm for replace is quite complicated:
3631       - for each disk to be replaced:
3632         - create new LVs on the target node with unique names
3633         - detach old LVs from the drbd device
3634         - rename old LVs to name_replaced.<time_t>
3635         - rename new LVs to old LVs
3636         - attach the new LVs (with the old names now) to the drbd device
3637       - wait for sync across all devices
3638       - for each modified disk:
3639         - remove old LVs (which have the name name_replaces.<time_t>)
3640
3641     Failures are not very well handled.
3642
3643     """
3644     steps_total = 6
3645     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3646     instance = self.instance
3647     iv_names = {}
3648     vgname = self.cfg.GetVGName()
3649     # start of work
3650     cfg = self.cfg
3651     tgt_node = self.tgt_node
3652     oth_node = self.oth_node
3653
3654     # Step: check device activation
3655     self.proc.LogStep(1, steps_total, "check device existence")
3656     info("checking volume groups")
3657     my_vg = cfg.GetVGName()
3658     results = rpc.call_vg_list([oth_node, tgt_node])
3659     if not results:
3660       raise errors.OpExecError("Can't list volume groups on the nodes")
3661     for node in oth_node, tgt_node:
3662       res = results.get(node, False)
3663       if not res or my_vg not in res:
3664         raise errors.OpExecError("Volume group '%s' not found on %s" %
3665                                  (my_vg, node))
3666     for dev in instance.disks:
3667       if not dev.iv_name in self.op.disks:
3668         continue
3669       for node in tgt_node, oth_node:
3670         info("checking %s on %s" % (dev.iv_name, node))
3671         cfg.SetDiskID(dev, node)
3672         if not rpc.call_blockdev_find(node, dev):
3673           raise errors.OpExecError("Can't find device %s on node %s" %
3674                                    (dev.iv_name, node))
3675
3676     # Step: check other node consistency
3677     self.proc.LogStep(2, steps_total, "check peer consistency")
3678     for dev in instance.disks:
3679       if not dev.iv_name in self.op.disks:
3680         continue
3681       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3682       if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3683                                    oth_node==instance.primary_node):
3684         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3685                                  " to replace disks on this node (%s)" %
3686                                  (oth_node, tgt_node))
3687
3688     # Step: create new storage
3689     self.proc.LogStep(3, steps_total, "allocate new storage")
3690     for dev in instance.disks:
3691       if not dev.iv_name in self.op.disks:
3692         continue
3693       size = dev.size
3694       cfg.SetDiskID(dev, tgt_node)
3695       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3696       names = _GenerateUniqueNames(cfg, lv_names)
3697       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3698                              logical_id=(vgname, names[0]))
3699       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3700                              logical_id=(vgname, names[1]))
3701       new_lvs = [lv_data, lv_meta]
3702       old_lvs = dev.children
3703       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3704       info("creating new local storage on %s for %s" %
3705            (tgt_node, dev.iv_name))
3706       # since we *always* want to create this LV, we use the
3707       # _Create...OnPrimary (which forces the creation), even if we
3708       # are talking about the secondary node
3709       for new_lv in new_lvs:
3710         if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3711                                         _GetInstanceInfoText(instance)):
3712           raise errors.OpExecError("Failed to create new LV named '%s' on"
3713                                    " node '%s'" %
3714                                    (new_lv.logical_id[1], tgt_node))
3715
3716     # Step: for each lv, detach+rename*2+attach
3717     self.proc.LogStep(4, steps_total, "change drbd configuration")
3718     for dev, old_lvs, new_lvs in iv_names.itervalues():
3719       info("detaching %s drbd from local storage" % dev.iv_name)
3720       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3721         raise errors.OpExecError("Can't detach drbd from local storage on node"
3722                                  " %s for device %s" % (tgt_node, dev.iv_name))
3723       #dev.children = []
3724       #cfg.Update(instance)
3725
3726       # ok, we created the new LVs, so now we know we have the needed
3727       # storage; as such, we proceed on the target node to rename
3728       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3729       # using the assumption that logical_id == physical_id (which in
3730       # turn is the unique_id on that node)
3731
3732       # FIXME(iustin): use a better name for the replaced LVs
3733       temp_suffix = int(time.time())
3734       ren_fn = lambda d, suff: (d.physical_id[0],
3735                                 d.physical_id[1] + "_replaced-%s" % suff)
3736       # build the rename list based on what LVs exist on the node
3737       rlist = []
3738       for to_ren in old_lvs:
3739         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3740         if find_res is not None: # device exists
3741           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3742
3743       info("renaming the old LVs on the target node")
3744       if not rpc.call_blockdev_rename(tgt_node, rlist):
3745         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3746       # now we rename the new LVs to the old LVs
3747       info("renaming the new LVs on the target node")
3748       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3749       if not rpc.call_blockdev_rename(tgt_node, rlist):
3750         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3751
3752       for old, new in zip(old_lvs, new_lvs):
3753         new.logical_id = old.logical_id
3754         cfg.SetDiskID(new, tgt_node)
3755
3756       for disk in old_lvs:
3757         disk.logical_id = ren_fn(disk, temp_suffix)
3758         cfg.SetDiskID(disk, tgt_node)
3759
3760       # now that the new lvs have the old name, we can add them to the device
3761       info("adding new mirror component on %s" % tgt_node)
3762       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3763         for new_lv in new_lvs:
3764           if not rpc.call_blockdev_remove(tgt_node, new_lv):
3765             warning("Can't rollback device %s", hint="manually cleanup unused"
3766                     " logical volumes")
3767         raise errors.OpExecError("Can't add local storage to drbd")
3768
3769       dev.children = new_lvs
3770       cfg.Update(instance)
3771
3772     # Step: wait for sync
3773
3774     # this can fail as the old devices are degraded and _WaitForSync
3775     # does a combined result over all disks, so we don't check its
3776     # return value
3777     self.proc.LogStep(5, steps_total, "sync devices")
3778     _WaitForSync(cfg, instance, self.proc, unlock=True)
3779
3780     # so check manually all the devices
3781     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3782       cfg.SetDiskID(dev, instance.primary_node)
3783       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3784       if is_degr:
3785         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3786
3787     # Step: remove old storage
3788     self.proc.LogStep(6, steps_total, "removing old storage")
3789     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3790       info("remove logical volumes for %s" % name)
3791       for lv in old_lvs:
3792         cfg.SetDiskID(lv, tgt_node)
3793         if not rpc.call_blockdev_remove(tgt_node, lv):
3794           warning("Can't remove old LV", hint="manually remove unused LVs")
3795           continue
3796
3797   def _ExecD8Secondary(self, feedback_fn):
3798     """Replace the secondary node for drbd8.
3799
3800     The algorithm for replace is quite complicated:
3801       - for all disks of the instance:
3802         - create new LVs on the new node with same names
3803         - shutdown the drbd device on the old secondary
3804         - disconnect the drbd network on the primary
3805         - create the drbd device on the new secondary
3806         - network attach the drbd on the primary, using an artifice:
3807           the drbd code for Attach() will connect to the network if it
3808           finds a device which is connected to the good local disks but
3809           not network enabled
3810       - wait for sync across all devices
3811       - remove all disks from the old secondary
3812
3813     Failures are not very well handled.
3814
3815     """
3816     steps_total = 6
3817     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3818     instance = self.instance
3819     iv_names = {}
3820     vgname = self.cfg.GetVGName()
3821     # start of work
3822     cfg = self.cfg
3823     old_node = self.tgt_node
3824     new_node = self.new_node
3825     pri_node = instance.primary_node
3826
3827     # Step: check device activation
3828     self.proc.LogStep(1, steps_total, "check device existence")
3829     info("checking volume groups")
3830     my_vg = cfg.GetVGName()
3831     results = rpc.call_vg_list([pri_node, new_node])
3832     if not results:
3833       raise errors.OpExecError("Can't list volume groups on the nodes")
3834     for node in pri_node, new_node:
3835       res = results.get(node, False)
3836       if not res or my_vg not in res:
3837         raise errors.OpExecError("Volume group '%s' not found on %s" %
3838                                  (my_vg, node))
3839     for dev in instance.disks:
3840       if not dev.iv_name in self.op.disks:
3841         continue
3842       info("checking %s on %s" % (dev.iv_name, pri_node))
3843       cfg.SetDiskID(dev, pri_node)
3844       if not rpc.call_blockdev_find(pri_node, dev):
3845         raise errors.OpExecError("Can't find device %s on node %s" %
3846                                  (dev.iv_name, pri_node))
3847
3848     # Step: check other node consistency
3849     self.proc.LogStep(2, steps_total, "check peer consistency")
3850     for dev in instance.disks:
3851       if not dev.iv_name in self.op.disks:
3852         continue
3853       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3854       if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3855         raise errors.OpExecError("Primary node (%s) has degraded storage,"
3856                                  " unsafe to replace the secondary" %
3857                                  pri_node)
3858
3859     # Step: create new storage
3860     self.proc.LogStep(3, steps_total, "allocate new storage")
3861     for dev in instance.disks:
3862       size = dev.size
3863       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3864       # since we *always* want to create this LV, we use the
3865       # _Create...OnPrimary (which forces the creation), even if we
3866       # are talking about the secondary node
3867       for new_lv in dev.children:
3868         if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3869                                         _GetInstanceInfoText(instance)):
3870           raise errors.OpExecError("Failed to create new LV named '%s' on"
3871                                    " node '%s'" %
3872                                    (new_lv.logical_id[1], new_node))
3873
3874       iv_names[dev.iv_name] = (dev, dev.children)
3875
3876     self.proc.LogStep(4, steps_total, "changing drbd configuration")
3877     for dev in instance.disks:
3878       size = dev.size
3879       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3880       # create new devices on new_node
3881       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3882                               logical_id=(pri_node, new_node,
3883                                           dev.logical_id[2]),
3884                               children=dev.children)
3885       if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3886                                         new_drbd, False,
3887                                       _GetInstanceInfoText(instance)):
3888         raise errors.OpExecError("Failed to create new DRBD on"
3889                                  " node '%s'" % new_node)
3890
3891     for dev in instance.disks:
3892       # we have new devices, shutdown the drbd on the old secondary
3893       info("shutting down drbd for %s on old node" % dev.iv_name)
3894       cfg.SetDiskID(dev, old_node)
3895       if not rpc.call_blockdev_shutdown(old_node, dev):
3896         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3897                 hint="Please cleanup this device manually as soon as possible")
3898
3899     info("detaching primary drbds from the network (=> standalone)")
3900     done = 0
3901     for dev in instance.disks:
3902       cfg.SetDiskID(dev, pri_node)
3903       # set the physical (unique in bdev terms) id to None, meaning
3904       # detach from network
3905       dev.physical_id = (None,) * len(dev.physical_id)
3906       # and 'find' the device, which will 'fix' it to match the
3907       # standalone state
3908       if rpc.call_blockdev_find(pri_node, dev):
3909         done += 1
3910       else:
3911         warning("Failed to detach drbd %s from network, unusual case" %
3912                 dev.iv_name)
3913
3914     if not done:
3915       # no detaches succeeded (very unlikely)
3916       raise errors.OpExecError("Can't detach at least one DRBD from old node")
3917
3918     # if we managed to detach at least one, we update all the disks of
3919     # the instance to point to the new secondary
3920     info("updating instance configuration")
3921     for dev in instance.disks:
3922       dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3923       cfg.SetDiskID(dev, pri_node)
3924     cfg.Update(instance)
3925
3926     # and now perform the drbd attach
3927     info("attaching primary drbds to new secondary (standalone => connected)")
3928     failures = []
3929     for dev in instance.disks:
3930       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3931       # since the attach is smart, it's enough to 'find' the device,
3932       # it will automatically activate the network, if the physical_id
3933       # is correct
3934       cfg.SetDiskID(dev, pri_node)
3935       if not rpc.call_blockdev_find(pri_node, dev):
3936         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3937                 "please do a gnt-instance info to see the status of disks")
3938
3939     # this can fail as the old devices are degraded and _WaitForSync
3940     # does a combined result over all disks, so we don't check its
3941     # return value
3942     self.proc.LogStep(5, steps_total, "sync devices")
3943     _WaitForSync(cfg, instance, self.proc, unlock=True)
3944
3945     # so check manually all the devices
3946     for name, (dev, old_lvs) in iv_names.iteritems():
3947       cfg.SetDiskID(dev, pri_node)
3948       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3949       if is_degr:
3950         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3951
3952     self.proc.LogStep(6, steps_total, "removing old storage")
3953     for name, (dev, old_lvs) in iv_names.iteritems():
3954       info("remove logical volumes for %s" % name)
3955       for lv in old_lvs:
3956         cfg.SetDiskID(lv, old_node)
3957         if not rpc.call_blockdev_remove(old_node, lv):
3958           warning("Can't remove LV on old secondary",
3959                   hint="Cleanup stale volumes by hand")
3960
3961   def Exec(self, feedback_fn):
3962     """Execute disk replacement.
3963
3964     This dispatches the disk replacement to the appropriate handler.
3965
3966     """
3967     instance = self.instance
3968
3969     # Activate the instance disks if we're replacing them on a down instance
3970     if instance.status == "down":
3971       op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
3972       self.proc.ChainOpCode(op)
3973
3974     if instance.disk_template == constants.DT_DRBD8:
3975       if self.op.remote_node is None:
3976         fn = self._ExecD8DiskOnly
3977       else:
3978         fn = self._ExecD8Secondary
3979     else:
3980       raise errors.ProgrammerError("Unhandled disk replacement case")
3981
3982     ret = fn(feedback_fn)
3983
3984     # Deactivate the instance disks if we're replacing them on a down instance
3985     if instance.status == "down":
3986       op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
3987       self.proc.ChainOpCode(op)
3988
3989     return ret
3990
3991
3992 class LUGrowDisk(LogicalUnit):
3993   """Grow a disk of an instance.
3994
3995   """
3996   HPATH = "disk-grow"
3997   HTYPE = constants.HTYPE_INSTANCE
3998   _OP_REQP = ["instance_name", "disk", "amount"]
3999
4000   def BuildHooksEnv(self):
4001     """Build hooks env.
4002
4003     This runs on the master, the primary and all the secondaries.
4004
4005     """
4006     env = {
4007       "DISK": self.op.disk,
4008       "AMOUNT": self.op.amount,
4009       }
4010     env.update(_BuildInstanceHookEnvByObject(self.instance))
4011     nl = [
4012       self.sstore.GetMasterNode(),
4013       self.instance.primary_node,
4014       ]
4015     return env, nl, nl
4016
4017   def CheckPrereq(self):
4018     """Check prerequisites.
4019
4020     This checks that the instance is in the cluster.
4021
4022     """
4023     instance = self.cfg.GetInstanceInfo(
4024       self.cfg.ExpandInstanceName(self.op.instance_name))
4025     if instance is None:
4026       raise errors.OpPrereqError("Instance '%s' not known" %
4027                                  self.op.instance_name)
4028     self.instance = instance
4029     self.op.instance_name = instance.name
4030
4031     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4032       raise errors.OpPrereqError("Instance's disk layout does not support"
4033                                  " growing.")
4034
4035     if instance.FindDisk(self.op.disk) is None:
4036       raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4037                                  (self.op.disk, instance.name))
4038
4039     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4040     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
4041     for node in nodenames:
4042       info = nodeinfo.get(node, None)
4043       if not info:
4044         raise errors.OpPrereqError("Cannot get current information"
4045                                    " from node '%s'" % node)
4046       vg_free = info.get('vg_free', None)
4047       if not isinstance(vg_free, int):
4048         raise errors.OpPrereqError("Can't compute free disk space on"
4049                                    " node %s" % node)
4050       if self.op.amount > info['vg_free']:
4051         raise errors.OpPrereqError("Not enough disk space on target node %s:"
4052                                    " %d MiB available, %d MiB required" %
4053                                    (node, info['vg_free'], self.op.amount))
4054
4055   def Exec(self, feedback_fn):
4056     """Execute disk grow.
4057
4058     """
4059     instance = self.instance
4060     disk = instance.FindDisk(self.op.disk)
4061     for node in (instance.secondary_nodes + (instance.primary_node,)):
4062       self.cfg.SetDiskID(disk, node)
4063       result = rpc.call_blockdev_grow(node, disk, self.op.amount)
4064       if not result or not isinstance(result, tuple) or len(result) != 2:
4065         raise errors.OpExecError("grow request failed to node %s" % node)
4066       elif not result[0]:
4067         raise errors.OpExecError("grow request failed to node %s: %s" %
4068                                  (node, result[1]))
4069     disk.RecordGrow(self.op.amount)
4070     self.cfg.Update(instance)
4071     return
4072
4073
4074 class LUQueryInstanceData(NoHooksLU):
4075   """Query runtime instance data.
4076
4077   """
4078   _OP_REQP = ["instances"]
4079
4080   def CheckPrereq(self):
4081     """Check prerequisites.
4082
4083     This only checks the optional instance list against the existing names.
4084
4085     """
4086     if not isinstance(self.op.instances, list):
4087       raise errors.OpPrereqError("Invalid argument type 'instances'")
4088     if self.op.instances:
4089       self.wanted_instances = []
4090       names = self.op.instances
4091       for name in names:
4092         instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
4093         if instance is None:
4094           raise errors.OpPrereqError("No such instance name '%s'" % name)
4095         self.wanted_instances.append(instance)
4096     else:
4097       self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4098                                in self.cfg.GetInstanceList()]
4099     return
4100
4101
4102   def _ComputeDiskStatus(self, instance, snode, dev):
4103     """Compute block device status.
4104
4105     """
4106     self.cfg.SetDiskID(dev, instance.primary_node)
4107     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4108     if dev.dev_type in constants.LDS_DRBD:
4109       # we change the snode then (otherwise we use the one passed in)
4110       if dev.logical_id[0] == instance.primary_node:
4111         snode = dev.logical_id[1]
4112       else:
4113         snode = dev.logical_id[0]
4114
4115     if snode:
4116       self.cfg.SetDiskID(dev, snode)
4117       dev_sstatus = rpc.call_blockdev_find(snode, dev)
4118     else:
4119       dev_sstatus = None
4120
4121     if dev.children:
4122       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4123                       for child in dev.children]
4124     else:
4125       dev_children = []
4126
4127     data = {
4128       "iv_name": dev.iv_name,
4129       "dev_type": dev.dev_type,
4130       "logical_id": dev.logical_id,
4131       "physical_id": dev.physical_id,
4132       "pstatus": dev_pstatus,
4133       "sstatus": dev_sstatus,
4134       "children": dev_children,
4135       }
4136
4137     return data
4138
4139   def Exec(self, feedback_fn):
4140     """Gather and return data"""
4141     result = {}
4142     for instance in self.wanted_instances:
4143       remote_info = rpc.call_instance_info(instance.primary_node,
4144                                                 instance.name)
4145       if remote_info and "state" in remote_info:
4146         remote_state = "up"
4147       else:
4148         remote_state = "down"
4149       if instance.status == "down":
4150         config_state = "down"
4151       else:
4152         config_state = "up"
4153
4154       disks = [self._ComputeDiskStatus(instance, None, device)
4155                for device in instance.disks]
4156
4157       idict = {
4158         "name": instance.name,
4159         "config_state": config_state,
4160         "run_state": remote_state,
4161         "pnode": instance.primary_node,
4162         "snodes": instance.secondary_nodes,
4163         "os": instance.os,
4164         "memory": instance.memory,
4165         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4166         "disks": disks,
4167         "vcpus": instance.vcpus,
4168         }
4169
4170       htkind = self.sstore.GetHypervisorType()
4171       if htkind == constants.HT_XEN_PVM30:
4172         idict["kernel_path"] = instance.kernel_path
4173         idict["initrd_path"] = instance.initrd_path
4174
4175       if htkind == constants.HT_XEN_HVM31:
4176         idict["hvm_boot_order"] = instance.hvm_boot_order
4177         idict["hvm_acpi"] = instance.hvm_acpi
4178         idict["hvm_pae"] = instance.hvm_pae
4179         idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4180         idict["hvm_nic_type"] = instance.hvm_nic_type
4181         idict["hvm_disk_type"] = instance.hvm_disk_type
4182
4183       if htkind in constants.HTS_REQ_PORT:
4184         if instance.vnc_bind_address is None:
4185           vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4186         else:
4187           vnc_bind_address = instance.vnc_bind_address
4188         if instance.network_port is None:
4189           vnc_console_port = None
4190         elif vnc_bind_address == constants.BIND_ADDRESS_GLOBAL:
4191           vnc_console_port = "%s:%s" % (instance.primary_node,
4192                                        instance.network_port)
4193         elif vnc_bind_address == constants.LOCALHOST_IP_ADDRESS:
4194           vnc_console_port = "%s:%s on node %s" % (vnc_bind_address,
4195                                                    instance.network_port,
4196                                                    instance.primary_node)
4197         else:
4198           vnc_console_port = "%s:%s" % (instance.vnc_bind_address,
4199                                         instance.network_port)
4200         idict["vnc_console_port"] = vnc_console_port
4201         idict["vnc_bind_address"] = vnc_bind_address
4202         idict["network_port"] = instance.network_port
4203
4204       result[instance.name] = idict
4205
4206     return result
4207
4208
4209 class LUSetInstanceParams(LogicalUnit):
4210   """Modifies an instances's parameters.
4211
4212   """
4213   HPATH = "instance-modify"
4214   HTYPE = constants.HTYPE_INSTANCE
4215   _OP_REQP = ["instance_name"]
4216   REQ_BGL = False
4217
4218   def ExpandNames(self):
4219     self._ExpandAndLockInstance()
4220
4221   def BuildHooksEnv(self):
4222     """Build hooks env.
4223
4224     This runs on the master, primary and secondaries.
4225
4226     """
4227     args = dict()
4228     if self.mem:
4229       args['memory'] = self.mem
4230     if self.vcpus:
4231       args['vcpus'] = self.vcpus
4232     if self.do_ip or self.do_bridge or self.mac:
4233       if self.do_ip:
4234         ip = self.ip
4235       else:
4236         ip = self.instance.nics[0].ip
4237       if self.bridge:
4238         bridge = self.bridge
4239       else:
4240         bridge = self.instance.nics[0].bridge
4241       if self.mac:
4242         mac = self.mac
4243       else:
4244         mac = self.instance.nics[0].mac
4245       args['nics'] = [(ip, bridge, mac)]
4246     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4247     nl = [self.sstore.GetMasterNode(),
4248           self.instance.primary_node] + list(self.instance.secondary_nodes)
4249     return env, nl, nl
4250
4251   def CheckPrereq(self):
4252     """Check prerequisites.
4253
4254     This only checks the instance list against the existing names.
4255
4256     """
4257     # FIXME: all the parameters could be checked before, in ExpandNames, or in
4258     # a separate CheckArguments function, if we implement one, so the operation
4259     # can be aborted without waiting for any lock, should it have an error...
4260     self.mem = getattr(self.op, "mem", None)
4261     self.vcpus = getattr(self.op, "vcpus", None)
4262     self.ip = getattr(self.op, "ip", None)
4263     self.mac = getattr(self.op, "mac", None)
4264     self.bridge = getattr(self.op, "bridge", None)
4265     self.kernel_path = getattr(self.op, "kernel_path", None)
4266     self.initrd_path = getattr(self.op, "initrd_path", None)
4267     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4268     self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4269     self.hvm_pae = getattr(self.op, "hvm_pae", None)
4270     self.hvm_nic_type = getattr(self.op, "hvm_nic_type", None)
4271     self.hvm_disk_type = getattr(self.op, "hvm_disk_type", None)
4272     self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4273     self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4274     self.force = getattr(self.op, "force", None)
4275     all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4276                  self.kernel_path, self.initrd_path, self.hvm_boot_order,
4277                  self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4278                  self.vnc_bind_address, self.hvm_nic_type, self.hvm_disk_type]
4279     if all_parms.count(None) == len(all_parms):
4280       raise errors.OpPrereqError("No changes submitted")
4281     if self.mem is not None:
4282       try:
4283         self.mem = int(self.mem)
4284       except ValueError, err:
4285         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4286     if self.vcpus is not None:
4287       try:
4288         self.vcpus = int(self.vcpus)
4289       except ValueError, err:
4290         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4291     if self.ip is not None:
4292       self.do_ip = True
4293       if self.ip.lower() == "none":
4294         self.ip = None
4295       else:
4296         if not utils.IsValidIP(self.ip):
4297           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4298     else:
4299       self.do_ip = False
4300     self.do_bridge = (self.bridge is not None)
4301     if self.mac is not None:
4302       if self.cfg.IsMacInUse(self.mac):
4303         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4304                                    self.mac)
4305       if not utils.IsValidMac(self.mac):
4306         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4307
4308     if self.kernel_path is not None:
4309       self.do_kernel_path = True
4310       if self.kernel_path == constants.VALUE_NONE:
4311         raise errors.OpPrereqError("Can't set instance to no kernel")
4312
4313       if self.kernel_path != constants.VALUE_DEFAULT:
4314         if not os.path.isabs(self.kernel_path):
4315           raise errors.OpPrereqError("The kernel path must be an absolute"
4316                                     " filename")
4317     else:
4318       self.do_kernel_path = False
4319
4320     if self.initrd_path is not None:
4321       self.do_initrd_path = True
4322       if self.initrd_path not in (constants.VALUE_NONE,
4323                                   constants.VALUE_DEFAULT):
4324         if not os.path.isabs(self.initrd_path):
4325           raise errors.OpPrereqError("The initrd path must be an absolute"
4326                                     " filename")
4327     else:
4328       self.do_initrd_path = False
4329
4330     # boot order verification
4331     if self.hvm_boot_order is not None:
4332       if self.hvm_boot_order != constants.VALUE_DEFAULT:
4333         if len(self.hvm_boot_order.strip("acdn")) != 0:
4334           raise errors.OpPrereqError("invalid boot order specified,"
4335                                      " must be one or more of [acdn]"
4336                                      " or 'default'")
4337
4338     # hvm_cdrom_image_path verification
4339     if self.op.hvm_cdrom_image_path is not None:
4340       if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
4341               self.op.hvm_cdrom_image_path.lower() == "none"):
4342         raise errors.OpPrereqError("The path to the HVM CDROM image must"
4343                                    " be an absolute path or None, not %s" %
4344                                    self.op.hvm_cdrom_image_path)
4345       if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
4346               self.op.hvm_cdrom_image_path.lower() == "none"):
4347         raise errors.OpPrereqError("The HVM CDROM image must either be a"
4348                                    " regular file or a symlink pointing to"
4349                                    " an existing regular file, not %s" %
4350                                    self.op.hvm_cdrom_image_path)
4351
4352     # vnc_bind_address verification
4353     if self.op.vnc_bind_address is not None:
4354       if not utils.IsValidIP(self.op.vnc_bind_address):
4355         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4356                                    " like a valid IP address" %
4357                                    self.op.vnc_bind_address)
4358
4359     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4360     assert self.instance is not None, \
4361       "Cannot retrieve locked instance %s" % self.op.instance_name
4362     self.warn = []
4363     if self.mem is not None and not self.force:
4364       pnode = self.instance.primary_node
4365       nodelist = [pnode]
4366       nodelist.extend(instance.secondary_nodes)
4367       instance_info = rpc.call_instance_info(pnode, instance.name)
4368       nodeinfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
4369
4370       if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
4371         # Assume the primary node is unreachable and go ahead
4372         self.warn.append("Can't get info from primary node %s" % pnode)
4373       else:
4374         if instance_info:
4375           current_mem = instance_info['memory']
4376         else:
4377           # Assume instance not running
4378           # (there is a slight race condition here, but it's not very probable,
4379           # and we have no other way to check)
4380           current_mem = 0
4381         miss_mem = self.mem - current_mem - nodeinfo[pnode]['memory_free']
4382         if miss_mem > 0:
4383           raise errors.OpPrereqError("This change will prevent the instance"
4384                                      " from starting, due to %d MB of memory"
4385                                      " missing on its primary node" % miss_mem)
4386
4387       for node in instance.secondary_nodes:
4388         if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
4389           self.warn.append("Can't get info from secondary node %s" % node)
4390         elif self.mem > nodeinfo[node]['memory_free']:
4391           self.warn.append("Not enough memory to failover instance to secondary"
4392                            " node %s" % node)
4393
4394     # Xen HVM device type checks
4395     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
4396       if self.op.hvm_nic_type is not None:
4397         if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
4398           raise errors.OpPrereqError("Invalid NIC type %s specified for Xen"
4399                                      " HVM  hypervisor" % self.op.hvm_nic_type)
4400       if self.op.hvm_disk_type is not None:
4401         if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
4402           raise errors.OpPrereqError("Invalid disk type %s specified for Xen"
4403                                      " HVM hypervisor" % self.op.hvm_disk_type)
4404
4405     return
4406
4407   def Exec(self, feedback_fn):
4408     """Modifies an instance.
4409
4410     All parameters take effect only at the next restart of the instance.
4411     """
4412     # Process here the warnings from CheckPrereq, as we don't have a
4413     # feedback_fn there.
4414     for warn in self.warn:
4415       feedback_fn("WARNING: %s" % warn)
4416
4417     result = []
4418     instance = self.instance
4419     if self.mem:
4420       instance.memory = self.mem
4421       result.append(("mem", self.mem))
4422     if self.vcpus:
4423       instance.vcpus = self.vcpus
4424       result.append(("vcpus",  self.vcpus))
4425     if self.do_ip:
4426       instance.nics[0].ip = self.ip
4427       result.append(("ip", self.ip))
4428     if self.bridge:
4429       instance.nics[0].bridge = self.bridge
4430       result.append(("bridge", self.bridge))
4431     if self.mac:
4432       instance.nics[0].mac = self.mac
4433       result.append(("mac", self.mac))
4434     if self.do_kernel_path:
4435       instance.kernel_path = self.kernel_path
4436       result.append(("kernel_path", self.kernel_path))
4437     if self.do_initrd_path:
4438       instance.initrd_path = self.initrd_path
4439       result.append(("initrd_path", self.initrd_path))
4440     if self.hvm_boot_order:
4441       if self.hvm_boot_order == constants.VALUE_DEFAULT:
4442         instance.hvm_boot_order = None
4443       else:
4444         instance.hvm_boot_order = self.hvm_boot_order
4445       result.append(("hvm_boot_order", self.hvm_boot_order))
4446     if self.hvm_acpi is not None:
4447       instance.hvm_acpi = self.hvm_acpi
4448       result.append(("hvm_acpi", self.hvm_acpi))
4449     if self.hvm_pae is not None:
4450       instance.hvm_pae = self.hvm_pae
4451       result.append(("hvm_pae", self.hvm_pae))
4452     if self.hvm_nic_type is not None:
4453       instance.hvm_nic_type = self.hvm_nic_type
4454       result.append(("hvm_nic_type", self.hvm_nic_type))
4455     if self.hvm_disk_type is not None:
4456       instance.hvm_disk_type = self.hvm_disk_type
4457       result.append(("hvm_disk_type", self.hvm_disk_type))
4458     if self.hvm_cdrom_image_path:
4459       if self.hvm_cdrom_image_path == constants.VALUE_NONE:
4460         instance.hvm_cdrom_image_path = None
4461       else:
4462         instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4463       result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4464     if self.vnc_bind_address:
4465       instance.vnc_bind_address = self.vnc_bind_address
4466       result.append(("vnc_bind_address", self.vnc_bind_address))
4467
4468     self.cfg.Update(instance)
4469
4470     return result
4471
4472
4473 class LUQueryExports(NoHooksLU):
4474   """Query the exports list
4475
4476   """
4477   _OP_REQP = ['nodes']
4478   REQ_BGL = False
4479
4480   def ExpandNames(self):
4481     self.needed_locks = {}
4482     self.share_locks[locking.LEVEL_NODE] = 1
4483     if not self.op.nodes:
4484       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4485     else:
4486       self.needed_locks[locking.LEVEL_NODE] = \
4487         _GetWantedNodes(self, self.op.nodes)
4488
4489   def CheckPrereq(self):
4490     """Check prerequisites.
4491
4492     """
4493     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
4494
4495   def Exec(self, feedback_fn):
4496     """Compute the list of all the exported system images.
4497
4498     Returns:
4499       a dictionary with the structure node->(export-list)
4500       where export-list is a list of the instances exported on
4501       that node.
4502
4503     """
4504     return rpc.call_export_list(self.nodes)
4505
4506
4507 class LUExportInstance(LogicalUnit):
4508   """Export an instance to an image in the cluster.
4509
4510   """
4511   HPATH = "instance-export"
4512   HTYPE = constants.HTYPE_INSTANCE
4513   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4514
4515   def BuildHooksEnv(self):
4516     """Build hooks env.
4517
4518     This will run on the master, primary node and target node.
4519
4520     """
4521     env = {
4522       "EXPORT_NODE": self.op.target_node,
4523       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4524       }
4525     env.update(_BuildInstanceHookEnvByObject(self.instance))
4526     nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4527           self.op.target_node]
4528     return env, nl, nl
4529
4530   def CheckPrereq(self):
4531     """Check prerequisites.
4532
4533     This checks that the instance and node names are valid.
4534
4535     """
4536     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4537     self.instance = self.cfg.GetInstanceInfo(instance_name)
4538     if self.instance is None:
4539       raise errors.OpPrereqError("Instance '%s' not found" %
4540                                  self.op.instance_name)
4541
4542     # node verification
4543     dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4544     self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4545
4546     if self.dst_node is None:
4547       raise errors.OpPrereqError("Destination node '%s' is unknown." %
4548                                  self.op.target_node)
4549     self.op.target_node = self.dst_node.name
4550
4551     # instance disk type verification
4552     for disk in self.instance.disks:
4553       if disk.dev_type == constants.LD_FILE:
4554         raise errors.OpPrereqError("Export not supported for instances with"
4555                                    " file-based disks")
4556
4557   def Exec(self, feedback_fn):
4558     """Export an instance to an image in the cluster.
4559
4560     """
4561     instance = self.instance
4562     dst_node = self.dst_node
4563     src_node = instance.primary_node
4564     if self.op.shutdown:
4565       # shutdown the instance, but not the disks
4566       if not rpc.call_instance_shutdown(src_node, instance):
4567         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4568                                  (instance.name, src_node))
4569
4570     vgname = self.cfg.GetVGName()
4571
4572     snap_disks = []
4573
4574     try:
4575       for disk in instance.disks:
4576         if disk.iv_name == "sda":
4577           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4578           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4579
4580           if not new_dev_name:
4581             logger.Error("could not snapshot block device %s on node %s" %
4582                          (disk.logical_id[1], src_node))
4583           else:
4584             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4585                                       logical_id=(vgname, new_dev_name),
4586                                       physical_id=(vgname, new_dev_name),
4587                                       iv_name=disk.iv_name)
4588             snap_disks.append(new_dev)
4589
4590     finally:
4591       if self.op.shutdown and instance.status == "up":
4592         if not rpc.call_instance_start(src_node, instance, None):
4593           _ShutdownInstanceDisks(instance, self.cfg)
4594           raise errors.OpExecError("Could not start instance")
4595
4596     # TODO: check for size
4597
4598     for dev in snap_disks:
4599       if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
4600         logger.Error("could not export block device %s from node %s to node %s"
4601                      % (dev.logical_id[1], src_node, dst_node.name))
4602       if not rpc.call_blockdev_remove(src_node, dev):
4603         logger.Error("could not remove snapshot block device %s from node %s" %
4604                      (dev.logical_id[1], src_node))
4605
4606     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4607       logger.Error("could not finalize export for instance %s on node %s" %
4608                    (instance.name, dst_node.name))
4609
4610     nodelist = self.cfg.GetNodeList()
4611     nodelist.remove(dst_node.name)
4612
4613     # on one-node clusters nodelist will be empty after the removal
4614     # if we proceed the backup would be removed because OpQueryExports
4615     # substitutes an empty list with the full cluster node list.
4616     if nodelist:
4617       exportlist = rpc.call_export_list(nodelist)
4618       for node in exportlist:
4619         if instance.name in exportlist[node]:
4620           if not rpc.call_export_remove(node, instance.name):
4621             logger.Error("could not remove older export for instance %s"
4622                          " on node %s" % (instance.name, node))
4623
4624
4625 class LURemoveExport(NoHooksLU):
4626   """Remove exports related to the named instance.
4627
4628   """
4629   _OP_REQP = ["instance_name"]
4630
4631   def CheckPrereq(self):
4632     """Check prerequisites.
4633     """
4634     pass
4635
4636   def Exec(self, feedback_fn):
4637     """Remove any export.
4638
4639     """
4640     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4641     # If the instance was not found we'll try with the name that was passed in.
4642     # This will only work if it was an FQDN, though.
4643     fqdn_warn = False
4644     if not instance_name:
4645       fqdn_warn = True
4646       instance_name = self.op.instance_name
4647
4648     exportlist = rpc.call_export_list(self.cfg.GetNodeList())
4649     found = False
4650     for node in exportlist:
4651       if instance_name in exportlist[node]:
4652         found = True
4653         if not rpc.call_export_remove(node, instance_name):
4654           logger.Error("could not remove export for instance %s"
4655                        " on node %s" % (instance_name, node))
4656
4657     if fqdn_warn and not found:
4658       feedback_fn("Export not found. If trying to remove an export belonging"
4659                   " to a deleted instance please use its Fully Qualified"
4660                   " Domain Name.")
4661
4662
4663 class TagsLU(NoHooksLU):
4664   """Generic tags LU.
4665
4666   This is an abstract class which is the parent of all the other tags LUs.
4667
4668   """
4669   def CheckPrereq(self):
4670     """Check prerequisites.
4671
4672     """
4673     if self.op.kind == constants.TAG_CLUSTER:
4674       self.target = self.cfg.GetClusterInfo()
4675     elif self.op.kind == constants.TAG_NODE:
4676       name = self.cfg.ExpandNodeName(self.op.name)
4677       if name is None:
4678         raise errors.OpPrereqError("Invalid node name (%s)" %
4679                                    (self.op.name,))
4680       self.op.name = name
4681       self.target = self.cfg.GetNodeInfo(name)
4682     elif self.op.kind == constants.TAG_INSTANCE:
4683       name = self.cfg.ExpandInstanceName(self.op.name)
4684       if name is None:
4685         raise errors.OpPrereqError("Invalid instance name (%s)" %
4686                                    (self.op.name,))
4687       self.op.name = name
4688       self.target = self.cfg.GetInstanceInfo(name)
4689     else:
4690       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4691                                  str(self.op.kind))
4692
4693
4694 class LUGetTags(TagsLU):
4695   """Returns the tags of a given object.
4696
4697   """
4698   _OP_REQP = ["kind", "name"]
4699
4700   def Exec(self, feedback_fn):
4701     """Returns the tag list.
4702
4703     """
4704     return list(self.target.GetTags())
4705
4706
4707 class LUSearchTags(NoHooksLU):
4708   """Searches the tags for a given pattern.
4709
4710   """
4711   _OP_REQP = ["pattern"]
4712
4713   def CheckPrereq(self):
4714     """Check prerequisites.
4715
4716     This checks the pattern passed for validity by compiling it.
4717
4718     """
4719     try:
4720       self.re = re.compile(self.op.pattern)
4721     except re.error, err:
4722       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4723                                  (self.op.pattern, err))
4724
4725   def Exec(self, feedback_fn):
4726     """Returns the tag list.
4727
4728     """
4729     cfg = self.cfg
4730     tgts = [("/cluster", cfg.GetClusterInfo())]
4731     ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4732     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4733     nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4734     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4735     results = []
4736     for path, target in tgts:
4737       for tag in target.GetTags():
4738         if self.re.search(tag):
4739           results.append((path, tag))
4740     return results
4741
4742
4743 class LUAddTags(TagsLU):
4744   """Sets a tag on a given object.
4745
4746   """
4747   _OP_REQP = ["kind", "name", "tags"]
4748
4749   def CheckPrereq(self):
4750     """Check prerequisites.
4751
4752     This checks the type and length of the tag name and value.
4753
4754     """
4755     TagsLU.CheckPrereq(self)
4756     for tag in self.op.tags:
4757       objects.TaggableObject.ValidateTag(tag)
4758
4759   def Exec(self, feedback_fn):
4760     """Sets the tag.
4761
4762     """
4763     try:
4764       for tag in self.op.tags:
4765         self.target.AddTag(tag)
4766     except errors.TagError, err:
4767       raise errors.OpExecError("Error while setting tag: %s" % str(err))
4768     try:
4769       self.cfg.Update(self.target)
4770     except errors.ConfigurationError:
4771       raise errors.OpRetryError("There has been a modification to the"
4772                                 " config file and the operation has been"
4773                                 " aborted. Please retry.")
4774
4775
4776 class LUDelTags(TagsLU):
4777   """Delete a list of tags from a given object.
4778
4779   """
4780   _OP_REQP = ["kind", "name", "tags"]
4781
4782   def CheckPrereq(self):
4783     """Check prerequisites.
4784
4785     This checks that we have the given tag.
4786
4787     """
4788     TagsLU.CheckPrereq(self)
4789     for tag in self.op.tags:
4790       objects.TaggableObject.ValidateTag(tag)
4791     del_tags = frozenset(self.op.tags)
4792     cur_tags = self.target.GetTags()
4793     if not del_tags <= cur_tags:
4794       diff_tags = del_tags - cur_tags
4795       diff_names = ["'%s'" % tag for tag in diff_tags]
4796       diff_names.sort()
4797       raise errors.OpPrereqError("Tag(s) %s not found" %
4798                                  (",".join(diff_names)))
4799
4800   def Exec(self, feedback_fn):
4801     """Remove the tag from the object.
4802
4803     """
4804     for tag in self.op.tags:
4805       self.target.RemoveTag(tag)
4806     try:
4807       self.cfg.Update(self.target)
4808     except errors.ConfigurationError:
4809       raise errors.OpRetryError("There has been a modification to the"
4810                                 " config file and the operation has been"
4811                                 " aborted. Please retry.")
4812
4813
4814 class LUTestDelay(NoHooksLU):
4815   """Sleep for a specified amount of time.
4816
4817   This LU sleeps on the master and/or nodes for a specified amount of
4818   time.
4819
4820   """
4821   _OP_REQP = ["duration", "on_master", "on_nodes"]
4822   REQ_BGL = False
4823
4824   def ExpandNames(self):
4825     """Expand names and set required locks.
4826
4827     This expands the node list, if any.
4828
4829     """
4830     self.needed_locks = {}
4831     if self.op.on_nodes:
4832       # _GetWantedNodes can be used here, but is not always appropriate to use
4833       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
4834       # more information.
4835       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4836       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
4837
4838   def CheckPrereq(self):
4839     """Check prerequisites.
4840
4841     """
4842
4843   def Exec(self, feedback_fn):
4844     """Do the actual sleep.
4845
4846     """
4847     if self.op.on_master:
4848       if not utils.TestDelay(self.op.duration):
4849         raise errors.OpExecError("Error during master delay test")
4850     if self.op.on_nodes:
4851       result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4852       if not result:
4853         raise errors.OpExecError("Complete failure from rpc call")
4854       for node, node_result in result.items():
4855         if not node_result:
4856           raise errors.OpExecError("Failure during rpc call to node %s,"
4857                                    " result: %s" % (node, node_result))
4858
4859
4860 class IAllocator(object):
4861   """IAllocator framework.
4862
4863   An IAllocator instance has three sets of attributes:
4864     - cfg/sstore that are needed to query the cluster
4865     - input data (all members of the _KEYS class attribute are required)
4866     - four buffer attributes (in|out_data|text), that represent the
4867       input (to the external script) in text and data structure format,
4868       and the output from it, again in two formats
4869     - the result variables from the script (success, info, nodes) for
4870       easy usage
4871
4872   """
4873   _ALLO_KEYS = [
4874     "mem_size", "disks", "disk_template",
4875     "os", "tags", "nics", "vcpus",
4876     ]
4877   _RELO_KEYS = [
4878     "relocate_from",
4879     ]
4880
4881   def __init__(self, cfg, sstore, mode, name, **kwargs):
4882     self.cfg = cfg
4883     self.sstore = sstore
4884     # init buffer variables
4885     self.in_text = self.out_text = self.in_data = self.out_data = None
4886     # init all input fields so that pylint is happy
4887     self.mode = mode
4888     self.name = name
4889     self.mem_size = self.disks = self.disk_template = None
4890     self.os = self.tags = self.nics = self.vcpus = None
4891     self.relocate_from = None
4892     # computed fields
4893     self.required_nodes = None
4894     # init result fields
4895     self.success = self.info = self.nodes = None
4896     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4897       keyset = self._ALLO_KEYS
4898     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
4899       keyset = self._RELO_KEYS
4900     else:
4901       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
4902                                    " IAllocator" % self.mode)
4903     for key in kwargs:
4904       if key not in keyset:
4905         raise errors.ProgrammerError("Invalid input parameter '%s' to"
4906                                      " IAllocator" % key)
4907       setattr(self, key, kwargs[key])
4908     for key in keyset:
4909       if key not in kwargs:
4910         raise errors.ProgrammerError("Missing input parameter '%s' to"
4911                                      " IAllocator" % key)
4912     self._BuildInputData()
4913
4914   def _ComputeClusterData(self):
4915     """Compute the generic allocator input data.
4916
4917     This is the data that is independent of the actual operation.
4918
4919     """
4920     cfg = self.cfg
4921     # cluster data
4922     data = {
4923       "version": 1,
4924       "cluster_name": self.sstore.GetClusterName(),
4925       "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
4926       "hypervisor_type": self.sstore.GetHypervisorType(),
4927       # we don't have job IDs
4928       }
4929
4930     i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
4931
4932     # node data
4933     node_results = {}
4934     node_list = cfg.GetNodeList()
4935     node_data = rpc.call_node_info(node_list, cfg.GetVGName())
4936     for nname in node_list:
4937       ninfo = cfg.GetNodeInfo(nname)
4938       if nname not in node_data or not isinstance(node_data[nname], dict):
4939         raise errors.OpExecError("Can't get data for node %s" % nname)
4940       remote_info = node_data[nname]
4941       for attr in ['memory_total', 'memory_free', 'memory_dom0',
4942                    'vg_size', 'vg_free', 'cpu_total']:
4943         if attr not in remote_info:
4944           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
4945                                    (nname, attr))
4946         try:
4947           remote_info[attr] = int(remote_info[attr])
4948         except ValueError, err:
4949           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
4950                                    " %s" % (nname, attr, str(err)))
4951       # compute memory used by primary instances
4952       i_p_mem = i_p_up_mem = 0
4953       for iinfo in i_list:
4954         if iinfo.primary_node == nname:
4955           i_p_mem += iinfo.memory
4956           if iinfo.status == "up":
4957             i_p_up_mem += iinfo.memory
4958
4959       # compute memory used by instances
4960       pnr = {
4961         "tags": list(ninfo.GetTags()),
4962         "total_memory": remote_info['memory_total'],
4963         "reserved_memory": remote_info['memory_dom0'],
4964         "free_memory": remote_info['memory_free'],
4965         "i_pri_memory": i_p_mem,
4966         "i_pri_up_memory": i_p_up_mem,
4967         "total_disk": remote_info['vg_size'],
4968         "free_disk": remote_info['vg_free'],
4969         "primary_ip": ninfo.primary_ip,
4970         "secondary_ip": ninfo.secondary_ip,
4971         "total_cpus": remote_info['cpu_total'],
4972         }
4973       node_results[nname] = pnr
4974     data["nodes"] = node_results
4975
4976     # instance data
4977     instance_data = {}
4978     for iinfo in i_list:
4979       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
4980                   for n in iinfo.nics]
4981       pir = {
4982         "tags": list(iinfo.GetTags()),
4983         "should_run": iinfo.status == "up",
4984         "vcpus": iinfo.vcpus,
4985         "memory": iinfo.memory,
4986         "os": iinfo.os,
4987         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
4988         "nics": nic_data,
4989         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
4990         "disk_template": iinfo.disk_template,
4991         }
4992       instance_data[iinfo.name] = pir
4993
4994     data["instances"] = instance_data
4995
4996     self.in_data = data
4997
4998   def _AddNewInstance(self):
4999     """Add new instance data to allocator structure.
5000
5001     This in combination with _AllocatorGetClusterData will create the
5002     correct structure needed as input for the allocator.
5003
5004     The checks for the completeness of the opcode must have already been
5005     done.
5006
5007     """
5008     data = self.in_data
5009     if len(self.disks) != 2:
5010       raise errors.OpExecError("Only two-disk configurations supported")
5011
5012     disk_space = _ComputeDiskSize(self.disk_template,
5013                                   self.disks[0]["size"], self.disks[1]["size"])
5014
5015     if self.disk_template in constants.DTS_NET_MIRROR:
5016       self.required_nodes = 2
5017     else:
5018       self.required_nodes = 1
5019     request = {
5020       "type": "allocate",
5021       "name": self.name,
5022       "disk_template": self.disk_template,
5023       "tags": self.tags,
5024       "os": self.os,
5025       "vcpus": self.vcpus,
5026       "memory": self.mem_size,
5027       "disks": self.disks,
5028       "disk_space_total": disk_space,
5029       "nics": self.nics,
5030       "required_nodes": self.required_nodes,
5031       }
5032     data["request"] = request
5033
5034   def _AddRelocateInstance(self):
5035     """Add relocate instance data to allocator structure.
5036
5037     This in combination with _IAllocatorGetClusterData will create the
5038     correct structure needed as input for the allocator.
5039
5040     The checks for the completeness of the opcode must have already been
5041     done.
5042
5043     """
5044     instance = self.cfg.GetInstanceInfo(self.name)
5045     if instance is None:
5046       raise errors.ProgrammerError("Unknown instance '%s' passed to"
5047                                    " IAllocator" % self.name)
5048
5049     if instance.disk_template not in constants.DTS_NET_MIRROR:
5050       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5051
5052     if len(instance.secondary_nodes) != 1:
5053       raise errors.OpPrereqError("Instance has not exactly one secondary node")
5054
5055     self.required_nodes = 1
5056
5057     disk_space = _ComputeDiskSize(instance.disk_template,
5058                                   instance.disks[0].size,
5059                                   instance.disks[1].size)
5060
5061     request = {
5062       "type": "relocate",
5063       "name": self.name,
5064       "disk_space_total": disk_space,
5065       "required_nodes": self.required_nodes,
5066       "relocate_from": self.relocate_from,
5067       }
5068     self.in_data["request"] = request
5069
5070   def _BuildInputData(self):
5071     """Build input data structures.
5072
5073     """
5074     self._ComputeClusterData()
5075
5076     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5077       self._AddNewInstance()
5078     else:
5079       self._AddRelocateInstance()
5080
5081     self.in_text = serializer.Dump(self.in_data)
5082
5083   def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
5084     """Run an instance allocator and return the results.
5085
5086     """
5087     data = self.in_text
5088
5089     result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
5090
5091     if not isinstance(result, tuple) or len(result) != 4:
5092       raise errors.OpExecError("Invalid result from master iallocator runner")
5093
5094     rcode, stdout, stderr, fail = result
5095
5096     if rcode == constants.IARUN_NOTFOUND:
5097       raise errors.OpExecError("Can't find allocator '%s'" % name)
5098     elif rcode == constants.IARUN_FAILURE:
5099       raise errors.OpExecError("Instance allocator call failed: %s,"
5100                                " output: %s" % (fail, stdout+stderr))
5101     self.out_text = stdout
5102     if validate:
5103       self._ValidateResult()
5104
5105   def _ValidateResult(self):
5106     """Process the allocator results.
5107
5108     This will process and if successful save the result in
5109     self.out_data and the other parameters.
5110
5111     """
5112     try:
5113       rdict = serializer.Load(self.out_text)
5114     except Exception, err:
5115       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5116
5117     if not isinstance(rdict, dict):
5118       raise errors.OpExecError("Can't parse iallocator results: not a dict")
5119
5120     for key in "success", "info", "nodes":
5121       if key not in rdict:
5122         raise errors.OpExecError("Can't parse iallocator results:"
5123                                  " missing key '%s'" % key)
5124       setattr(self, key, rdict[key])
5125
5126     if not isinstance(rdict["nodes"], list):
5127       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5128                                " is not a list")
5129     self.out_data = rdict
5130
5131
5132 class LUTestAllocator(NoHooksLU):
5133   """Run allocator tests.
5134
5135   This LU runs the allocator tests
5136
5137   """
5138   _OP_REQP = ["direction", "mode", "name"]
5139
5140   def CheckPrereq(self):
5141     """Check prerequisites.
5142
5143     This checks the opcode parameters depending on the director and mode test.
5144
5145     """
5146     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5147       for attr in ["name", "mem_size", "disks", "disk_template",
5148                    "os", "tags", "nics", "vcpus"]:
5149         if not hasattr(self.op, attr):
5150           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5151                                      attr)
5152       iname = self.cfg.ExpandInstanceName(self.op.name)
5153       if iname is not None:
5154         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5155                                    iname)
5156       if not isinstance(self.op.nics, list):
5157         raise errors.OpPrereqError("Invalid parameter 'nics'")
5158       for row in self.op.nics:
5159         if (not isinstance(row, dict) or
5160             "mac" not in row or
5161             "ip" not in row or
5162             "bridge" not in row):
5163           raise errors.OpPrereqError("Invalid contents of the"
5164                                      " 'nics' parameter")
5165       if not isinstance(self.op.disks, list):
5166         raise errors.OpPrereqError("Invalid parameter 'disks'")
5167       if len(self.op.disks) != 2:
5168         raise errors.OpPrereqError("Only two-disk configurations supported")
5169       for row in self.op.disks:
5170         if (not isinstance(row, dict) or
5171             "size" not in row or
5172             not isinstance(row["size"], int) or
5173             "mode" not in row or
5174             row["mode"] not in ['r', 'w']):
5175           raise errors.OpPrereqError("Invalid contents of the"
5176                                      " 'disks' parameter")
5177     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5178       if not hasattr(self.op, "name"):
5179         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5180       fname = self.cfg.ExpandInstanceName(self.op.name)
5181       if fname is None:
5182         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5183                                    self.op.name)
5184       self.op.name = fname
5185       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5186     else:
5187       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5188                                  self.op.mode)
5189
5190     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5191       if not hasattr(self.op, "allocator") or self.op.allocator is None:
5192         raise errors.OpPrereqError("Missing allocator name")
5193     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5194       raise errors.OpPrereqError("Wrong allocator test '%s'" %
5195                                  self.op.direction)
5196
5197   def Exec(self, feedback_fn):
5198     """Run the allocator test.
5199
5200     """
5201     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5202       ial = IAllocator(self.cfg, self.sstore,
5203                        mode=self.op.mode,
5204                        name=self.op.name,
5205                        mem_size=self.op.mem_size,
5206                        disks=self.op.disks,
5207                        disk_template=self.op.disk_template,
5208                        os=self.op.os,
5209                        tags=self.op.tags,
5210                        nics=self.op.nics,
5211                        vcpus=self.op.vcpus,
5212                        )
5213     else:
5214       ial = IAllocator(self.cfg, self.sstore,
5215                        mode=self.op.mode,
5216                        name=self.op.name,
5217                        relocate_from=list(self.relocate_from),
5218                        )
5219
5220     if self.op.direction == constants.IALLOCATOR_DIR_IN:
5221       result = ial.in_text
5222     else:
5223       ial.Run(self.op.allocator, validate=False)
5224       result = ial.out_text
5225     return result