Cleanup in cmdlib for standalone function calls
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import logging
34
35 from ganeti import rpc
36 from ganeti import ssh
37 from ganeti import logger
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_MASTER: the LU needs to run on the master node
59         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60
61   Note that all commands require root permissions.
62
63   """
64   HPATH = None
65   HTYPE = None
66   _OP_REQP = []
67   REQ_MASTER = True
68   REQ_BGL = True
69
70   def __init__(self, processor, op, context):
71     """Constructor for LogicalUnit.
72
73     This needs to be overriden in derived classes in order to check op
74     validity.
75
76     """
77     self.proc = processor
78     self.op = op
79     self.cfg = context.cfg
80     self.context = context
81     # Dicts used to declare locking needs to mcpu
82     self.needed_locks = None
83     self.acquired_locks = {}
84     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
85     self.add_locks = {}
86     self.remove_locks = {}
87     # Used to force good behavior when calling helper functions
88     self.recalculate_locks = {}
89     self.__ssh = None
90
91     for attr_name in self._OP_REQP:
92       attr_val = getattr(op, attr_name, None)
93       if attr_val is None:
94         raise errors.OpPrereqError("Required parameter '%s' missing" %
95                                    attr_name)
96
97     if not self.cfg.IsCluster():
98       raise errors.OpPrereqError("Cluster not initialized yet,"
99                                  " use 'gnt-cluster init' first.")
100     if self.REQ_MASTER:
101       master = self.cfg.GetMasterNode()
102       if master != utils.HostInfo().name:
103         raise errors.OpPrereqError("Commands must be run on the master"
104                                    " node %s" % master)
105
106   def __GetSSH(self):
107     """Returns the SshRunner object
108
109     """
110     if not self.__ssh:
111       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
112     return self.__ssh
113
114   ssh = property(fget=__GetSSH)
115
116   def ExpandNames(self):
117     """Expand names for this LU.
118
119     This method is called before starting to execute the opcode, and it should
120     update all the parameters of the opcode to their canonical form (e.g. a
121     short node name must be fully expanded after this method has successfully
122     completed). This way locking, hooks, logging, ecc. can work correctly.
123
124     LUs which implement this method must also populate the self.needed_locks
125     member, as a dict with lock levels as keys, and a list of needed lock names
126     as values. Rules:
127       - Use an empty dict if you don't need any lock
128       - If you don't need any lock at a particular level omit that level
129       - Don't put anything for the BGL level
130       - If you want all locks at a level use locking.ALL_SET as a value
131
132     If you need to share locks (rather than acquire them exclusively) at one
133     level you can modify self.share_locks, setting a true value (usually 1) for
134     that level. By default locks are not shared.
135
136     Examples:
137     # Acquire all nodes and one instance
138     self.needed_locks = {
139       locking.LEVEL_NODE: locking.ALL_SET,
140       locking.LEVEL_INSTANCE: ['instance1.example.tld'],
141     }
142     # Acquire just two nodes
143     self.needed_locks = {
144       locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
145     }
146     # Acquire no locks
147     self.needed_locks = {} # No, you can't leave it to the default value None
148
149     """
150     # The implementation of this method is mandatory only if the new LU is
151     # concurrent, so that old LUs don't need to be changed all at the same
152     # time.
153     if self.REQ_BGL:
154       self.needed_locks = {} # Exclusive LUs don't need locks.
155     else:
156       raise NotImplementedError
157
158   def DeclareLocks(self, level):
159     """Declare LU locking needs for a level
160
161     While most LUs can just declare their locking needs at ExpandNames time,
162     sometimes there's the need to calculate some locks after having acquired
163     the ones before. This function is called just before acquiring locks at a
164     particular level, but after acquiring the ones at lower levels, and permits
165     such calculations. It can be used to modify self.needed_locks, and by
166     default it does nothing.
167
168     This function is only called if you have something already set in
169     self.needed_locks for the level.
170
171     @param level: Locking level which is going to be locked
172     @type level: member of ganeti.locking.LEVELS
173
174     """
175
176   def CheckPrereq(self):
177     """Check prerequisites for this LU.
178
179     This method should check that the prerequisites for the execution
180     of this LU are fulfilled. It can do internode communication, but
181     it should be idempotent - no cluster or system changes are
182     allowed.
183
184     The method should raise errors.OpPrereqError in case something is
185     not fulfilled. Its return value is ignored.
186
187     This method should also update all the parameters of the opcode to
188     their canonical form if it hasn't been done by ExpandNames before.
189
190     """
191     raise NotImplementedError
192
193   def Exec(self, feedback_fn):
194     """Execute the LU.
195
196     This method should implement the actual work. It should raise
197     errors.OpExecError for failures that are somewhat dealt with in
198     code, or expected.
199
200     """
201     raise NotImplementedError
202
203   def BuildHooksEnv(self):
204     """Build hooks environment for this LU.
205
206     This method should return a three-node tuple consisting of: a dict
207     containing the environment that will be used for running the
208     specific hook for this LU, a list of node names on which the hook
209     should run before the execution, and a list of node names on which
210     the hook should run after the execution.
211
212     The keys of the dict must not have 'GANETI_' prefixed as this will
213     be handled in the hooks runner. Also note additional keys will be
214     added by the hooks runner. If the LU doesn't define any
215     environment, an empty dict (and not None) should be returned.
216
217     No nodes should be returned as an empty list (and not None).
218
219     Note that if the HPATH for a LU class is None, this function will
220     not be called.
221
222     """
223     raise NotImplementedError
224
225   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
226     """Notify the LU about the results of its hooks.
227
228     This method is called every time a hooks phase is executed, and notifies
229     the Logical Unit about the hooks' result. The LU can then use it to alter
230     its result based on the hooks.  By default the method does nothing and the
231     previous result is passed back unchanged but any LU can define it if it
232     wants to use the local cluster hook-scripts somehow.
233
234     Args:
235       phase: the hooks phase that has just been run
236       hooks_results: the results of the multi-node hooks rpc call
237       feedback_fn: function to send feedback back to the caller
238       lu_result: the previous result this LU had, or None in the PRE phase.
239
240     """
241     return lu_result
242
243   def _ExpandAndLockInstance(self):
244     """Helper function to expand and lock an instance.
245
246     Many LUs that work on an instance take its name in self.op.instance_name
247     and need to expand it and then declare the expanded name for locking. This
248     function does it, and then updates self.op.instance_name to the expanded
249     name. It also initializes needed_locks as a dict, if this hasn't been done
250     before.
251
252     """
253     if self.needed_locks is None:
254       self.needed_locks = {}
255     else:
256       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
257         "_ExpandAndLockInstance called with instance-level locks set"
258     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
259     if expanded_name is None:
260       raise errors.OpPrereqError("Instance '%s' not known" %
261                                   self.op.instance_name)
262     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
263     self.op.instance_name = expanded_name
264
265   def _LockInstancesNodes(self, primary_only=False):
266     """Helper function to declare instances' nodes for locking.
267
268     This function should be called after locking one or more instances to lock
269     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
270     with all primary or secondary nodes for instances already locked and
271     present in self.needed_locks[locking.LEVEL_INSTANCE].
272
273     It should be called from DeclareLocks, and for safety only works if
274     self.recalculate_locks[locking.LEVEL_NODE] is set.
275
276     In the future it may grow parameters to just lock some instance's nodes, or
277     to just lock primaries or secondary nodes, if needed.
278
279     If should be called in DeclareLocks in a way similar to:
280
281     if level == locking.LEVEL_NODE:
282       self._LockInstancesNodes()
283
284     @type primary_only: boolean
285     @param primary_only: only lock primary nodes of locked instances
286
287     """
288     assert locking.LEVEL_NODE in self.recalculate_locks, \
289       "_LockInstancesNodes helper function called with no nodes to recalculate"
290
291     # TODO: check if we're really been called with the instance locks held
292
293     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
294     # future we might want to have different behaviors depending on the value
295     # of self.recalculate_locks[locking.LEVEL_NODE]
296     wanted_nodes = []
297     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
298       instance = self.context.cfg.GetInstanceInfo(instance_name)
299       wanted_nodes.append(instance.primary_node)
300       if not primary_only:
301         wanted_nodes.extend(instance.secondary_nodes)
302
303     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
304       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
305     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
306       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
307
308     del self.recalculate_locks[locking.LEVEL_NODE]
309
310
311 class NoHooksLU(LogicalUnit):
312   """Simple LU which runs no hooks.
313
314   This LU is intended as a parent for other LogicalUnits which will
315   run no hooks, in order to reduce duplicate code.
316
317   """
318   HPATH = None
319   HTYPE = None
320
321
322 def _GetWantedNodes(lu, nodes):
323   """Returns list of checked and expanded node names.
324
325   Args:
326     nodes: List of nodes (strings) or None for all
327
328   """
329   if not isinstance(nodes, list):
330     raise errors.OpPrereqError("Invalid argument type 'nodes'")
331
332   if not nodes:
333     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
334       " non-empty list of nodes whose name is to be expanded.")
335
336   wanted = []
337   for name in nodes:
338     node = lu.cfg.ExpandNodeName(name)
339     if node is None:
340       raise errors.OpPrereqError("No such node name '%s'" % name)
341     wanted.append(node)
342
343   return utils.NiceSort(wanted)
344
345
346 def _GetWantedInstances(lu, instances):
347   """Returns list of checked and expanded instance names.
348
349   Args:
350     instances: List of instances (strings) or None for all
351
352   """
353   if not isinstance(instances, list):
354     raise errors.OpPrereqError("Invalid argument type 'instances'")
355
356   if instances:
357     wanted = []
358
359     for name in instances:
360       instance = lu.cfg.ExpandInstanceName(name)
361       if instance is None:
362         raise errors.OpPrereqError("No such instance name '%s'" % name)
363       wanted.append(instance)
364
365   else:
366     wanted = lu.cfg.GetInstanceList()
367   return utils.NiceSort(wanted)
368
369
370 def _CheckOutputFields(static, dynamic, selected):
371   """Checks whether all selected fields are valid.
372
373   Args:
374     static: Static fields
375     dynamic: Dynamic fields
376
377   """
378   static_fields = frozenset(static)
379   dynamic_fields = frozenset(dynamic)
380
381   all_fields = static_fields | dynamic_fields
382
383   if not all_fields.issuperset(selected):
384     raise errors.OpPrereqError("Unknown output fields selected: %s"
385                                % ",".join(frozenset(selected).
386                                           difference(all_fields)))
387
388
389 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
390                           memory, vcpus, nics):
391   """Builds instance related env variables for hooks from single variables.
392
393   Args:
394     secondary_nodes: List of secondary nodes as strings
395   """
396   env = {
397     "OP_TARGET": name,
398     "INSTANCE_NAME": name,
399     "INSTANCE_PRIMARY": primary_node,
400     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
401     "INSTANCE_OS_TYPE": os_type,
402     "INSTANCE_STATUS": status,
403     "INSTANCE_MEMORY": memory,
404     "INSTANCE_VCPUS": vcpus,
405   }
406
407   if nics:
408     nic_count = len(nics)
409     for idx, (ip, bridge, mac) in enumerate(nics):
410       if ip is None:
411         ip = ""
412       env["INSTANCE_NIC%d_IP" % idx] = ip
413       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
414       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
415   else:
416     nic_count = 0
417
418   env["INSTANCE_NIC_COUNT"] = nic_count
419
420   return env
421
422
423 def _BuildInstanceHookEnvByObject(instance, override=None):
424   """Builds instance related env variables for hooks from an object.
425
426   Args:
427     instance: objects.Instance object of instance
428     override: dict of values to override
429   """
430   args = {
431     'name': instance.name,
432     'primary_node': instance.primary_node,
433     'secondary_nodes': instance.secondary_nodes,
434     'os_type': instance.os,
435     'status': instance.os,
436     'memory': instance.memory,
437     'vcpus': instance.vcpus,
438     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
439   }
440   if override:
441     args.update(override)
442   return _BuildInstanceHookEnv(**args)
443
444
445 def _CheckInstanceBridgesExist(lu, instance):
446   """Check that the brigdes needed by an instance exist.
447
448   """
449   # check bridges existance
450   brlist = [nic.bridge for nic in instance.nics]
451   if not rpc.call_bridges_exist(instance.primary_node, brlist):
452     raise errors.OpPrereqError("one or more target bridges %s does not"
453                                " exist on destination node '%s'" %
454                                (brlist, instance.primary_node))
455
456
457 class LUDestroyCluster(NoHooksLU):
458   """Logical unit for destroying the cluster.
459
460   """
461   _OP_REQP = []
462
463   def CheckPrereq(self):
464     """Check prerequisites.
465
466     This checks whether the cluster is empty.
467
468     Any errors are signalled by raising errors.OpPrereqError.
469
470     """
471     master = self.cfg.GetMasterNode()
472
473     nodelist = self.cfg.GetNodeList()
474     if len(nodelist) != 1 or nodelist[0] != master:
475       raise errors.OpPrereqError("There are still %d node(s) in"
476                                  " this cluster." % (len(nodelist) - 1))
477     instancelist = self.cfg.GetInstanceList()
478     if instancelist:
479       raise errors.OpPrereqError("There are still %d instance(s) in"
480                                  " this cluster." % len(instancelist))
481
482   def Exec(self, feedback_fn):
483     """Destroys the cluster.
484
485     """
486     master = self.cfg.GetMasterNode()
487     if not rpc.call_node_stop_master(master, False):
488       raise errors.OpExecError("Could not disable the master role")
489     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
490     utils.CreateBackup(priv_key)
491     utils.CreateBackup(pub_key)
492     return master
493
494
495 class LUVerifyCluster(LogicalUnit):
496   """Verifies the cluster status.
497
498   """
499   HPATH = "cluster-verify"
500   HTYPE = constants.HTYPE_CLUSTER
501   _OP_REQP = ["skip_checks"]
502   REQ_BGL = False
503
504   def ExpandNames(self):
505     self.needed_locks = {
506       locking.LEVEL_NODE: locking.ALL_SET,
507       locking.LEVEL_INSTANCE: locking.ALL_SET,
508     }
509     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
510
511   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
512                   remote_version, feedback_fn):
513     """Run multiple tests against a node.
514
515     Test list:
516       - compares ganeti version
517       - checks vg existance and size > 20G
518       - checks config file checksum
519       - checks ssh to other nodes
520
521     Args:
522       node: name of the node to check
523       file_list: required list of files
524       local_cksum: dictionary of local files and their checksums
525
526     """
527     # compares ganeti version
528     local_version = constants.PROTOCOL_VERSION
529     if not remote_version:
530       feedback_fn("  - ERROR: connection to %s failed" % (node))
531       return True
532
533     if local_version != remote_version:
534       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
535                       (local_version, node, remote_version))
536       return True
537
538     # checks vg existance and size > 20G
539
540     bad = False
541     if not vglist:
542       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
543                       (node,))
544       bad = True
545     else:
546       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
547                                             constants.MIN_VG_SIZE)
548       if vgstatus:
549         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
550         bad = True
551
552     # checks config file checksum
553     # checks ssh to any
554
555     if 'filelist' not in node_result:
556       bad = True
557       feedback_fn("  - ERROR: node hasn't returned file checksum data")
558     else:
559       remote_cksum = node_result['filelist']
560       for file_name in file_list:
561         if file_name not in remote_cksum:
562           bad = True
563           feedback_fn("  - ERROR: file '%s' missing" % file_name)
564         elif remote_cksum[file_name] != local_cksum[file_name]:
565           bad = True
566           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
567
568     if 'nodelist' not in node_result:
569       bad = True
570       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
571     else:
572       if node_result['nodelist']:
573         bad = True
574         for node in node_result['nodelist']:
575           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
576                           (node, node_result['nodelist'][node]))
577     if 'node-net-test' not in node_result:
578       bad = True
579       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
580     else:
581       if node_result['node-net-test']:
582         bad = True
583         nlist = utils.NiceSort(node_result['node-net-test'].keys())
584         for node in nlist:
585           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
586                           (node, node_result['node-net-test'][node]))
587
588     hyp_result = node_result.get('hypervisor', None)
589     if isinstance(hyp_result, dict):
590       for hv_name, hv_result in hyp_result.iteritems():
591         if hv_result is not None:
592           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
593                       (hv_name, hv_result))
594     return bad
595
596   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
597                       node_instance, feedback_fn):
598     """Verify an instance.
599
600     This function checks to see if the required block devices are
601     available on the instance's node.
602
603     """
604     bad = False
605
606     node_current = instanceconfig.primary_node
607
608     node_vol_should = {}
609     instanceconfig.MapLVsByNode(node_vol_should)
610
611     for node in node_vol_should:
612       for volume in node_vol_should[node]:
613         if node not in node_vol_is or volume not in node_vol_is[node]:
614           feedback_fn("  - ERROR: volume %s missing on node %s" %
615                           (volume, node))
616           bad = True
617
618     if not instanceconfig.status == 'down':
619       if (node_current not in node_instance or
620           not instance in node_instance[node_current]):
621         feedback_fn("  - ERROR: instance %s not running on node %s" %
622                         (instance, node_current))
623         bad = True
624
625     for node in node_instance:
626       if (not node == node_current):
627         if instance in node_instance[node]:
628           feedback_fn("  - ERROR: instance %s should not run on node %s" %
629                           (instance, node))
630           bad = True
631
632     return bad
633
634   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
635     """Verify if there are any unknown volumes in the cluster.
636
637     The .os, .swap and backup volumes are ignored. All other volumes are
638     reported as unknown.
639
640     """
641     bad = False
642
643     for node in node_vol_is:
644       for volume in node_vol_is[node]:
645         if node not in node_vol_should or volume not in node_vol_should[node]:
646           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
647                       (volume, node))
648           bad = True
649     return bad
650
651   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
652     """Verify the list of running instances.
653
654     This checks what instances are running but unknown to the cluster.
655
656     """
657     bad = False
658     for node in node_instance:
659       for runninginstance in node_instance[node]:
660         if runninginstance not in instancelist:
661           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
662                           (runninginstance, node))
663           bad = True
664     return bad
665
666   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
667     """Verify N+1 Memory Resilience.
668
669     Check that if one single node dies we can still start all the instances it
670     was primary for.
671
672     """
673     bad = False
674
675     for node, nodeinfo in node_info.iteritems():
676       # This code checks that every node which is now listed as secondary has
677       # enough memory to host all instances it is supposed to should a single
678       # other node in the cluster fail.
679       # FIXME: not ready for failover to an arbitrary node
680       # FIXME: does not support file-backed instances
681       # WARNING: we currently take into account down instances as well as up
682       # ones, considering that even if they're down someone might want to start
683       # them even in the event of a node failure.
684       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
685         needed_mem = 0
686         for instance in instances:
687           needed_mem += instance_cfg[instance].memory
688         if nodeinfo['mfree'] < needed_mem:
689           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
690                       " failovers should node %s fail" % (node, prinode))
691           bad = True
692     return bad
693
694   def CheckPrereq(self):
695     """Check prerequisites.
696
697     Transform the list of checks we're going to skip into a set and check that
698     all its members are valid.
699
700     """
701     self.skip_set = frozenset(self.op.skip_checks)
702     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
703       raise errors.OpPrereqError("Invalid checks to be skipped specified")
704
705   def BuildHooksEnv(self):
706     """Build hooks env.
707
708     Cluster-Verify hooks just rone in the post phase and their failure makes
709     the output be logged in the verify output and the verification to fail.
710
711     """
712     all_nodes = self.cfg.GetNodeList()
713     # TODO: populate the environment with useful information for verify hooks
714     env = {}
715     return env, [], all_nodes
716
717   def Exec(self, feedback_fn):
718     """Verify integrity of cluster, performing various test on nodes.
719
720     """
721     bad = False
722     feedback_fn("* Verifying global settings")
723     for msg in self.cfg.VerifyConfig():
724       feedback_fn("  - ERROR: %s" % msg)
725
726     vg_name = self.cfg.GetVGName()
727     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
728     nodelist = utils.NiceSort(self.cfg.GetNodeList())
729     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
730     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
731     i_non_redundant = [] # Non redundant instances
732     node_volume = {}
733     node_instance = {}
734     node_info = {}
735     instance_cfg = {}
736
737     # FIXME: verify OS list
738     # do local checksums
739     file_names = []
740     file_names.append(constants.SSL_CERT_FILE)
741     file_names.append(constants.CLUSTER_CONF_FILE)
742     local_checksums = utils.FingerprintFiles(file_names)
743
744     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
745     all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
746     all_instanceinfo = rpc.call_instance_list(nodelist, hypervisors)
747     all_vglist = rpc.call_vg_list(nodelist)
748     node_verify_param = {
749       'filelist': file_names,
750       'nodelist': nodelist,
751       'hypervisor': hypervisors,
752       'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
753                         for node in nodeinfo]
754       }
755     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param,
756                                       self.cfg.GetClusterName())
757     all_rversion = rpc.call_version(nodelist)
758     all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName(),
759                                    self.cfg.GetHypervisorType())
760
761     for node in nodelist:
762       feedback_fn("* Verifying node %s" % node)
763       result = self._VerifyNode(node, file_names, local_checksums,
764                                 all_vglist[node], all_nvinfo[node],
765                                 all_rversion[node], feedback_fn)
766       bad = bad or result
767
768       # node_volume
769       volumeinfo = all_volumeinfo[node]
770
771       if isinstance(volumeinfo, basestring):
772         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
773                     (node, volumeinfo[-400:].encode('string_escape')))
774         bad = True
775         node_volume[node] = {}
776       elif not isinstance(volumeinfo, dict):
777         feedback_fn("  - ERROR: connection to %s failed" % (node,))
778         bad = True
779         continue
780       else:
781         node_volume[node] = volumeinfo
782
783       # node_instance
784       nodeinstance = all_instanceinfo[node]
785       if type(nodeinstance) != list:
786         feedback_fn("  - ERROR: connection to %s failed" % (node,))
787         bad = True
788         continue
789
790       node_instance[node] = nodeinstance
791
792       # node_info
793       nodeinfo = all_ninfo[node]
794       if not isinstance(nodeinfo, dict):
795         feedback_fn("  - ERROR: connection to %s failed" % (node,))
796         bad = True
797         continue
798
799       try:
800         node_info[node] = {
801           "mfree": int(nodeinfo['memory_free']),
802           "dfree": int(nodeinfo['vg_free']),
803           "pinst": [],
804           "sinst": [],
805           # dictionary holding all instances this node is secondary for,
806           # grouped by their primary node. Each key is a cluster node, and each
807           # value is a list of instances which have the key as primary and the
808           # current node as secondary.  this is handy to calculate N+1 memory
809           # availability if you can only failover from a primary to its
810           # secondary.
811           "sinst-by-pnode": {},
812         }
813       except ValueError:
814         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
815         bad = True
816         continue
817
818     node_vol_should = {}
819
820     for instance in instancelist:
821       feedback_fn("* Verifying instance %s" % instance)
822       inst_config = self.cfg.GetInstanceInfo(instance)
823       result =  self._VerifyInstance(instance, inst_config, node_volume,
824                                      node_instance, feedback_fn)
825       bad = bad or result
826
827       inst_config.MapLVsByNode(node_vol_should)
828
829       instance_cfg[instance] = inst_config
830
831       pnode = inst_config.primary_node
832       if pnode in node_info:
833         node_info[pnode]['pinst'].append(instance)
834       else:
835         feedback_fn("  - ERROR: instance %s, connection to primary node"
836                     " %s failed" % (instance, pnode))
837         bad = True
838
839       # If the instance is non-redundant we cannot survive losing its primary
840       # node, so we are not N+1 compliant. On the other hand we have no disk
841       # templates with more than one secondary so that situation is not well
842       # supported either.
843       # FIXME: does not support file-backed instances
844       if len(inst_config.secondary_nodes) == 0:
845         i_non_redundant.append(instance)
846       elif len(inst_config.secondary_nodes) > 1:
847         feedback_fn("  - WARNING: multiple secondaries for instance %s"
848                     % instance)
849
850       for snode in inst_config.secondary_nodes:
851         if snode in node_info:
852           node_info[snode]['sinst'].append(instance)
853           if pnode not in node_info[snode]['sinst-by-pnode']:
854             node_info[snode]['sinst-by-pnode'][pnode] = []
855           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
856         else:
857           feedback_fn("  - ERROR: instance %s, connection to secondary node"
858                       " %s failed" % (instance, snode))
859
860     feedback_fn("* Verifying orphan volumes")
861     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
862                                        feedback_fn)
863     bad = bad or result
864
865     feedback_fn("* Verifying remaining instances")
866     result = self._VerifyOrphanInstances(instancelist, node_instance,
867                                          feedback_fn)
868     bad = bad or result
869
870     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
871       feedback_fn("* Verifying N+1 Memory redundancy")
872       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
873       bad = bad or result
874
875     feedback_fn("* Other Notes")
876     if i_non_redundant:
877       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
878                   % len(i_non_redundant))
879
880     return not bad
881
882   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
883     """Analize the post-hooks' result, handle it, and send some
884     nicely-formatted feedback back to the user.
885
886     Args:
887       phase: the hooks phase that has just been run
888       hooks_results: the results of the multi-node hooks rpc call
889       feedback_fn: function to send feedback back to the caller
890       lu_result: previous Exec result
891
892     """
893     # We only really run POST phase hooks, and are only interested in
894     # their results
895     if phase == constants.HOOKS_PHASE_POST:
896       # Used to change hooks' output to proper indentation
897       indent_re = re.compile('^', re.M)
898       feedback_fn("* Hooks Results")
899       if not hooks_results:
900         feedback_fn("  - ERROR: general communication failure")
901         lu_result = 1
902       else:
903         for node_name in hooks_results:
904           show_node_header = True
905           res = hooks_results[node_name]
906           if res is False or not isinstance(res, list):
907             feedback_fn("    Communication failure")
908             lu_result = 1
909             continue
910           for script, hkr, output in res:
911             if hkr == constants.HKR_FAIL:
912               # The node header is only shown once, if there are
913               # failing hooks on that node
914               if show_node_header:
915                 feedback_fn("  Node %s:" % node_name)
916                 show_node_header = False
917               feedback_fn("    ERROR: Script %s failed, output:" % script)
918               output = indent_re.sub('      ', output)
919               feedback_fn("%s" % output)
920               lu_result = 1
921
922       return lu_result
923
924
925 class LUVerifyDisks(NoHooksLU):
926   """Verifies the cluster disks status.
927
928   """
929   _OP_REQP = []
930   REQ_BGL = False
931
932   def ExpandNames(self):
933     self.needed_locks = {
934       locking.LEVEL_NODE: locking.ALL_SET,
935       locking.LEVEL_INSTANCE: locking.ALL_SET,
936     }
937     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
938
939   def CheckPrereq(self):
940     """Check prerequisites.
941
942     This has no prerequisites.
943
944     """
945     pass
946
947   def Exec(self, feedback_fn):
948     """Verify integrity of cluster disks.
949
950     """
951     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
952
953     vg_name = self.cfg.GetVGName()
954     nodes = utils.NiceSort(self.cfg.GetNodeList())
955     instances = [self.cfg.GetInstanceInfo(name)
956                  for name in self.cfg.GetInstanceList()]
957
958     nv_dict = {}
959     for inst in instances:
960       inst_lvs = {}
961       if (inst.status != "up" or
962           inst.disk_template not in constants.DTS_NET_MIRROR):
963         continue
964       inst.MapLVsByNode(inst_lvs)
965       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
966       for node, vol_list in inst_lvs.iteritems():
967         for vol in vol_list:
968           nv_dict[(node, vol)] = inst
969
970     if not nv_dict:
971       return result
972
973     node_lvs = rpc.call_volume_list(nodes, vg_name)
974
975     to_act = set()
976     for node in nodes:
977       # node_volume
978       lvs = node_lvs[node]
979
980       if isinstance(lvs, basestring):
981         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
982         res_nlvm[node] = lvs
983       elif not isinstance(lvs, dict):
984         logger.Info("connection to node %s failed or invalid data returned" %
985                     (node,))
986         res_nodes.append(node)
987         continue
988
989       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
990         inst = nv_dict.pop((node, lv_name), None)
991         if (not lv_online and inst is not None
992             and inst.name not in res_instances):
993           res_instances.append(inst.name)
994
995     # any leftover items in nv_dict are missing LVs, let's arrange the
996     # data better
997     for key, inst in nv_dict.iteritems():
998       if inst.name not in res_missing:
999         res_missing[inst.name] = []
1000       res_missing[inst.name].append(key)
1001
1002     return result
1003
1004
1005 class LURenameCluster(LogicalUnit):
1006   """Rename the cluster.
1007
1008   """
1009   HPATH = "cluster-rename"
1010   HTYPE = constants.HTYPE_CLUSTER
1011   _OP_REQP = ["name"]
1012
1013   def BuildHooksEnv(self):
1014     """Build hooks env.
1015
1016     """
1017     env = {
1018       "OP_TARGET": self.cfg.GetClusterName(),
1019       "NEW_NAME": self.op.name,
1020       }
1021     mn = self.cfg.GetMasterNode()
1022     return env, [mn], [mn]
1023
1024   def CheckPrereq(self):
1025     """Verify that the passed name is a valid one.
1026
1027     """
1028     hostname = utils.HostInfo(self.op.name)
1029
1030     new_name = hostname.name
1031     self.ip = new_ip = hostname.ip
1032     old_name = self.cfg.GetClusterName()
1033     old_ip = self.cfg.GetMasterIP()
1034     if new_name == old_name and new_ip == old_ip:
1035       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1036                                  " cluster has changed")
1037     if new_ip != old_ip:
1038       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1039         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1040                                    " reachable on the network. Aborting." %
1041                                    new_ip)
1042
1043     self.op.name = new_name
1044
1045   def Exec(self, feedback_fn):
1046     """Rename the cluster.
1047
1048     """
1049     clustername = self.op.name
1050     ip = self.ip
1051
1052     # shutdown the master IP
1053     master = self.cfg.GetMasterNode()
1054     if not rpc.call_node_stop_master(master, False):
1055       raise errors.OpExecError("Could not disable the master role")
1056
1057     try:
1058       # modify the sstore
1059       # TODO: sstore
1060       ss.SetKey(ss.SS_MASTER_IP, ip)
1061       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1062
1063       # Distribute updated ss config to all nodes
1064       myself = self.cfg.GetNodeInfo(master)
1065       dist_nodes = self.cfg.GetNodeList()
1066       if myself.name in dist_nodes:
1067         dist_nodes.remove(myself.name)
1068
1069       logger.Debug("Copying updated ssconf data to all nodes")
1070       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1071         fname = ss.KeyToFilename(keyname)
1072         result = rpc.call_upload_file(dist_nodes, fname)
1073         for to_node in dist_nodes:
1074           if not result[to_node]:
1075             logger.Error("copy of file %s to node %s failed" %
1076                          (fname, to_node))
1077     finally:
1078       if not rpc.call_node_start_master(master, False):
1079         logger.Error("Could not re-enable the master role on the master,"
1080                      " please restart manually.")
1081
1082
1083 def _RecursiveCheckIfLVMBased(disk):
1084   """Check if the given disk or its children are lvm-based.
1085
1086   Args:
1087     disk: ganeti.objects.Disk object
1088
1089   Returns:
1090     boolean indicating whether a LD_LV dev_type was found or not
1091
1092   """
1093   if disk.children:
1094     for chdisk in disk.children:
1095       if _RecursiveCheckIfLVMBased(chdisk):
1096         return True
1097   return disk.dev_type == constants.LD_LV
1098
1099
1100 class LUSetClusterParams(LogicalUnit):
1101   """Change the parameters of the cluster.
1102
1103   """
1104   HPATH = "cluster-modify"
1105   HTYPE = constants.HTYPE_CLUSTER
1106   _OP_REQP = []
1107   REQ_BGL = False
1108
1109   def ExpandNames(self):
1110     # FIXME: in the future maybe other cluster params won't require checking on
1111     # all nodes to be modified.
1112     self.needed_locks = {
1113       locking.LEVEL_NODE: locking.ALL_SET,
1114     }
1115     self.share_locks[locking.LEVEL_NODE] = 1
1116
1117   def BuildHooksEnv(self):
1118     """Build hooks env.
1119
1120     """
1121     env = {
1122       "OP_TARGET": self.cfg.GetClusterName(),
1123       "NEW_VG_NAME": self.op.vg_name,
1124       }
1125     mn = self.cfg.GetMasterNode()
1126     return env, [mn], [mn]
1127
1128   def CheckPrereq(self):
1129     """Check prerequisites.
1130
1131     This checks whether the given params don't conflict and
1132     if the given volume group is valid.
1133
1134     """
1135     # FIXME: This only works because there is only one parameter that can be
1136     # changed or removed.
1137     if not self.op.vg_name:
1138       instances = self.cfg.GetAllInstancesInfo().values()
1139       for inst in instances:
1140         for disk in inst.disks:
1141           if _RecursiveCheckIfLVMBased(disk):
1142             raise errors.OpPrereqError("Cannot disable lvm storage while"
1143                                        " lvm-based instances exist")
1144
1145     # if vg_name not None, checks given volume group on all nodes
1146     if self.op.vg_name:
1147       node_list = self.acquired_locks[locking.LEVEL_NODE]
1148       vglist = rpc.call_vg_list(node_list)
1149       for node in node_list:
1150         vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1151                                               constants.MIN_VG_SIZE)
1152         if vgstatus:
1153           raise errors.OpPrereqError("Error on node '%s': %s" %
1154                                      (node, vgstatus))
1155
1156   def Exec(self, feedback_fn):
1157     """Change the parameters of the cluster.
1158
1159     """
1160     if self.op.vg_name != self.cfg.GetVGName():
1161       self.cfg.SetVGName(self.op.vg_name)
1162     else:
1163       feedback_fn("Cluster LVM configuration already in desired"
1164                   " state, not changing")
1165
1166
1167 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1168   """Sleep and poll for an instance's disk to sync.
1169
1170   """
1171   if not instance.disks:
1172     return True
1173
1174   if not oneshot:
1175     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1176
1177   node = instance.primary_node
1178
1179   for dev in instance.disks:
1180     lu.cfg.SetDiskID(dev, node)
1181
1182   retries = 0
1183   while True:
1184     max_time = 0
1185     done = True
1186     cumul_degraded = False
1187     rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1188     if not rstats:
1189       lu.proc.LogWarning("Can't get any data from node %s" % node)
1190       retries += 1
1191       if retries >= 10:
1192         raise errors.RemoteError("Can't contact node %s for mirror data,"
1193                                  " aborting." % node)
1194       time.sleep(6)
1195       continue
1196     retries = 0
1197     for i in range(len(rstats)):
1198       mstat = rstats[i]
1199       if mstat is None:
1200         lu.proc.LogWarning("Can't compute data for node %s/%s" %
1201                            (node, instance.disks[i].iv_name))
1202         continue
1203       # we ignore the ldisk parameter
1204       perc_done, est_time, is_degraded, _ = mstat
1205       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1206       if perc_done is not None:
1207         done = False
1208         if est_time is not None:
1209           rem_time = "%d estimated seconds remaining" % est_time
1210           max_time = est_time
1211         else:
1212           rem_time = "no time estimate"
1213         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1214                         (instance.disks[i].iv_name, perc_done, rem_time))
1215     if done or oneshot:
1216       break
1217
1218     time.sleep(min(60, max_time))
1219
1220   if done:
1221     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1222   return not cumul_degraded
1223
1224
1225 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1226   """Check that mirrors are not degraded.
1227
1228   The ldisk parameter, if True, will change the test from the
1229   is_degraded attribute (which represents overall non-ok status for
1230   the device(s)) to the ldisk (representing the local storage status).
1231
1232   """
1233   lu.cfg.SetDiskID(dev, node)
1234   if ldisk:
1235     idx = 6
1236   else:
1237     idx = 5
1238
1239   result = True
1240   if on_primary or dev.AssembleOnSecondary():
1241     rstats = rpc.call_blockdev_find(node, dev)
1242     if not rstats:
1243       logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1244       result = False
1245     else:
1246       result = result and (not rstats[idx])
1247   if dev.children:
1248     for child in dev.children:
1249       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1250
1251   return result
1252
1253
1254 class LUDiagnoseOS(NoHooksLU):
1255   """Logical unit for OS diagnose/query.
1256
1257   """
1258   _OP_REQP = ["output_fields", "names"]
1259   REQ_BGL = False
1260
1261   def ExpandNames(self):
1262     if self.op.names:
1263       raise errors.OpPrereqError("Selective OS query not supported")
1264
1265     self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1266     _CheckOutputFields(static=[],
1267                        dynamic=self.dynamic_fields,
1268                        selected=self.op.output_fields)
1269
1270     # Lock all nodes, in shared mode
1271     self.needed_locks = {}
1272     self.share_locks[locking.LEVEL_NODE] = 1
1273     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1274
1275   def CheckPrereq(self):
1276     """Check prerequisites.
1277
1278     """
1279
1280   @staticmethod
1281   def _DiagnoseByOS(node_list, rlist):
1282     """Remaps a per-node return list into an a per-os per-node dictionary
1283
1284       Args:
1285         node_list: a list with the names of all nodes
1286         rlist: a map with node names as keys and OS objects as values
1287
1288       Returns:
1289         map: a map with osnames as keys and as value another map, with
1290              nodes as
1291              keys and list of OS objects as values
1292              e.g. {"debian-etch": {"node1": [<object>,...],
1293                                    "node2": [<object>,]}
1294                   }
1295
1296     """
1297     all_os = {}
1298     for node_name, nr in rlist.iteritems():
1299       if not nr:
1300         continue
1301       for os_obj in nr:
1302         if os_obj.name not in all_os:
1303           # build a list of nodes for this os containing empty lists
1304           # for each node in node_list
1305           all_os[os_obj.name] = {}
1306           for nname in node_list:
1307             all_os[os_obj.name][nname] = []
1308         all_os[os_obj.name][node_name].append(os_obj)
1309     return all_os
1310
1311   def Exec(self, feedback_fn):
1312     """Compute the list of OSes.
1313
1314     """
1315     node_list = self.acquired_locks[locking.LEVEL_NODE]
1316     node_data = rpc.call_os_diagnose(node_list)
1317     if node_data == False:
1318       raise errors.OpExecError("Can't gather the list of OSes")
1319     pol = self._DiagnoseByOS(node_list, node_data)
1320     output = []
1321     for os_name, os_data in pol.iteritems():
1322       row = []
1323       for field in self.op.output_fields:
1324         if field == "name":
1325           val = os_name
1326         elif field == "valid":
1327           val = utils.all([osl and osl[0] for osl in os_data.values()])
1328         elif field == "node_status":
1329           val = {}
1330           for node_name, nos_list in os_data.iteritems():
1331             val[node_name] = [(v.status, v.path) for v in nos_list]
1332         else:
1333           raise errors.ParameterError(field)
1334         row.append(val)
1335       output.append(row)
1336
1337     return output
1338
1339
1340 class LURemoveNode(LogicalUnit):
1341   """Logical unit for removing a node.
1342
1343   """
1344   HPATH = "node-remove"
1345   HTYPE = constants.HTYPE_NODE
1346   _OP_REQP = ["node_name"]
1347
1348   def BuildHooksEnv(self):
1349     """Build hooks env.
1350
1351     This doesn't run on the target node in the pre phase as a failed
1352     node would then be impossible to remove.
1353
1354     """
1355     env = {
1356       "OP_TARGET": self.op.node_name,
1357       "NODE_NAME": self.op.node_name,
1358       }
1359     all_nodes = self.cfg.GetNodeList()
1360     all_nodes.remove(self.op.node_name)
1361     return env, all_nodes, all_nodes
1362
1363   def CheckPrereq(self):
1364     """Check prerequisites.
1365
1366     This checks:
1367      - the node exists in the configuration
1368      - it does not have primary or secondary instances
1369      - it's not the master
1370
1371     Any errors are signalled by raising errors.OpPrereqError.
1372
1373     """
1374     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1375     if node is None:
1376       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1377
1378     instance_list = self.cfg.GetInstanceList()
1379
1380     masternode = self.cfg.GetMasterNode()
1381     if node.name == masternode:
1382       raise errors.OpPrereqError("Node is the master node,"
1383                                  " you need to failover first.")
1384
1385     for instance_name in instance_list:
1386       instance = self.cfg.GetInstanceInfo(instance_name)
1387       if node.name == instance.primary_node:
1388         raise errors.OpPrereqError("Instance %s still running on the node,"
1389                                    " please remove first." % instance_name)
1390       if node.name in instance.secondary_nodes:
1391         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1392                                    " please remove first." % instance_name)
1393     self.op.node_name = node.name
1394     self.node = node
1395
1396   def Exec(self, feedback_fn):
1397     """Removes the node from the cluster.
1398
1399     """
1400     node = self.node
1401     logger.Info("stopping the node daemon and removing configs from node %s" %
1402                 node.name)
1403
1404     self.context.RemoveNode(node.name)
1405
1406     rpc.call_node_leave_cluster(node.name)
1407
1408
1409 class LUQueryNodes(NoHooksLU):
1410   """Logical unit for querying nodes.
1411
1412   """
1413   _OP_REQP = ["output_fields", "names"]
1414   REQ_BGL = False
1415
1416   def ExpandNames(self):
1417     self.dynamic_fields = frozenset([
1418       "dtotal", "dfree",
1419       "mtotal", "mnode", "mfree",
1420       "bootid",
1421       "ctotal",
1422       ])
1423
1424     self.static_fields = frozenset([
1425       "name", "pinst_cnt", "sinst_cnt",
1426       "pinst_list", "sinst_list",
1427       "pip", "sip", "tags",
1428       "serial_no",
1429       ])
1430
1431     _CheckOutputFields(static=self.static_fields,
1432                        dynamic=self.dynamic_fields,
1433                        selected=self.op.output_fields)
1434
1435     self.needed_locks = {}
1436     self.share_locks[locking.LEVEL_NODE] = 1
1437
1438     if self.op.names:
1439       self.wanted = _GetWantedNodes(self, self.op.names)
1440     else:
1441       self.wanted = locking.ALL_SET
1442
1443     self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
1444     if self.do_locking:
1445       # if we don't request only static fields, we need to lock the nodes
1446       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1447
1448
1449   def CheckPrereq(self):
1450     """Check prerequisites.
1451
1452     """
1453     # The validation of the node list is done in the _GetWantedNodes,
1454     # if non empty, and if empty, there's no validation to do
1455     pass
1456
1457   def Exec(self, feedback_fn):
1458     """Computes the list of nodes and their attributes.
1459
1460     """
1461     all_info = self.cfg.GetAllNodesInfo()
1462     if self.do_locking:
1463       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1464     elif self.wanted != locking.ALL_SET:
1465       nodenames = self.wanted
1466       missing = set(nodenames).difference(all_info.keys())
1467       if missing:
1468         raise errors.OpExecError(
1469           "Some nodes were removed before retrieving their data: %s" % missing)
1470     else:
1471       nodenames = all_info.keys()
1472     nodelist = [all_info[name] for name in nodenames]
1473
1474     # begin data gathering
1475
1476     if self.dynamic_fields.intersection(self.op.output_fields):
1477       live_data = {}
1478       node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1479                                      self.cfg.GetHypervisorType())
1480       for name in nodenames:
1481         nodeinfo = node_data.get(name, None)
1482         if nodeinfo:
1483           live_data[name] = {
1484             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1485             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1486             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1487             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1488             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1489             "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1490             "bootid": nodeinfo['bootid'],
1491             }
1492         else:
1493           live_data[name] = {}
1494     else:
1495       live_data = dict.fromkeys(nodenames, {})
1496
1497     node_to_primary = dict([(name, set()) for name in nodenames])
1498     node_to_secondary = dict([(name, set()) for name in nodenames])
1499
1500     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1501                              "sinst_cnt", "sinst_list"))
1502     if inst_fields & frozenset(self.op.output_fields):
1503       instancelist = self.cfg.GetInstanceList()
1504
1505       for instance_name in instancelist:
1506         inst = self.cfg.GetInstanceInfo(instance_name)
1507         if inst.primary_node in node_to_primary:
1508           node_to_primary[inst.primary_node].add(inst.name)
1509         for secnode in inst.secondary_nodes:
1510           if secnode in node_to_secondary:
1511             node_to_secondary[secnode].add(inst.name)
1512
1513     # end data gathering
1514
1515     output = []
1516     for node in nodelist:
1517       node_output = []
1518       for field in self.op.output_fields:
1519         if field == "name":
1520           val = node.name
1521         elif field == "pinst_list":
1522           val = list(node_to_primary[node.name])
1523         elif field == "sinst_list":
1524           val = list(node_to_secondary[node.name])
1525         elif field == "pinst_cnt":
1526           val = len(node_to_primary[node.name])
1527         elif field == "sinst_cnt":
1528           val = len(node_to_secondary[node.name])
1529         elif field == "pip":
1530           val = node.primary_ip
1531         elif field == "sip":
1532           val = node.secondary_ip
1533         elif field == "tags":
1534           val = list(node.GetTags())
1535         elif field == "serial_no":
1536           val = node.serial_no
1537         elif field in self.dynamic_fields:
1538           val = live_data[node.name].get(field, None)
1539         else:
1540           raise errors.ParameterError(field)
1541         node_output.append(val)
1542       output.append(node_output)
1543
1544     return output
1545
1546
1547 class LUQueryNodeVolumes(NoHooksLU):
1548   """Logical unit for getting volumes on node(s).
1549
1550   """
1551   _OP_REQP = ["nodes", "output_fields"]
1552   REQ_BGL = False
1553
1554   def ExpandNames(self):
1555     _CheckOutputFields(static=["node"],
1556                        dynamic=["phys", "vg", "name", "size", "instance"],
1557                        selected=self.op.output_fields)
1558
1559     self.needed_locks = {}
1560     self.share_locks[locking.LEVEL_NODE] = 1
1561     if not self.op.nodes:
1562       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1563     else:
1564       self.needed_locks[locking.LEVEL_NODE] = \
1565         _GetWantedNodes(self, self.op.nodes)
1566
1567   def CheckPrereq(self):
1568     """Check prerequisites.
1569
1570     This checks that the fields required are valid output fields.
1571
1572     """
1573     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1574
1575   def Exec(self, feedback_fn):
1576     """Computes the list of nodes and their attributes.
1577
1578     """
1579     nodenames = self.nodes
1580     volumes = rpc.call_node_volumes(nodenames)
1581
1582     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1583              in self.cfg.GetInstanceList()]
1584
1585     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1586
1587     output = []
1588     for node in nodenames:
1589       if node not in volumes or not volumes[node]:
1590         continue
1591
1592       node_vols = volumes[node][:]
1593       node_vols.sort(key=lambda vol: vol['dev'])
1594
1595       for vol in node_vols:
1596         node_output = []
1597         for field in self.op.output_fields:
1598           if field == "node":
1599             val = node
1600           elif field == "phys":
1601             val = vol['dev']
1602           elif field == "vg":
1603             val = vol['vg']
1604           elif field == "name":
1605             val = vol['name']
1606           elif field == "size":
1607             val = int(float(vol['size']))
1608           elif field == "instance":
1609             for inst in ilist:
1610               if node not in lv_by_node[inst]:
1611                 continue
1612               if vol['name'] in lv_by_node[inst][node]:
1613                 val = inst.name
1614                 break
1615             else:
1616               val = '-'
1617           else:
1618             raise errors.ParameterError(field)
1619           node_output.append(str(val))
1620
1621         output.append(node_output)
1622
1623     return output
1624
1625
1626 class LUAddNode(LogicalUnit):
1627   """Logical unit for adding node to the cluster.
1628
1629   """
1630   HPATH = "node-add"
1631   HTYPE = constants.HTYPE_NODE
1632   _OP_REQP = ["node_name"]
1633
1634   def BuildHooksEnv(self):
1635     """Build hooks env.
1636
1637     This will run on all nodes before, and on all nodes + the new node after.
1638
1639     """
1640     env = {
1641       "OP_TARGET": self.op.node_name,
1642       "NODE_NAME": self.op.node_name,
1643       "NODE_PIP": self.op.primary_ip,
1644       "NODE_SIP": self.op.secondary_ip,
1645       }
1646     nodes_0 = self.cfg.GetNodeList()
1647     nodes_1 = nodes_0 + [self.op.node_name, ]
1648     return env, nodes_0, nodes_1
1649
1650   def CheckPrereq(self):
1651     """Check prerequisites.
1652
1653     This checks:
1654      - the new node is not already in the config
1655      - it is resolvable
1656      - its parameters (single/dual homed) matches the cluster
1657
1658     Any errors are signalled by raising errors.OpPrereqError.
1659
1660     """
1661     node_name = self.op.node_name
1662     cfg = self.cfg
1663
1664     dns_data = utils.HostInfo(node_name)
1665
1666     node = dns_data.name
1667     primary_ip = self.op.primary_ip = dns_data.ip
1668     secondary_ip = getattr(self.op, "secondary_ip", None)
1669     if secondary_ip is None:
1670       secondary_ip = primary_ip
1671     if not utils.IsValidIP(secondary_ip):
1672       raise errors.OpPrereqError("Invalid secondary IP given")
1673     self.op.secondary_ip = secondary_ip
1674
1675     node_list = cfg.GetNodeList()
1676     if not self.op.readd and node in node_list:
1677       raise errors.OpPrereqError("Node %s is already in the configuration" %
1678                                  node)
1679     elif self.op.readd and node not in node_list:
1680       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1681
1682     for existing_node_name in node_list:
1683       existing_node = cfg.GetNodeInfo(existing_node_name)
1684
1685       if self.op.readd and node == existing_node_name:
1686         if (existing_node.primary_ip != primary_ip or
1687             existing_node.secondary_ip != secondary_ip):
1688           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1689                                      " address configuration as before")
1690         continue
1691
1692       if (existing_node.primary_ip == primary_ip or
1693           existing_node.secondary_ip == primary_ip or
1694           existing_node.primary_ip == secondary_ip or
1695           existing_node.secondary_ip == secondary_ip):
1696         raise errors.OpPrereqError("New node ip address(es) conflict with"
1697                                    " existing node %s" % existing_node.name)
1698
1699     # check that the type of the node (single versus dual homed) is the
1700     # same as for the master
1701     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1702     master_singlehomed = myself.secondary_ip == myself.primary_ip
1703     newbie_singlehomed = secondary_ip == primary_ip
1704     if master_singlehomed != newbie_singlehomed:
1705       if master_singlehomed:
1706         raise errors.OpPrereqError("The master has no private ip but the"
1707                                    " new node has one")
1708       else:
1709         raise errors.OpPrereqError("The master has a private ip but the"
1710                                    " new node doesn't have one")
1711
1712     # checks reachablity
1713     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1714       raise errors.OpPrereqError("Node not reachable by ping")
1715
1716     if not newbie_singlehomed:
1717       # check reachability from my secondary ip to newbie's secondary ip
1718       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1719                            source=myself.secondary_ip):
1720         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1721                                    " based ping to noded port")
1722
1723     self.new_node = objects.Node(name=node,
1724                                  primary_ip=primary_ip,
1725                                  secondary_ip=secondary_ip)
1726
1727   def Exec(self, feedback_fn):
1728     """Adds the new node to the cluster.
1729
1730     """
1731     new_node = self.new_node
1732     node = new_node.name
1733
1734     # check connectivity
1735     result = rpc.call_version([node])[node]
1736     if result:
1737       if constants.PROTOCOL_VERSION == result:
1738         logger.Info("communication to node %s fine, sw version %s match" %
1739                     (node, result))
1740       else:
1741         raise errors.OpExecError("Version mismatch master version %s,"
1742                                  " node version %s" %
1743                                  (constants.PROTOCOL_VERSION, result))
1744     else:
1745       raise errors.OpExecError("Cannot get version from the new node")
1746
1747     # setup ssh on node
1748     logger.Info("copy ssh key to node %s" % node)
1749     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1750     keyarray = []
1751     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1752                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1753                 priv_key, pub_key]
1754
1755     for i in keyfiles:
1756       f = open(i, 'r')
1757       try:
1758         keyarray.append(f.read())
1759       finally:
1760         f.close()
1761
1762     result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1763                                keyarray[3], keyarray[4], keyarray[5])
1764
1765     if not result:
1766       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1767
1768     # Add node to our /etc/hosts, and add key to known_hosts
1769     utils.AddHostToEtcHosts(new_node.name)
1770
1771     if new_node.secondary_ip != new_node.primary_ip:
1772       if not rpc.call_node_tcp_ping(new_node.name,
1773                                     constants.LOCALHOST_IP_ADDRESS,
1774                                     new_node.secondary_ip,
1775                                     constants.DEFAULT_NODED_PORT,
1776                                     10, False):
1777         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1778                                  " you gave (%s). Please fix and re-run this"
1779                                  " command." % new_node.secondary_ip)
1780
1781     node_verify_list = [self.cfg.GetMasterNode()]
1782     node_verify_param = {
1783       'nodelist': [node],
1784       # TODO: do a node-net-test as well?
1785     }
1786
1787     result = rpc.call_node_verify(node_verify_list, node_verify_param,
1788                                   self.cfg.GetClusterName())
1789     for verifier in node_verify_list:
1790       if not result[verifier]:
1791         raise errors.OpExecError("Cannot communicate with %s's node daemon"
1792                                  " for remote verification" % verifier)
1793       if result[verifier]['nodelist']:
1794         for failed in result[verifier]['nodelist']:
1795           feedback_fn("ssh/hostname verification failed %s -> %s" %
1796                       (verifier, result[verifier]['nodelist'][failed]))
1797         raise errors.OpExecError("ssh/hostname verification failed.")
1798
1799     # Distribute updated /etc/hosts and known_hosts to all nodes,
1800     # including the node just added
1801     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
1802     dist_nodes = self.cfg.GetNodeList()
1803     if not self.op.readd:
1804       dist_nodes.append(node)
1805     if myself.name in dist_nodes:
1806       dist_nodes.remove(myself.name)
1807
1808     logger.Debug("Copying hosts and known_hosts to all nodes")
1809     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1810       result = rpc.call_upload_file(dist_nodes, fname)
1811       for to_node in dist_nodes:
1812         if not result[to_node]:
1813           logger.Error("copy of file %s to node %s failed" %
1814                        (fname, to_node))
1815
1816     to_copy = []
1817     if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
1818       to_copy.append(constants.VNC_PASSWORD_FILE)
1819     for fname in to_copy:
1820       result = rpc.call_upload_file([node], fname)
1821       if not result[node]:
1822         logger.Error("could not copy file %s to node %s" % (fname, node))
1823
1824     if self.op.readd:
1825       self.context.ReaddNode(new_node)
1826     else:
1827       self.context.AddNode(new_node)
1828
1829
1830 class LUQueryClusterInfo(NoHooksLU):
1831   """Query cluster configuration.
1832
1833   """
1834   _OP_REQP = []
1835   REQ_MASTER = False
1836   REQ_BGL = False
1837
1838   def ExpandNames(self):
1839     self.needed_locks = {}
1840
1841   def CheckPrereq(self):
1842     """No prerequsites needed for this LU.
1843
1844     """
1845     pass
1846
1847   def Exec(self, feedback_fn):
1848     """Return cluster config.
1849
1850     """
1851     result = {
1852       "name": self.cfg.GetClusterName(),
1853       "software_version": constants.RELEASE_VERSION,
1854       "protocol_version": constants.PROTOCOL_VERSION,
1855       "config_version": constants.CONFIG_VERSION,
1856       "os_api_version": constants.OS_API_VERSION,
1857       "export_version": constants.EXPORT_VERSION,
1858       "master": self.cfg.GetMasterNode(),
1859       "architecture": (platform.architecture()[0], platform.machine()),
1860       "hypervisor_type": self.cfg.GetHypervisorType(),
1861       "enabled_hypervisors": self.cfg.GetClusterInfo().enabled_hypervisors,
1862       }
1863
1864     return result
1865
1866
1867 class LUQueryConfigValues(NoHooksLU):
1868   """Return configuration values.
1869
1870   """
1871   _OP_REQP = []
1872   REQ_BGL = False
1873
1874   def ExpandNames(self):
1875     self.needed_locks = {}
1876
1877     static_fields = ["cluster_name", "master_node"]
1878     _CheckOutputFields(static=static_fields,
1879                        dynamic=[],
1880                        selected=self.op.output_fields)
1881
1882   def CheckPrereq(self):
1883     """No prerequisites.
1884
1885     """
1886     pass
1887
1888   def Exec(self, feedback_fn):
1889     """Dump a representation of the cluster config to the standard output.
1890
1891     """
1892     values = []
1893     for field in self.op.output_fields:
1894       if field == "cluster_name":
1895         values.append(self.cfg.GetClusterName())
1896       elif field == "master_node":
1897         values.append(self.cfg.GetMasterNode())
1898       else:
1899         raise errors.ParameterError(field)
1900     return values
1901
1902
1903 class LUActivateInstanceDisks(NoHooksLU):
1904   """Bring up an instance's disks.
1905
1906   """
1907   _OP_REQP = ["instance_name"]
1908   REQ_BGL = False
1909
1910   def ExpandNames(self):
1911     self._ExpandAndLockInstance()
1912     self.needed_locks[locking.LEVEL_NODE] = []
1913     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1914
1915   def DeclareLocks(self, level):
1916     if level == locking.LEVEL_NODE:
1917       self._LockInstancesNodes()
1918
1919   def CheckPrereq(self):
1920     """Check prerequisites.
1921
1922     This checks that the instance is in the cluster.
1923
1924     """
1925     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1926     assert self.instance is not None, \
1927       "Cannot retrieve locked instance %s" % self.op.instance_name
1928
1929   def Exec(self, feedback_fn):
1930     """Activate the disks.
1931
1932     """
1933     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
1934     if not disks_ok:
1935       raise errors.OpExecError("Cannot activate block devices")
1936
1937     return disks_info
1938
1939
1940 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
1941   """Prepare the block devices for an instance.
1942
1943   This sets up the block devices on all nodes.
1944
1945   Args:
1946     instance: a ganeti.objects.Instance object
1947     ignore_secondaries: if true, errors on secondary nodes won't result
1948                         in an error return from the function
1949
1950   Returns:
1951     false if the operation failed
1952     list of (host, instance_visible_name, node_visible_name) if the operation
1953          suceeded with the mapping from node devices to instance devices
1954   """
1955   device_info = []
1956   disks_ok = True
1957   iname = instance.name
1958   # With the two passes mechanism we try to reduce the window of
1959   # opportunity for the race condition of switching DRBD to primary
1960   # before handshaking occured, but we do not eliminate it
1961
1962   # The proper fix would be to wait (with some limits) until the
1963   # connection has been made and drbd transitions from WFConnection
1964   # into any other network-connected state (Connected, SyncTarget,
1965   # SyncSource, etc.)
1966
1967   # 1st pass, assemble on all nodes in secondary mode
1968   for inst_disk in instance.disks:
1969     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1970       lu.cfg.SetDiskID(node_disk, node)
1971       result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1972       if not result:
1973         logger.Error("could not prepare block device %s on node %s"
1974                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1975         if not ignore_secondaries:
1976           disks_ok = False
1977
1978   # FIXME: race condition on drbd migration to primary
1979
1980   # 2nd pass, do only the primary node
1981   for inst_disk in instance.disks:
1982     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1983       if node != instance.primary_node:
1984         continue
1985       lu.cfg.SetDiskID(node_disk, node)
1986       result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1987       if not result:
1988         logger.Error("could not prepare block device %s on node %s"
1989                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1990         disks_ok = False
1991     device_info.append((instance.primary_node, inst_disk.iv_name, result))
1992
1993   # leave the disks configured for the primary node
1994   # this is a workaround that would be fixed better by
1995   # improving the logical/physical id handling
1996   for disk in instance.disks:
1997     lu.cfg.SetDiskID(disk, instance.primary_node)
1998
1999   return disks_ok, device_info
2000
2001
2002 def _StartInstanceDisks(lu, instance, force):
2003   """Start the disks of an instance.
2004
2005   """
2006   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2007                                            ignore_secondaries=force)
2008   if not disks_ok:
2009     _ShutdownInstanceDisks(lu, instance)
2010     if force is not None and not force:
2011       logger.Error("If the message above refers to a secondary node,"
2012                    " you can retry the operation using '--force'.")
2013     raise errors.OpExecError("Disk consistency error")
2014
2015
2016 class LUDeactivateInstanceDisks(NoHooksLU):
2017   """Shutdown an instance's disks.
2018
2019   """
2020   _OP_REQP = ["instance_name"]
2021   REQ_BGL = False
2022
2023   def ExpandNames(self):
2024     self._ExpandAndLockInstance()
2025     self.needed_locks[locking.LEVEL_NODE] = []
2026     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2027
2028   def DeclareLocks(self, level):
2029     if level == locking.LEVEL_NODE:
2030       self._LockInstancesNodes()
2031
2032   def CheckPrereq(self):
2033     """Check prerequisites.
2034
2035     This checks that the instance is in the cluster.
2036
2037     """
2038     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2039     assert self.instance is not None, \
2040       "Cannot retrieve locked instance %s" % self.op.instance_name
2041
2042   def Exec(self, feedback_fn):
2043     """Deactivate the disks
2044
2045     """
2046     instance = self.instance
2047     _SafeShutdownInstanceDisks(self, instance)
2048
2049
2050 def _SafeShutdownInstanceDisks(lu, instance):
2051   """Shutdown block devices of an instance.
2052
2053   This function checks if an instance is running, before calling
2054   _ShutdownInstanceDisks.
2055
2056   """
2057   ins_l = rpc.call_instance_list([instance.primary_node],
2058                                  [instance.hypervisor])
2059   ins_l = ins_l[instance.primary_node]
2060   if not type(ins_l) is list:
2061     raise errors.OpExecError("Can't contact node '%s'" %
2062                              instance.primary_node)
2063
2064   if instance.name in ins_l:
2065     raise errors.OpExecError("Instance is running, can't shutdown"
2066                              " block devices.")
2067
2068   _ShutdownInstanceDisks(lu, instance)
2069
2070
2071 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2072   """Shutdown block devices of an instance.
2073
2074   This does the shutdown on all nodes of the instance.
2075
2076   If the ignore_primary is false, errors on the primary node are
2077   ignored.
2078
2079   """
2080   result = True
2081   for disk in instance.disks:
2082     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2083       lu.cfg.SetDiskID(top_disk, node)
2084       if not rpc.call_blockdev_shutdown(node, top_disk):
2085         logger.Error("could not shutdown block device %s on node %s" %
2086                      (disk.iv_name, node))
2087         if not ignore_primary or node != instance.primary_node:
2088           result = False
2089   return result
2090
2091
2092 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2093   """Checks if a node has enough free memory.
2094
2095   This function check if a given node has the needed amount of free
2096   memory. In case the node has less memory or we cannot get the
2097   information from the node, this function raise an OpPrereqError
2098   exception.
2099
2100   @type lu: C{LogicalUnit}
2101   @param lu: a logical unit from which we get configuration data
2102   @type node: C{str}
2103   @param node: the node to check
2104   @type reason: C{str}
2105   @param reason: string to use in the error message
2106   @type requested: C{int}
2107   @param requested: the amount of memory in MiB to check for
2108   @type hypervisor: C{str}
2109   @param hypervisor: the hypervisor to ask for memory stats
2110   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2111       we cannot check the node
2112
2113   """
2114   nodeinfo = rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2115   if not nodeinfo or not isinstance(nodeinfo, dict):
2116     raise errors.OpPrereqError("Could not contact node %s for resource"
2117                              " information" % (node,))
2118
2119   free_mem = nodeinfo[node].get('memory_free')
2120   if not isinstance(free_mem, int):
2121     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2122                              " was '%s'" % (node, free_mem))
2123   if requested > free_mem:
2124     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2125                              " needed %s MiB, available %s MiB" %
2126                              (node, reason, requested, free_mem))
2127
2128
2129 class LUStartupInstance(LogicalUnit):
2130   """Starts an instance.
2131
2132   """
2133   HPATH = "instance-start"
2134   HTYPE = constants.HTYPE_INSTANCE
2135   _OP_REQP = ["instance_name", "force"]
2136   REQ_BGL = False
2137
2138   def ExpandNames(self):
2139     self._ExpandAndLockInstance()
2140     self.needed_locks[locking.LEVEL_NODE] = []
2141     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2142
2143   def DeclareLocks(self, level):
2144     if level == locking.LEVEL_NODE:
2145       self._LockInstancesNodes()
2146
2147   def BuildHooksEnv(self):
2148     """Build hooks env.
2149
2150     This runs on master, primary and secondary nodes of the instance.
2151
2152     """
2153     env = {
2154       "FORCE": self.op.force,
2155       }
2156     env.update(_BuildInstanceHookEnvByObject(self.instance))
2157     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2158           list(self.instance.secondary_nodes))
2159     return env, nl, nl
2160
2161   def CheckPrereq(self):
2162     """Check prerequisites.
2163
2164     This checks that the instance is in the cluster.
2165
2166     """
2167     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2168     assert self.instance is not None, \
2169       "Cannot retrieve locked instance %s" % self.op.instance_name
2170
2171     # check bridges existance
2172     _CheckInstanceBridgesExist(self, instance)
2173
2174     _CheckNodeFreeMemory(self, instance.primary_node,
2175                          "starting instance %s" % instance.name,
2176                          instance.memory, instance.hypervisor)
2177
2178   def Exec(self, feedback_fn):
2179     """Start the instance.
2180
2181     """
2182     instance = self.instance
2183     force = self.op.force
2184     extra_args = getattr(self.op, "extra_args", "")
2185
2186     self.cfg.MarkInstanceUp(instance.name)
2187
2188     node_current = instance.primary_node
2189
2190     _StartInstanceDisks(self, instance, force)
2191
2192     if not rpc.call_instance_start(node_current, instance, extra_args):
2193       _ShutdownInstanceDisks(self, instance)
2194       raise errors.OpExecError("Could not start instance")
2195
2196
2197 class LURebootInstance(LogicalUnit):
2198   """Reboot an instance.
2199
2200   """
2201   HPATH = "instance-reboot"
2202   HTYPE = constants.HTYPE_INSTANCE
2203   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2204   REQ_BGL = False
2205
2206   def ExpandNames(self):
2207     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2208                                    constants.INSTANCE_REBOOT_HARD,
2209                                    constants.INSTANCE_REBOOT_FULL]:
2210       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2211                                   (constants.INSTANCE_REBOOT_SOFT,
2212                                    constants.INSTANCE_REBOOT_HARD,
2213                                    constants.INSTANCE_REBOOT_FULL))
2214     self._ExpandAndLockInstance()
2215     self.needed_locks[locking.LEVEL_NODE] = []
2216     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2217
2218   def DeclareLocks(self, level):
2219     if level == locking.LEVEL_NODE:
2220       primary_only = not constants.INSTANCE_REBOOT_FULL
2221       self._LockInstancesNodes(primary_only=primary_only)
2222
2223   def BuildHooksEnv(self):
2224     """Build hooks env.
2225
2226     This runs on master, primary and secondary nodes of the instance.
2227
2228     """
2229     env = {
2230       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2231       }
2232     env.update(_BuildInstanceHookEnvByObject(self.instance))
2233     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2234           list(self.instance.secondary_nodes))
2235     return env, nl, nl
2236
2237   def CheckPrereq(self):
2238     """Check prerequisites.
2239
2240     This checks that the instance is in the cluster.
2241
2242     """
2243     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2244     assert self.instance is not None, \
2245       "Cannot retrieve locked instance %s" % self.op.instance_name
2246
2247     # check bridges existance
2248     _CheckInstanceBridgesExist(self, instance)
2249
2250   def Exec(self, feedback_fn):
2251     """Reboot the instance.
2252
2253     """
2254     instance = self.instance
2255     ignore_secondaries = self.op.ignore_secondaries
2256     reboot_type = self.op.reboot_type
2257     extra_args = getattr(self.op, "extra_args", "")
2258
2259     node_current = instance.primary_node
2260
2261     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2262                        constants.INSTANCE_REBOOT_HARD]:
2263       if not rpc.call_instance_reboot(node_current, instance,
2264                                       reboot_type, extra_args):
2265         raise errors.OpExecError("Could not reboot instance")
2266     else:
2267       if not rpc.call_instance_shutdown(node_current, instance):
2268         raise errors.OpExecError("could not shutdown instance for full reboot")
2269       _ShutdownInstanceDisks(self, instance)
2270       _StartInstanceDisks(self, instance, ignore_secondaries)
2271       if not rpc.call_instance_start(node_current, instance, extra_args):
2272         _ShutdownInstanceDisks(self, instance)
2273         raise errors.OpExecError("Could not start instance for full reboot")
2274
2275     self.cfg.MarkInstanceUp(instance.name)
2276
2277
2278 class LUShutdownInstance(LogicalUnit):
2279   """Shutdown an instance.
2280
2281   """
2282   HPATH = "instance-stop"
2283   HTYPE = constants.HTYPE_INSTANCE
2284   _OP_REQP = ["instance_name"]
2285   REQ_BGL = False
2286
2287   def ExpandNames(self):
2288     self._ExpandAndLockInstance()
2289     self.needed_locks[locking.LEVEL_NODE] = []
2290     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2291
2292   def DeclareLocks(self, level):
2293     if level == locking.LEVEL_NODE:
2294       self._LockInstancesNodes()
2295
2296   def BuildHooksEnv(self):
2297     """Build hooks env.
2298
2299     This runs on master, primary and secondary nodes of the instance.
2300
2301     """
2302     env = _BuildInstanceHookEnvByObject(self.instance)
2303     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2304           list(self.instance.secondary_nodes))
2305     return env, nl, nl
2306
2307   def CheckPrereq(self):
2308     """Check prerequisites.
2309
2310     This checks that the instance is in the cluster.
2311
2312     """
2313     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2314     assert self.instance is not None, \
2315       "Cannot retrieve locked instance %s" % self.op.instance_name
2316
2317   def Exec(self, feedback_fn):
2318     """Shutdown the instance.
2319
2320     """
2321     instance = self.instance
2322     node_current = instance.primary_node
2323     self.cfg.MarkInstanceDown(instance.name)
2324     if not rpc.call_instance_shutdown(node_current, instance):
2325       logger.Error("could not shutdown instance")
2326
2327     _ShutdownInstanceDisks(self, instance)
2328
2329
2330 class LUReinstallInstance(LogicalUnit):
2331   """Reinstall an instance.
2332
2333   """
2334   HPATH = "instance-reinstall"
2335   HTYPE = constants.HTYPE_INSTANCE
2336   _OP_REQP = ["instance_name"]
2337   REQ_BGL = False
2338
2339   def ExpandNames(self):
2340     self._ExpandAndLockInstance()
2341     self.needed_locks[locking.LEVEL_NODE] = []
2342     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2343
2344   def DeclareLocks(self, level):
2345     if level == locking.LEVEL_NODE:
2346       self._LockInstancesNodes()
2347
2348   def BuildHooksEnv(self):
2349     """Build hooks env.
2350
2351     This runs on master, primary and secondary nodes of the instance.
2352
2353     """
2354     env = _BuildInstanceHookEnvByObject(self.instance)
2355     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2356           list(self.instance.secondary_nodes))
2357     return env, nl, nl
2358
2359   def CheckPrereq(self):
2360     """Check prerequisites.
2361
2362     This checks that the instance is in the cluster and is not running.
2363
2364     """
2365     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2366     assert instance is not None, \
2367       "Cannot retrieve locked instance %s" % self.op.instance_name
2368
2369     if instance.disk_template == constants.DT_DISKLESS:
2370       raise errors.OpPrereqError("Instance '%s' has no disks" %
2371                                  self.op.instance_name)
2372     if instance.status != "down":
2373       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2374                                  self.op.instance_name)
2375     remote_info = rpc.call_instance_info(instance.primary_node, instance.name,
2376                                          instance.hypervisor)
2377     if remote_info:
2378       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2379                                  (self.op.instance_name,
2380                                   instance.primary_node))
2381
2382     self.op.os_type = getattr(self.op, "os_type", None)
2383     if self.op.os_type is not None:
2384       # OS verification
2385       pnode = self.cfg.GetNodeInfo(
2386         self.cfg.ExpandNodeName(instance.primary_node))
2387       if pnode is None:
2388         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2389                                    self.op.pnode)
2390       os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2391       if not os_obj:
2392         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2393                                    " primary node"  % self.op.os_type)
2394
2395     self.instance = instance
2396
2397   def Exec(self, feedback_fn):
2398     """Reinstall the instance.
2399
2400     """
2401     inst = self.instance
2402
2403     if self.op.os_type is not None:
2404       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2405       inst.os = self.op.os_type
2406       self.cfg.Update(inst)
2407
2408     _StartInstanceDisks(self, inst, None)
2409     try:
2410       feedback_fn("Running the instance OS create scripts...")
2411       if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2412         raise errors.OpExecError("Could not install OS for instance %s"
2413                                  " on node %s" %
2414                                  (inst.name, inst.primary_node))
2415     finally:
2416       _ShutdownInstanceDisks(self, inst)
2417
2418
2419 class LURenameInstance(LogicalUnit):
2420   """Rename an instance.
2421
2422   """
2423   HPATH = "instance-rename"
2424   HTYPE = constants.HTYPE_INSTANCE
2425   _OP_REQP = ["instance_name", "new_name"]
2426
2427   def BuildHooksEnv(self):
2428     """Build hooks env.
2429
2430     This runs on master, primary and secondary nodes of the instance.
2431
2432     """
2433     env = _BuildInstanceHookEnvByObject(self.instance)
2434     env["INSTANCE_NEW_NAME"] = self.op.new_name
2435     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2436           list(self.instance.secondary_nodes))
2437     return env, nl, nl
2438
2439   def CheckPrereq(self):
2440     """Check prerequisites.
2441
2442     This checks that the instance is in the cluster and is not running.
2443
2444     """
2445     instance = self.cfg.GetInstanceInfo(
2446       self.cfg.ExpandInstanceName(self.op.instance_name))
2447     if instance is None:
2448       raise errors.OpPrereqError("Instance '%s' not known" %
2449                                  self.op.instance_name)
2450     if instance.status != "down":
2451       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2452                                  self.op.instance_name)
2453     remote_info = rpc.call_instance_info(instance.primary_node, instance.name,
2454                                          instance.hypervisor)
2455     if remote_info:
2456       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2457                                  (self.op.instance_name,
2458                                   instance.primary_node))
2459     self.instance = instance
2460
2461     # new name verification
2462     name_info = utils.HostInfo(self.op.new_name)
2463
2464     self.op.new_name = new_name = name_info.name
2465     instance_list = self.cfg.GetInstanceList()
2466     if new_name in instance_list:
2467       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2468                                  new_name)
2469
2470     if not getattr(self.op, "ignore_ip", False):
2471       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2472         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2473                                    (name_info.ip, new_name))
2474
2475
2476   def Exec(self, feedback_fn):
2477     """Reinstall the instance.
2478
2479     """
2480     inst = self.instance
2481     old_name = inst.name
2482
2483     if inst.disk_template == constants.DT_FILE:
2484       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2485
2486     self.cfg.RenameInstance(inst.name, self.op.new_name)
2487     # Change the instance lock. This is definitely safe while we hold the BGL
2488     self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2489     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2490
2491     # re-read the instance from the configuration after rename
2492     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2493
2494     if inst.disk_template == constants.DT_FILE:
2495       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2496       result = rpc.call_file_storage_dir_rename(inst.primary_node,
2497                                                 old_file_storage_dir,
2498                                                 new_file_storage_dir)
2499
2500       if not result:
2501         raise errors.OpExecError("Could not connect to node '%s' to rename"
2502                                  " directory '%s' to '%s' (but the instance"
2503                                  " has been renamed in Ganeti)" % (
2504                                  inst.primary_node, old_file_storage_dir,
2505                                  new_file_storage_dir))
2506
2507       if not result[0]:
2508         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2509                                  " (but the instance has been renamed in"
2510                                  " Ganeti)" % (old_file_storage_dir,
2511                                                new_file_storage_dir))
2512
2513     _StartInstanceDisks(self, inst, None)
2514     try:
2515       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2516                                           "sda", "sdb"):
2517         msg = ("Could not run OS rename script for instance %s on node %s"
2518                " (but the instance has been renamed in Ganeti)" %
2519                (inst.name, inst.primary_node))
2520         logger.Error(msg)
2521     finally:
2522       _ShutdownInstanceDisks(self, inst)
2523
2524
2525 class LURemoveInstance(LogicalUnit):
2526   """Remove an instance.
2527
2528   """
2529   HPATH = "instance-remove"
2530   HTYPE = constants.HTYPE_INSTANCE
2531   _OP_REQP = ["instance_name", "ignore_failures"]
2532   REQ_BGL = False
2533
2534   def ExpandNames(self):
2535     self._ExpandAndLockInstance()
2536     self.needed_locks[locking.LEVEL_NODE] = []
2537     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2538
2539   def DeclareLocks(self, level):
2540     if level == locking.LEVEL_NODE:
2541       self._LockInstancesNodes()
2542
2543   def BuildHooksEnv(self):
2544     """Build hooks env.
2545
2546     This runs on master, primary and secondary nodes of the instance.
2547
2548     """
2549     env = _BuildInstanceHookEnvByObject(self.instance)
2550     nl = [self.cfg.GetMasterNode()]
2551     return env, nl, nl
2552
2553   def CheckPrereq(self):
2554     """Check prerequisites.
2555
2556     This checks that the instance is in the cluster.
2557
2558     """
2559     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2560     assert self.instance is not None, \
2561       "Cannot retrieve locked instance %s" % self.op.instance_name
2562
2563   def Exec(self, feedback_fn):
2564     """Remove the instance.
2565
2566     """
2567     instance = self.instance
2568     logger.Info("shutting down instance %s on node %s" %
2569                 (instance.name, instance.primary_node))
2570
2571     if not rpc.call_instance_shutdown(instance.primary_node, instance):
2572       if self.op.ignore_failures:
2573         feedback_fn("Warning: can't shutdown instance")
2574       else:
2575         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2576                                  (instance.name, instance.primary_node))
2577
2578     logger.Info("removing block devices for instance %s" % instance.name)
2579
2580     if not _RemoveDisks(self, instance):
2581       if self.op.ignore_failures:
2582         feedback_fn("Warning: can't remove instance's disks")
2583       else:
2584         raise errors.OpExecError("Can't remove instance's disks")
2585
2586     logger.Info("removing instance %s out of cluster config" % instance.name)
2587
2588     self.cfg.RemoveInstance(instance.name)
2589     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2590
2591
2592 class LUQueryInstances(NoHooksLU):
2593   """Logical unit for querying instances.
2594
2595   """
2596   _OP_REQP = ["output_fields", "names"]
2597   REQ_BGL = False
2598
2599   def ExpandNames(self):
2600     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2601     self.static_fields = frozenset([
2602       "name", "os", "pnode", "snodes",
2603       "admin_state", "admin_ram",
2604       "disk_template", "ip", "mac", "bridge",
2605       "sda_size", "sdb_size", "vcpus", "tags",
2606       "network_port", "kernel_path", "initrd_path",
2607       "hvm_boot_order", "hvm_acpi", "hvm_pae",
2608       "hvm_cdrom_image_path", "hvm_nic_type",
2609       "hvm_disk_type", "vnc_bind_address",
2610       "serial_no", "hypervisor",
2611       ])
2612     _CheckOutputFields(static=self.static_fields,
2613                        dynamic=self.dynamic_fields,
2614                        selected=self.op.output_fields)
2615
2616     self.needed_locks = {}
2617     self.share_locks[locking.LEVEL_INSTANCE] = 1
2618     self.share_locks[locking.LEVEL_NODE] = 1
2619
2620     if self.op.names:
2621       self.wanted = _GetWantedInstances(self, self.op.names)
2622     else:
2623       self.wanted = locking.ALL_SET
2624
2625     self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
2626     if self.do_locking:
2627       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2628       self.needed_locks[locking.LEVEL_NODE] = []
2629       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2630
2631   def DeclareLocks(self, level):
2632     if level == locking.LEVEL_NODE and self.do_locking:
2633       self._LockInstancesNodes()
2634
2635   def CheckPrereq(self):
2636     """Check prerequisites.
2637
2638     """
2639     pass
2640
2641   def Exec(self, feedback_fn):
2642     """Computes the list of nodes and their attributes.
2643
2644     """
2645     all_info = self.cfg.GetAllInstancesInfo()
2646     if self.do_locking:
2647       instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2648     elif self.wanted != locking.ALL_SET:
2649       instance_names = self.wanted
2650       missing = set(instance_names).difference(all_info.keys())
2651       if missing:
2652         raise errors.OpExecError(
2653           "Some instances were removed before retrieving their data: %s"
2654           % missing)
2655     else:
2656       instance_names = all_info.keys()
2657     instance_list = [all_info[iname] for iname in instance_names]
2658
2659     # begin data gathering
2660
2661     nodes = frozenset([inst.primary_node for inst in instance_list])
2662     hv_list = list(set([inst.hypervisor for inst in instance_list]))
2663
2664     bad_nodes = []
2665     if self.dynamic_fields.intersection(self.op.output_fields):
2666       live_data = {}
2667       node_data = rpc.call_all_instances_info(nodes, hv_list)
2668       for name in nodes:
2669         result = node_data[name]
2670         if result:
2671           live_data.update(result)
2672         elif result == False:
2673           bad_nodes.append(name)
2674         # else no instance is alive
2675     else:
2676       live_data = dict([(name, {}) for name in instance_names])
2677
2678     # end data gathering
2679
2680     output = []
2681     for instance in instance_list:
2682       iout = []
2683       for field in self.op.output_fields:
2684         if field == "name":
2685           val = instance.name
2686         elif field == "os":
2687           val = instance.os
2688         elif field == "pnode":
2689           val = instance.primary_node
2690         elif field == "snodes":
2691           val = list(instance.secondary_nodes)
2692         elif field == "admin_state":
2693           val = (instance.status != "down")
2694         elif field == "oper_state":
2695           if instance.primary_node in bad_nodes:
2696             val = None
2697           else:
2698             val = bool(live_data.get(instance.name))
2699         elif field == "status":
2700           if instance.primary_node in bad_nodes:
2701             val = "ERROR_nodedown"
2702           else:
2703             running = bool(live_data.get(instance.name))
2704             if running:
2705               if instance.status != "down":
2706                 val = "running"
2707               else:
2708                 val = "ERROR_up"
2709             else:
2710               if instance.status != "down":
2711                 val = "ERROR_down"
2712               else:
2713                 val = "ADMIN_down"
2714         elif field == "admin_ram":
2715           val = instance.memory
2716         elif field == "oper_ram":
2717           if instance.primary_node in bad_nodes:
2718             val = None
2719           elif instance.name in live_data:
2720             val = live_data[instance.name].get("memory", "?")
2721           else:
2722             val = "-"
2723         elif field == "disk_template":
2724           val = instance.disk_template
2725         elif field == "ip":
2726           val = instance.nics[0].ip
2727         elif field == "bridge":
2728           val = instance.nics[0].bridge
2729         elif field == "mac":
2730           val = instance.nics[0].mac
2731         elif field == "sda_size" or field == "sdb_size":
2732           disk = instance.FindDisk(field[:3])
2733           if disk is None:
2734             val = None
2735           else:
2736             val = disk.size
2737         elif field == "vcpus":
2738           val = instance.vcpus
2739         elif field == "tags":
2740           val = list(instance.GetTags())
2741         elif field == "serial_no":
2742           val = instance.serial_no
2743         elif field in ("network_port", "kernel_path", "initrd_path",
2744                        "hvm_boot_order", "hvm_acpi", "hvm_pae",
2745                        "hvm_cdrom_image_path", "hvm_nic_type",
2746                        "hvm_disk_type", "vnc_bind_address"):
2747           val = getattr(instance, field, None)
2748           if val is not None:
2749             pass
2750           elif field in ("hvm_nic_type", "hvm_disk_type",
2751                          "kernel_path", "initrd_path"):
2752             val = "default"
2753           else:
2754             val = "-"
2755         elif field == "hypervisor":
2756           val = instance.hypervisor
2757         else:
2758           raise errors.ParameterError(field)
2759         iout.append(val)
2760       output.append(iout)
2761
2762     return output
2763
2764
2765 class LUFailoverInstance(LogicalUnit):
2766   """Failover an instance.
2767
2768   """
2769   HPATH = "instance-failover"
2770   HTYPE = constants.HTYPE_INSTANCE
2771   _OP_REQP = ["instance_name", "ignore_consistency"]
2772   REQ_BGL = False
2773
2774   def ExpandNames(self):
2775     self._ExpandAndLockInstance()
2776     self.needed_locks[locking.LEVEL_NODE] = []
2777     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2778
2779   def DeclareLocks(self, level):
2780     if level == locking.LEVEL_NODE:
2781       self._LockInstancesNodes()
2782
2783   def BuildHooksEnv(self):
2784     """Build hooks env.
2785
2786     This runs on master, primary and secondary nodes of the instance.
2787
2788     """
2789     env = {
2790       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2791       }
2792     env.update(_BuildInstanceHookEnvByObject(self.instance))
2793     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
2794     return env, nl, nl
2795
2796   def CheckPrereq(self):
2797     """Check prerequisites.
2798
2799     This checks that the instance is in the cluster.
2800
2801     """
2802     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2803     assert self.instance is not None, \
2804       "Cannot retrieve locked instance %s" % self.op.instance_name
2805
2806     if instance.disk_template not in constants.DTS_NET_MIRROR:
2807       raise errors.OpPrereqError("Instance's disk layout is not"
2808                                  " network mirrored, cannot failover.")
2809
2810     secondary_nodes = instance.secondary_nodes
2811     if not secondary_nodes:
2812       raise errors.ProgrammerError("no secondary node but using "
2813                                    "a mirrored disk template")
2814
2815     target_node = secondary_nodes[0]
2816     # check memory requirements on the secondary node
2817     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
2818                          instance.name, instance.memory,
2819                          instance.hypervisor)
2820
2821     # check bridge existance
2822     brlist = [nic.bridge for nic in instance.nics]
2823     if not rpc.call_bridges_exist(target_node, brlist):
2824       raise errors.OpPrereqError("One or more target bridges %s does not"
2825                                  " exist on destination node '%s'" %
2826                                  (brlist, target_node))
2827
2828   def Exec(self, feedback_fn):
2829     """Failover an instance.
2830
2831     The failover is done by shutting it down on its present node and
2832     starting it on the secondary.
2833
2834     """
2835     instance = self.instance
2836
2837     source_node = instance.primary_node
2838     target_node = instance.secondary_nodes[0]
2839
2840     feedback_fn("* checking disk consistency between source and target")
2841     for dev in instance.disks:
2842       # for drbd, these are drbd over lvm
2843       if not _CheckDiskConsistency(self, dev, target_node, False):
2844         if instance.status == "up" and not self.op.ignore_consistency:
2845           raise errors.OpExecError("Disk %s is degraded on target node,"
2846                                    " aborting failover." % dev.iv_name)
2847
2848     feedback_fn("* shutting down instance on source node")
2849     logger.Info("Shutting down instance %s on node %s" %
2850                 (instance.name, source_node))
2851
2852     if not rpc.call_instance_shutdown(source_node, instance):
2853       if self.op.ignore_consistency:
2854         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2855                      " anyway. Please make sure node %s is down"  %
2856                      (instance.name, source_node, source_node))
2857       else:
2858         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2859                                  (instance.name, source_node))
2860
2861     feedback_fn("* deactivating the instance's disks on source node")
2862     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
2863       raise errors.OpExecError("Can't shut down the instance's disks.")
2864
2865     instance.primary_node = target_node
2866     # distribute new instance config to the other nodes
2867     self.cfg.Update(instance)
2868
2869     # Only start the instance if it's marked as up
2870     if instance.status == "up":
2871       feedback_fn("* activating the instance's disks on target node")
2872       logger.Info("Starting instance %s on node %s" %
2873                   (instance.name, target_node))
2874
2875       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
2876                                                ignore_secondaries=True)
2877       if not disks_ok:
2878         _ShutdownInstanceDisks(self, instance)
2879         raise errors.OpExecError("Can't activate the instance's disks")
2880
2881       feedback_fn("* starting the instance on the target node")
2882       if not rpc.call_instance_start(target_node, instance, None):
2883         _ShutdownInstanceDisks(self, instance)
2884         raise errors.OpExecError("Could not start instance %s on node %s." %
2885                                  (instance.name, target_node))
2886
2887
2888 def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
2889   """Create a tree of block devices on the primary node.
2890
2891   This always creates all devices.
2892
2893   """
2894   if device.children:
2895     for child in device.children:
2896       if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
2897         return False
2898
2899   lu.cfg.SetDiskID(device, node)
2900   new_id = rpc.call_blockdev_create(node, device, device.size,
2901                                     instance.name, True, info)
2902   if not new_id:
2903     return False
2904   if device.physical_id is None:
2905     device.physical_id = new_id
2906   return True
2907
2908
2909 def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
2910   """Create a tree of block devices on a secondary node.
2911
2912   If this device type has to be created on secondaries, create it and
2913   all its children.
2914
2915   If not, just recurse to children keeping the same 'force' value.
2916
2917   """
2918   if device.CreateOnSecondary():
2919     force = True
2920   if device.children:
2921     for child in device.children:
2922       if not _CreateBlockDevOnSecondary(lu, node, instance,
2923                                         child, force, info):
2924         return False
2925
2926   if not force:
2927     return True
2928   lu.cfg.SetDiskID(device, node)
2929   new_id = rpc.call_blockdev_create(node, device, device.size,
2930                                     instance.name, False, info)
2931   if not new_id:
2932     return False
2933   if device.physical_id is None:
2934     device.physical_id = new_id
2935   return True
2936
2937
2938 def _GenerateUniqueNames(lu, exts):
2939   """Generate a suitable LV name.
2940
2941   This will generate a logical volume name for the given instance.
2942
2943   """
2944   results = []
2945   for val in exts:
2946     new_id = lu.cfg.GenerateUniqueID()
2947     results.append("%s%s" % (new_id, val))
2948   return results
2949
2950
2951 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
2952                          p_minor, s_minor):
2953   """Generate a drbd8 device complete with its children.
2954
2955   """
2956   port = lu.cfg.AllocatePort()
2957   vgname = lu.cfg.GetVGName()
2958   shared_secret = lu.cfg.GenerateDRBDSecret()
2959   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2960                           logical_id=(vgname, names[0]))
2961   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2962                           logical_id=(vgname, names[1]))
2963   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2964                           logical_id=(primary, secondary, port,
2965                                       p_minor, s_minor,
2966                                       shared_secret),
2967                           children=[dev_data, dev_meta],
2968                           iv_name=iv_name)
2969   return drbd_dev
2970
2971
2972 def _GenerateDiskTemplate(lu, template_name,
2973                           instance_name, primary_node,
2974                           secondary_nodes, disk_sz, swap_sz,
2975                           file_storage_dir, file_driver):
2976   """Generate the entire disk layout for a given template type.
2977
2978   """
2979   #TODO: compute space requirements
2980
2981   vgname = lu.cfg.GetVGName()
2982   if template_name == constants.DT_DISKLESS:
2983     disks = []
2984   elif template_name == constants.DT_PLAIN:
2985     if len(secondary_nodes) != 0:
2986       raise errors.ProgrammerError("Wrong template configuration")
2987
2988     names = _GenerateUniqueNames(lu, [".sda", ".sdb"])
2989     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2990                            logical_id=(vgname, names[0]),
2991                            iv_name = "sda")
2992     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2993                            logical_id=(vgname, names[1]),
2994                            iv_name = "sdb")
2995     disks = [sda_dev, sdb_dev]
2996   elif template_name == constants.DT_DRBD8:
2997     if len(secondary_nodes) != 1:
2998       raise errors.ProgrammerError("Wrong template configuration")
2999     remote_node = secondary_nodes[0]
3000     (minor_pa, minor_pb,
3001      minor_sa, minor_sb) = lu.cfg.AllocateDRBDMinor(
3002       [primary_node, primary_node, remote_node, remote_node], instance_name)
3003
3004     names = _GenerateUniqueNames(lu, [".sda_data", ".sda_meta",
3005                                       ".sdb_data", ".sdb_meta"])
3006     drbd_sda_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3007                                         disk_sz, names[0:2], "sda",
3008                                         minor_pa, minor_sa)
3009     drbd_sdb_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3010                                         swap_sz, names[2:4], "sdb",
3011                                         minor_pb, minor_sb)
3012     disks = [drbd_sda_dev, drbd_sdb_dev]
3013   elif template_name == constants.DT_FILE:
3014     if len(secondary_nodes) != 0:
3015       raise errors.ProgrammerError("Wrong template configuration")
3016
3017     file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
3018                                 iv_name="sda", logical_id=(file_driver,
3019                                 "%s/sda" % file_storage_dir))
3020     file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
3021                                 iv_name="sdb", logical_id=(file_driver,
3022                                 "%s/sdb" % file_storage_dir))
3023     disks = [file_sda_dev, file_sdb_dev]
3024   else:
3025     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3026   return disks
3027
3028
3029 def _GetInstanceInfoText(instance):
3030   """Compute that text that should be added to the disk's metadata.
3031
3032   """
3033   return "originstname+%s" % instance.name
3034
3035
3036 def _CreateDisks(lu, instance):
3037   """Create all disks for an instance.
3038
3039   This abstracts away some work from AddInstance.
3040
3041   Args:
3042     instance: the instance object
3043
3044   Returns:
3045     True or False showing the success of the creation process
3046
3047   """
3048   info = _GetInstanceInfoText(instance)
3049
3050   if instance.disk_template == constants.DT_FILE:
3051     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3052     result = rpc.call_file_storage_dir_create(instance.primary_node,
3053                                               file_storage_dir)
3054
3055     if not result:
3056       logger.Error("Could not connect to node '%s'" % instance.primary_node)
3057       return False
3058
3059     if not result[0]:
3060       logger.Error("failed to create directory '%s'" % file_storage_dir)
3061       return False
3062
3063   for device in instance.disks:
3064     logger.Info("creating volume %s for instance %s" %
3065                 (device.iv_name, instance.name))
3066     #HARDCODE
3067     for secondary_node in instance.secondary_nodes:
3068       if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3069                                         device, False, info):
3070         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3071                      (device.iv_name, device, secondary_node))
3072         return False
3073     #HARDCODE
3074     if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3075                                     instance, device, info):
3076       logger.Error("failed to create volume %s on primary!" %
3077                    device.iv_name)
3078       return False
3079
3080   return True
3081
3082
3083 def _RemoveDisks(lu, instance):
3084   """Remove all disks for an instance.
3085
3086   This abstracts away some work from `AddInstance()` and
3087   `RemoveInstance()`. Note that in case some of the devices couldn't
3088   be removed, the removal will continue with the other ones (compare
3089   with `_CreateDisks()`).
3090
3091   Args:
3092     instance: the instance object
3093
3094   Returns:
3095     True or False showing the success of the removal proces
3096
3097   """
3098   logger.Info("removing block devices for instance %s" % instance.name)
3099
3100   result = True
3101   for device in instance.disks:
3102     for node, disk in device.ComputeNodeTree(instance.primary_node):
3103       lu.cfg.SetDiskID(disk, node)
3104       if not rpc.call_blockdev_remove(node, disk):
3105         logger.Error("could not remove block device %s on node %s,"
3106                      " continuing anyway" %
3107                      (device.iv_name, node))
3108         result = False
3109
3110   if instance.disk_template == constants.DT_FILE:
3111     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3112     if not rpc.call_file_storage_dir_remove(instance.primary_node,
3113                                             file_storage_dir):
3114       logger.Error("could not remove directory '%s'" % file_storage_dir)
3115       result = False
3116
3117   return result
3118
3119
3120 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3121   """Compute disk size requirements in the volume group
3122
3123   This is currently hard-coded for the two-drive layout.
3124
3125   """
3126   # Required free disk space as a function of disk and swap space
3127   req_size_dict = {
3128     constants.DT_DISKLESS: None,
3129     constants.DT_PLAIN: disk_size + swap_size,
3130     # 256 MB are added for drbd metadata, 128MB for each drbd device
3131     constants.DT_DRBD8: disk_size + swap_size + 256,
3132     constants.DT_FILE: None,
3133   }
3134
3135   if disk_template not in req_size_dict:
3136     raise errors.ProgrammerError("Disk template '%s' size requirement"
3137                                  " is unknown" %  disk_template)
3138
3139   return req_size_dict[disk_template]
3140
3141
3142 class LUCreateInstance(LogicalUnit):
3143   """Create an instance.
3144
3145   """
3146   HPATH = "instance-add"
3147   HTYPE = constants.HTYPE_INSTANCE
3148   _OP_REQP = ["instance_name", "mem_size", "disk_size",
3149               "disk_template", "swap_size", "mode", "start", "vcpus",
3150               "wait_for_sync", "ip_check", "mac"]
3151   REQ_BGL = False
3152
3153   def _ExpandNode(self, node):
3154     """Expands and checks one node name.
3155
3156     """
3157     node_full = self.cfg.ExpandNodeName(node)
3158     if node_full is None:
3159       raise errors.OpPrereqError("Unknown node %s" % node)
3160     return node_full
3161
3162   def ExpandNames(self):
3163     """ExpandNames for CreateInstance.
3164
3165     Figure out the right locks for instance creation.
3166
3167     """
3168     self.needed_locks = {}
3169
3170     # set optional parameters to none if they don't exist
3171     for attr in ["kernel_path", "initrd_path", "pnode", "snode",
3172                  "iallocator", "hvm_boot_order", "hvm_acpi", "hvm_pae",
3173                  "hvm_cdrom_image_path", "hvm_nic_type", "hvm_disk_type",
3174                  "vnc_bind_address", "hypervisor"]:
3175       if not hasattr(self.op, attr):
3176         setattr(self.op, attr, None)
3177
3178     # cheap checks, mostly valid constants given
3179
3180     # verify creation mode
3181     if self.op.mode not in (constants.INSTANCE_CREATE,
3182                             constants.INSTANCE_IMPORT):
3183       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3184                                  self.op.mode)
3185
3186     # disk template and mirror node verification
3187     if self.op.disk_template not in constants.DISK_TEMPLATES:
3188       raise errors.OpPrereqError("Invalid disk template name")
3189
3190     if self.op.hypervisor is None:
3191       self.op.hypervisor = self.cfg.GetHypervisorType()
3192
3193     enabled_hvs = self.cfg.GetClusterInfo().enabled_hypervisors
3194     if self.op.hypervisor not in enabled_hvs:
3195       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3196                                  " cluster (%s)" % (self.op.hypervisor,
3197                                   ",".join(enabled_hvs)))
3198
3199     #### instance parameters check
3200
3201     # instance name verification
3202     hostname1 = utils.HostInfo(self.op.instance_name)
3203     self.op.instance_name = instance_name = hostname1.name
3204
3205     # this is just a preventive check, but someone might still add this
3206     # instance in the meantime, and creation will fail at lock-add time
3207     if instance_name in self.cfg.GetInstanceList():
3208       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3209                                  instance_name)
3210
3211     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3212
3213     # ip validity checks
3214     ip = getattr(self.op, "ip", None)
3215     if ip is None or ip.lower() == "none":
3216       inst_ip = None
3217     elif ip.lower() == "auto":
3218       inst_ip = hostname1.ip
3219     else:
3220       if not utils.IsValidIP(ip):
3221         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3222                                    " like a valid IP" % ip)
3223       inst_ip = ip
3224     self.inst_ip = self.op.ip = inst_ip
3225     # used in CheckPrereq for ip ping check
3226     self.check_ip = hostname1.ip
3227
3228     # MAC address verification
3229     if self.op.mac != "auto":
3230       if not utils.IsValidMac(self.op.mac.lower()):
3231         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3232                                    self.op.mac)
3233
3234     # boot order verification
3235     if self.op.hvm_boot_order is not None:
3236       if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3237         raise errors.OpPrereqError("invalid boot order specified,"
3238                                    " must be one or more of [acdn]")
3239     # file storage checks
3240     if (self.op.file_driver and
3241         not self.op.file_driver in constants.FILE_DRIVER):
3242       raise errors.OpPrereqError("Invalid file driver name '%s'" %
3243                                  self.op.file_driver)
3244
3245     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3246       raise errors.OpPrereqError("File storage directory path not absolute")
3247
3248     ### Node/iallocator related checks
3249     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3250       raise errors.OpPrereqError("One and only one of iallocator and primary"
3251                                  " node must be given")
3252
3253     if self.op.iallocator:
3254       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3255     else:
3256       self.op.pnode = self._ExpandNode(self.op.pnode)
3257       nodelist = [self.op.pnode]
3258       if self.op.snode is not None:
3259         self.op.snode = self._ExpandNode(self.op.snode)
3260         nodelist.append(self.op.snode)
3261       self.needed_locks[locking.LEVEL_NODE] = nodelist
3262
3263     # in case of import lock the source node too
3264     if self.op.mode == constants.INSTANCE_IMPORT:
3265       src_node = getattr(self.op, "src_node", None)
3266       src_path = getattr(self.op, "src_path", None)
3267
3268       if src_node is None or src_path is None:
3269         raise errors.OpPrereqError("Importing an instance requires source"
3270                                    " node and path options")
3271
3272       if not os.path.isabs(src_path):
3273         raise errors.OpPrereqError("The source path must be absolute")
3274
3275       self.op.src_node = src_node = self._ExpandNode(src_node)
3276       if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3277         self.needed_locks[locking.LEVEL_NODE].append(src_node)
3278
3279     else: # INSTANCE_CREATE
3280       if getattr(self.op, "os_type", None) is None:
3281         raise errors.OpPrereqError("No guest OS specified")
3282
3283   def _RunAllocator(self):
3284     """Run the allocator based on input opcode.
3285
3286     """
3287     disks = [{"size": self.op.disk_size, "mode": "w"},
3288              {"size": self.op.swap_size, "mode": "w"}]
3289     nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3290              "bridge": self.op.bridge}]
3291     ial = IAllocator(self.cfg,
3292                      mode=constants.IALLOCATOR_MODE_ALLOC,
3293                      name=self.op.instance_name,
3294                      disk_template=self.op.disk_template,
3295                      tags=[],
3296                      os=self.op.os_type,
3297                      vcpus=self.op.vcpus,
3298                      mem_size=self.op.mem_size,
3299                      disks=disks,
3300                      nics=nics,
3301                      )
3302
3303     ial.Run(self.op.iallocator)
3304
3305     if not ial.success:
3306       raise errors.OpPrereqError("Can't compute nodes using"
3307                                  " iallocator '%s': %s" % (self.op.iallocator,
3308                                                            ial.info))
3309     if len(ial.nodes) != ial.required_nodes:
3310       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3311                                  " of nodes (%s), required %s" %
3312                                  (self.op.iallocator, len(ial.nodes),
3313                                   ial.required_nodes))
3314     self.op.pnode = ial.nodes[0]
3315     logger.ToStdout("Selected nodes for the instance: %s" %
3316                     (", ".join(ial.nodes),))
3317     logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3318                 (self.op.instance_name, self.op.iallocator, ial.nodes))
3319     if ial.required_nodes == 2:
3320       self.op.snode = ial.nodes[1]
3321
3322   def BuildHooksEnv(self):
3323     """Build hooks env.
3324
3325     This runs on master, primary and secondary nodes of the instance.
3326
3327     """
3328     env = {
3329       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3330       "INSTANCE_DISK_SIZE": self.op.disk_size,
3331       "INSTANCE_SWAP_SIZE": self.op.swap_size,
3332       "INSTANCE_ADD_MODE": self.op.mode,
3333       }
3334     if self.op.mode == constants.INSTANCE_IMPORT:
3335       env["INSTANCE_SRC_NODE"] = self.op.src_node
3336       env["INSTANCE_SRC_PATH"] = self.op.src_path
3337       env["INSTANCE_SRC_IMAGE"] = self.src_image
3338
3339     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3340       primary_node=self.op.pnode,
3341       secondary_nodes=self.secondaries,
3342       status=self.instance_status,
3343       os_type=self.op.os_type,
3344       memory=self.op.mem_size,
3345       vcpus=self.op.vcpus,
3346       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3347     ))
3348
3349     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3350           self.secondaries)
3351     return env, nl, nl
3352
3353
3354   def CheckPrereq(self):
3355     """Check prerequisites.
3356
3357     """
3358     if (not self.cfg.GetVGName() and
3359         self.op.disk_template not in constants.DTS_NOT_LVM):
3360       raise errors.OpPrereqError("Cluster does not support lvm-based"
3361                                  " instances")
3362
3363
3364     if self.op.mode == constants.INSTANCE_IMPORT:
3365       src_node = self.op.src_node
3366       src_path = self.op.src_path
3367
3368       export_info = rpc.call_export_info(src_node, src_path)
3369
3370       if not export_info:
3371         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3372
3373       if not export_info.has_section(constants.INISECT_EXP):
3374         raise errors.ProgrammerError("Corrupted export config")
3375
3376       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3377       if (int(ei_version) != constants.EXPORT_VERSION):
3378         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3379                                    (ei_version, constants.EXPORT_VERSION))
3380
3381       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3382         raise errors.OpPrereqError("Can't import instance with more than"
3383                                    " one data disk")
3384
3385       # FIXME: are the old os-es, disk sizes, etc. useful?
3386       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3387       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3388                                                          'disk0_dump'))
3389       self.src_image = diskimage
3390
3391     # ip ping checks (we use the same ip that was resolved in ExpandNames)
3392
3393     if self.op.start and not self.op.ip_check:
3394       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3395                                  " adding an instance in start mode")
3396
3397     if self.op.ip_check:
3398       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3399         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3400                                    (self.check_ip, self.op.instance_name))
3401
3402     # bridge verification
3403     bridge = getattr(self.op, "bridge", None)
3404     if bridge is None:
3405       self.op.bridge = self.cfg.GetDefBridge()
3406     else:
3407       self.op.bridge = bridge
3408
3409     #### allocator run
3410
3411     if self.op.iallocator is not None:
3412       self._RunAllocator()
3413
3414     #### node related checks
3415
3416     # check primary node
3417     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3418     assert self.pnode is not None, \
3419       "Cannot retrieve locked node %s" % self.op.pnode
3420     self.secondaries = []
3421
3422     # mirror node verification
3423     if self.op.disk_template in constants.DTS_NET_MIRROR:
3424       if self.op.snode is None:
3425         raise errors.OpPrereqError("The networked disk templates need"
3426                                    " a mirror node")
3427       if self.op.snode == pnode.name:
3428         raise errors.OpPrereqError("The secondary node cannot be"
3429                                    " the primary node.")
3430       self.secondaries.append(self.op.snode)
3431
3432     req_size = _ComputeDiskSize(self.op.disk_template,
3433                                 self.op.disk_size, self.op.swap_size)
3434
3435     # Check lv size requirements
3436     if req_size is not None:
3437       nodenames = [pnode.name] + self.secondaries
3438       nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3439                                     self.op.hypervisor)
3440       for node in nodenames:
3441         info = nodeinfo.get(node, None)
3442         if not info:
3443           raise errors.OpPrereqError("Cannot get current information"
3444                                      " from node '%s'" % node)
3445         vg_free = info.get('vg_free', None)
3446         if not isinstance(vg_free, int):
3447           raise errors.OpPrereqError("Can't compute free disk space on"
3448                                      " node %s" % node)
3449         if req_size > info['vg_free']:
3450           raise errors.OpPrereqError("Not enough disk space on target node %s."
3451                                      " %d MB available, %d MB required" %
3452                                      (node, info['vg_free'], req_size))
3453
3454     # os verification
3455     os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3456     if not os_obj:
3457       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3458                                  " primary node"  % self.op.os_type)
3459
3460     if self.op.kernel_path == constants.VALUE_NONE:
3461       raise errors.OpPrereqError("Can't set instance kernel to none")
3462
3463     # bridge check on primary node
3464     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3465       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3466                                  " destination node '%s'" %
3467                                  (self.op.bridge, pnode.name))
3468
3469     # memory check on primary node
3470     if self.op.start:
3471       _CheckNodeFreeMemory(self, self.pnode.name,
3472                            "creating instance %s" % self.op.instance_name,
3473                            self.op.mem_size, self.op.hypervisor)
3474
3475     # hvm_cdrom_image_path verification
3476     if self.op.hvm_cdrom_image_path is not None:
3477       # FIXME (als): shouldn't these checks happen on the destination node?
3478       if not os.path.isabs(self.op.hvm_cdrom_image_path):
3479         raise errors.OpPrereqError("The path to the HVM CDROM image must"
3480                                    " be an absolute path or None, not %s" %
3481                                    self.op.hvm_cdrom_image_path)
3482       if not os.path.isfile(self.op.hvm_cdrom_image_path):
3483         raise errors.OpPrereqError("The HVM CDROM image must either be a"
3484                                    " regular file or a symlink pointing to"
3485                                    " an existing regular file, not %s" %
3486                                    self.op.hvm_cdrom_image_path)
3487
3488     # vnc_bind_address verification
3489     if self.op.vnc_bind_address is not None:
3490       if not utils.IsValidIP(self.op.vnc_bind_address):
3491         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3492                                    " like a valid IP address" %
3493                                    self.op.vnc_bind_address)
3494
3495     # Xen HVM device type checks
3496     if self.op.hypervisor == constants.HT_XEN_HVM:
3497       if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
3498         raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
3499                                    " hypervisor" % self.op.hvm_nic_type)
3500       if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
3501         raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
3502                                    " hypervisor" % self.op.hvm_disk_type)
3503
3504     if self.op.start:
3505       self.instance_status = 'up'
3506     else:
3507       self.instance_status = 'down'
3508
3509   def Exec(self, feedback_fn):
3510     """Create and add the instance to the cluster.
3511
3512     """
3513     instance = self.op.instance_name
3514     pnode_name = self.pnode.name
3515
3516     if self.op.mac == "auto":
3517       mac_address = self.cfg.GenerateMAC()
3518     else:
3519       mac_address = self.op.mac
3520
3521     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3522     if self.inst_ip is not None:
3523       nic.ip = self.inst_ip
3524
3525     ht_kind = self.op.hypervisor
3526     if ht_kind in constants.HTS_REQ_PORT:
3527       network_port = self.cfg.AllocatePort()
3528     else:
3529       network_port = None
3530
3531     if self.op.vnc_bind_address is None:
3532       self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3533
3534     # this is needed because os.path.join does not accept None arguments
3535     if self.op.file_storage_dir is None:
3536       string_file_storage_dir = ""
3537     else:
3538       string_file_storage_dir = self.op.file_storage_dir
3539
3540     # build the full file storage dir path
3541     file_storage_dir = os.path.normpath(os.path.join(
3542                                         self.cfg.GetFileStorageDir(),
3543                                         string_file_storage_dir, instance))
3544
3545
3546     disks = _GenerateDiskTemplate(self,
3547                                   self.op.disk_template,
3548                                   instance, pnode_name,
3549                                   self.secondaries, self.op.disk_size,
3550                                   self.op.swap_size,
3551                                   file_storage_dir,
3552                                   self.op.file_driver)
3553
3554     iobj = objects.Instance(name=instance, os=self.op.os_type,
3555                             primary_node=pnode_name,
3556                             memory=self.op.mem_size,
3557                             vcpus=self.op.vcpus,
3558                             nics=[nic], disks=disks,
3559                             disk_template=self.op.disk_template,
3560                             status=self.instance_status,
3561                             network_port=network_port,
3562                             kernel_path=self.op.kernel_path,
3563                             initrd_path=self.op.initrd_path,
3564                             hvm_boot_order=self.op.hvm_boot_order,
3565                             hvm_acpi=self.op.hvm_acpi,
3566                             hvm_pae=self.op.hvm_pae,
3567                             hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3568                             vnc_bind_address=self.op.vnc_bind_address,
3569                             hvm_nic_type=self.op.hvm_nic_type,
3570                             hvm_disk_type=self.op.hvm_disk_type,
3571                             hypervisor=self.op.hypervisor,
3572                             )
3573
3574     feedback_fn("* creating instance disks...")
3575     if not _CreateDisks(self, iobj):
3576       _RemoveDisks(self, iobj)
3577       self.cfg.ReleaseDRBDMinors(instance)
3578       raise errors.OpExecError("Device creation failed, reverting...")
3579
3580     feedback_fn("adding instance %s to cluster config" % instance)
3581
3582     self.cfg.AddInstance(iobj)
3583     # Declare that we don't want to remove the instance lock anymore, as we've
3584     # added the instance to the config
3585     del self.remove_locks[locking.LEVEL_INSTANCE]
3586     # Remove the temp. assignements for the instance's drbds
3587     self.cfg.ReleaseDRBDMinors(instance)
3588
3589     if self.op.wait_for_sync:
3590       disk_abort = not _WaitForSync(self, iobj)
3591     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3592       # make sure the disks are not degraded (still sync-ing is ok)
3593       time.sleep(15)
3594       feedback_fn("* checking mirrors status")
3595       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
3596     else:
3597       disk_abort = False
3598
3599     if disk_abort:
3600       _RemoveDisks(self, iobj)
3601       self.cfg.RemoveInstance(iobj.name)
3602       # Make sure the instance lock gets removed
3603       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
3604       raise errors.OpExecError("There are some degraded disks for"
3605                                " this instance")
3606
3607     feedback_fn("creating os for instance %s on node %s" %
3608                 (instance, pnode_name))
3609
3610     if iobj.disk_template != constants.DT_DISKLESS:
3611       if self.op.mode == constants.INSTANCE_CREATE:
3612         feedback_fn("* running the instance OS create scripts...")
3613         if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3614           raise errors.OpExecError("could not add os for instance %s"
3615                                    " on node %s" %
3616                                    (instance, pnode_name))
3617
3618       elif self.op.mode == constants.INSTANCE_IMPORT:
3619         feedback_fn("* running the instance OS import scripts...")
3620         src_node = self.op.src_node
3621         src_image = self.src_image
3622         cluster_name = self.cfg.GetClusterName()
3623         if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3624                                            src_node, src_image, cluster_name):
3625           raise errors.OpExecError("Could not import os for instance"
3626                                    " %s on node %s" %
3627                                    (instance, pnode_name))
3628       else:
3629         # also checked in the prereq part
3630         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3631                                      % self.op.mode)
3632
3633     if self.op.start:
3634       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3635       feedback_fn("* starting instance...")
3636       if not rpc.call_instance_start(pnode_name, iobj, None):
3637         raise errors.OpExecError("Could not start instance")
3638
3639
3640 class LUConnectConsole(NoHooksLU):
3641   """Connect to an instance's console.
3642
3643   This is somewhat special in that it returns the command line that
3644   you need to run on the master node in order to connect to the
3645   console.
3646
3647   """
3648   _OP_REQP = ["instance_name"]
3649   REQ_BGL = False
3650
3651   def ExpandNames(self):
3652     self._ExpandAndLockInstance()
3653
3654   def CheckPrereq(self):
3655     """Check prerequisites.
3656
3657     This checks that the instance is in the cluster.
3658
3659     """
3660     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3661     assert self.instance is not None, \
3662       "Cannot retrieve locked instance %s" % self.op.instance_name
3663
3664   def Exec(self, feedback_fn):
3665     """Connect to the console of an instance
3666
3667     """
3668     instance = self.instance
3669     node = instance.primary_node
3670
3671     node_insts = rpc.call_instance_list([node],
3672                                         [instance.hypervisor])[node]
3673     if node_insts is False:
3674       raise errors.OpExecError("Can't connect to node %s." % node)
3675
3676     if instance.name not in node_insts:
3677       raise errors.OpExecError("Instance %s is not running." % instance.name)
3678
3679     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3680
3681     hyper = hypervisor.GetHypervisor(instance.hypervisor)
3682     console_cmd = hyper.GetShellCommandForConsole(instance)
3683
3684     # build ssh cmdline
3685     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3686
3687
3688 class LUReplaceDisks(LogicalUnit):
3689   """Replace the disks of an instance.
3690
3691   """
3692   HPATH = "mirrors-replace"
3693   HTYPE = constants.HTYPE_INSTANCE
3694   _OP_REQP = ["instance_name", "mode", "disks"]
3695   REQ_BGL = False
3696
3697   def ExpandNames(self):
3698     self._ExpandAndLockInstance()
3699
3700     if not hasattr(self.op, "remote_node"):
3701       self.op.remote_node = None
3702
3703     ia_name = getattr(self.op, "iallocator", None)
3704     if ia_name is not None:
3705       if self.op.remote_node is not None:
3706         raise errors.OpPrereqError("Give either the iallocator or the new"
3707                                    " secondary, not both")
3708       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3709     elif self.op.remote_node is not None:
3710       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3711       if remote_node is None:
3712         raise errors.OpPrereqError("Node '%s' not known" %
3713                                    self.op.remote_node)
3714       self.op.remote_node = remote_node
3715       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3716       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3717     else:
3718       self.needed_locks[locking.LEVEL_NODE] = []
3719       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3720
3721   def DeclareLocks(self, level):
3722     # If we're not already locking all nodes in the set we have to declare the
3723     # instance's primary/secondary nodes.
3724     if (level == locking.LEVEL_NODE and
3725         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
3726       self._LockInstancesNodes()
3727
3728   def _RunAllocator(self):
3729     """Compute a new secondary node using an IAllocator.
3730
3731     """
3732     ial = IAllocator(self.cfg,
3733                      mode=constants.IALLOCATOR_MODE_RELOC,
3734                      name=self.op.instance_name,
3735                      relocate_from=[self.sec_node])
3736
3737     ial.Run(self.op.iallocator)
3738
3739     if not ial.success:
3740       raise errors.OpPrereqError("Can't compute nodes using"
3741                                  " iallocator '%s': %s" % (self.op.iallocator,
3742                                                            ial.info))
3743     if len(ial.nodes) != ial.required_nodes:
3744       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3745                                  " of nodes (%s), required %s" %
3746                                  (len(ial.nodes), ial.required_nodes))
3747     self.op.remote_node = ial.nodes[0]
3748     logger.ToStdout("Selected new secondary for the instance: %s" %
3749                     self.op.remote_node)
3750
3751   def BuildHooksEnv(self):
3752     """Build hooks env.
3753
3754     This runs on the master, the primary and all the secondaries.
3755
3756     """
3757     env = {
3758       "MODE": self.op.mode,
3759       "NEW_SECONDARY": self.op.remote_node,
3760       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3761       }
3762     env.update(_BuildInstanceHookEnvByObject(self.instance))
3763     nl = [
3764       self.cfg.GetMasterNode(),
3765       self.instance.primary_node,
3766       ]
3767     if self.op.remote_node is not None:
3768       nl.append(self.op.remote_node)
3769     return env, nl, nl
3770
3771   def CheckPrereq(self):
3772     """Check prerequisites.
3773
3774     This checks that the instance is in the cluster.
3775
3776     """
3777     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3778     assert instance is not None, \
3779       "Cannot retrieve locked instance %s" % self.op.instance_name
3780     self.instance = instance
3781
3782     if instance.disk_template not in constants.DTS_NET_MIRROR:
3783       raise errors.OpPrereqError("Instance's disk layout is not"
3784                                  " network mirrored.")
3785
3786     if len(instance.secondary_nodes) != 1:
3787       raise errors.OpPrereqError("The instance has a strange layout,"
3788                                  " expected one secondary but found %d" %
3789                                  len(instance.secondary_nodes))
3790
3791     self.sec_node = instance.secondary_nodes[0]
3792
3793     ia_name = getattr(self.op, "iallocator", None)
3794     if ia_name is not None:
3795       self._RunAllocator()
3796
3797     remote_node = self.op.remote_node
3798     if remote_node is not None:
3799       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3800       assert self.remote_node_info is not None, \
3801         "Cannot retrieve locked node %s" % remote_node
3802     else:
3803       self.remote_node_info = None
3804     if remote_node == instance.primary_node:
3805       raise errors.OpPrereqError("The specified node is the primary node of"
3806                                  " the instance.")
3807     elif remote_node == self.sec_node:
3808       if self.op.mode == constants.REPLACE_DISK_SEC:
3809         # this is for DRBD8, where we can't execute the same mode of
3810         # replacement as for drbd7 (no different port allocated)
3811         raise errors.OpPrereqError("Same secondary given, cannot execute"
3812                                    " replacement")
3813     if instance.disk_template == constants.DT_DRBD8:
3814       if (self.op.mode == constants.REPLACE_DISK_ALL and
3815           remote_node is not None):
3816         # switch to replace secondary mode
3817         self.op.mode = constants.REPLACE_DISK_SEC
3818
3819       if self.op.mode == constants.REPLACE_DISK_ALL:
3820         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3821                                    " secondary disk replacement, not"
3822                                    " both at once")
3823       elif self.op.mode == constants.REPLACE_DISK_PRI:
3824         if remote_node is not None:
3825           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3826                                      " the secondary while doing a primary"
3827                                      " node disk replacement")
3828         self.tgt_node = instance.primary_node
3829         self.oth_node = instance.secondary_nodes[0]
3830       elif self.op.mode == constants.REPLACE_DISK_SEC:
3831         self.new_node = remote_node # this can be None, in which case
3832                                     # we don't change the secondary
3833         self.tgt_node = instance.secondary_nodes[0]
3834         self.oth_node = instance.primary_node
3835       else:
3836         raise errors.ProgrammerError("Unhandled disk replace mode")
3837
3838     for name in self.op.disks:
3839       if instance.FindDisk(name) is None:
3840         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3841                                    (name, instance.name))
3842
3843   def _ExecD8DiskOnly(self, feedback_fn):
3844     """Replace a disk on the primary or secondary for dbrd8.
3845
3846     The algorithm for replace is quite complicated:
3847       - for each disk to be replaced:
3848         - create new LVs on the target node with unique names
3849         - detach old LVs from the drbd device
3850         - rename old LVs to name_replaced.<time_t>
3851         - rename new LVs to old LVs
3852         - attach the new LVs (with the old names now) to the drbd device
3853       - wait for sync across all devices
3854       - for each modified disk:
3855         - remove old LVs (which have the name name_replaces.<time_t>)
3856
3857     Failures are not very well handled.
3858
3859     """
3860     steps_total = 6
3861     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3862     instance = self.instance
3863     iv_names = {}
3864     vgname = self.cfg.GetVGName()
3865     # start of work
3866     cfg = self.cfg
3867     tgt_node = self.tgt_node
3868     oth_node = self.oth_node
3869
3870     # Step: check device activation
3871     self.proc.LogStep(1, steps_total, "check device existence")
3872     info("checking volume groups")
3873     my_vg = cfg.GetVGName()
3874     results = rpc.call_vg_list([oth_node, tgt_node])
3875     if not results:
3876       raise errors.OpExecError("Can't list volume groups on the nodes")
3877     for node in oth_node, tgt_node:
3878       res = results.get(node, False)
3879       if not res or my_vg not in res:
3880         raise errors.OpExecError("Volume group '%s' not found on %s" %
3881                                  (my_vg, node))
3882     for dev in instance.disks:
3883       if not dev.iv_name in self.op.disks:
3884         continue
3885       for node in tgt_node, oth_node:
3886         info("checking %s on %s" % (dev.iv_name, node))
3887         cfg.SetDiskID(dev, node)
3888         if not rpc.call_blockdev_find(node, dev):
3889           raise errors.OpExecError("Can't find device %s on node %s" %
3890                                    (dev.iv_name, node))
3891
3892     # Step: check other node consistency
3893     self.proc.LogStep(2, steps_total, "check peer consistency")
3894     for dev in instance.disks:
3895       if not dev.iv_name in self.op.disks:
3896         continue
3897       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3898       if not _CheckDiskConsistency(self, dev, oth_node,
3899                                    oth_node==instance.primary_node):
3900         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3901                                  " to replace disks on this node (%s)" %
3902                                  (oth_node, tgt_node))
3903
3904     # Step: create new storage
3905     self.proc.LogStep(3, steps_total, "allocate new storage")
3906     for dev in instance.disks:
3907       if not dev.iv_name in self.op.disks:
3908         continue
3909       size = dev.size
3910       cfg.SetDiskID(dev, tgt_node)
3911       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3912       names = _GenerateUniqueNames(self, lv_names)
3913       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3914                              logical_id=(vgname, names[0]))
3915       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3916                              logical_id=(vgname, names[1]))
3917       new_lvs = [lv_data, lv_meta]
3918       old_lvs = dev.children
3919       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3920       info("creating new local storage on %s for %s" %
3921            (tgt_node, dev.iv_name))
3922       # since we *always* want to create this LV, we use the
3923       # _Create...OnPrimary (which forces the creation), even if we
3924       # are talking about the secondary node
3925       for new_lv in new_lvs:
3926         if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
3927                                         _GetInstanceInfoText(instance)):
3928           raise errors.OpExecError("Failed to create new LV named '%s' on"
3929                                    " node '%s'" %
3930                                    (new_lv.logical_id[1], tgt_node))
3931
3932     # Step: for each lv, detach+rename*2+attach
3933     self.proc.LogStep(4, steps_total, "change drbd configuration")
3934     for dev, old_lvs, new_lvs in iv_names.itervalues():
3935       info("detaching %s drbd from local storage" % dev.iv_name)
3936       if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3937         raise errors.OpExecError("Can't detach drbd from local storage on node"
3938                                  " %s for device %s" % (tgt_node, dev.iv_name))
3939       #dev.children = []
3940       #cfg.Update(instance)
3941
3942       # ok, we created the new LVs, so now we know we have the needed
3943       # storage; as such, we proceed on the target node to rename
3944       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3945       # using the assumption that logical_id == physical_id (which in
3946       # turn is the unique_id on that node)
3947
3948       # FIXME(iustin): use a better name for the replaced LVs
3949       temp_suffix = int(time.time())
3950       ren_fn = lambda d, suff: (d.physical_id[0],
3951                                 d.physical_id[1] + "_replaced-%s" % suff)
3952       # build the rename list based on what LVs exist on the node
3953       rlist = []
3954       for to_ren in old_lvs:
3955         find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3956         if find_res is not None: # device exists
3957           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3958
3959       info("renaming the old LVs on the target node")
3960       if not rpc.call_blockdev_rename(tgt_node, rlist):
3961         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3962       # now we rename the new LVs to the old LVs
3963       info("renaming the new LVs on the target node")
3964       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3965       if not rpc.call_blockdev_rename(tgt_node, rlist):
3966         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3967
3968       for old, new in zip(old_lvs, new_lvs):
3969         new.logical_id = old.logical_id
3970         cfg.SetDiskID(new, tgt_node)
3971
3972       for disk in old_lvs:
3973         disk.logical_id = ren_fn(disk, temp_suffix)
3974         cfg.SetDiskID(disk, tgt_node)
3975
3976       # now that the new lvs have the old name, we can add them to the device
3977       info("adding new mirror component on %s" % tgt_node)
3978       if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3979         for new_lv in new_lvs:
3980           if not rpc.call_blockdev_remove(tgt_node, new_lv):
3981             warning("Can't rollback device %s", hint="manually cleanup unused"
3982                     " logical volumes")
3983         raise errors.OpExecError("Can't add local storage to drbd")
3984
3985       dev.children = new_lvs
3986       cfg.Update(instance)
3987
3988     # Step: wait for sync
3989
3990     # this can fail as the old devices are degraded and _WaitForSync
3991     # does a combined result over all disks, so we don't check its
3992     # return value
3993     self.proc.LogStep(5, steps_total, "sync devices")
3994     _WaitForSync(self, instance, unlock=True)
3995
3996     # so check manually all the devices
3997     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3998       cfg.SetDiskID(dev, instance.primary_node)
3999       is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
4000       if is_degr:
4001         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4002
4003     # Step: remove old storage
4004     self.proc.LogStep(6, steps_total, "removing old storage")
4005     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4006       info("remove logical volumes for %s" % name)
4007       for lv in old_lvs:
4008         cfg.SetDiskID(lv, tgt_node)
4009         if not rpc.call_blockdev_remove(tgt_node, lv):
4010           warning("Can't remove old LV", hint="manually remove unused LVs")
4011           continue
4012
4013   def _ExecD8Secondary(self, feedback_fn):
4014     """Replace the secondary node for drbd8.
4015
4016     The algorithm for replace is quite complicated:
4017       - for all disks of the instance:
4018         - create new LVs on the new node with same names
4019         - shutdown the drbd device on the old secondary
4020         - disconnect the drbd network on the primary
4021         - create the drbd device on the new secondary
4022         - network attach the drbd on the primary, using an artifice:
4023           the drbd code for Attach() will connect to the network if it
4024           finds a device which is connected to the good local disks but
4025           not network enabled
4026       - wait for sync across all devices
4027       - remove all disks from the old secondary
4028
4029     Failures are not very well handled.
4030
4031     """
4032     steps_total = 6
4033     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4034     instance = self.instance
4035     iv_names = {}
4036     vgname = self.cfg.GetVGName()
4037     # start of work
4038     cfg = self.cfg
4039     old_node = self.tgt_node
4040     new_node = self.new_node
4041     pri_node = instance.primary_node
4042
4043     # Step: check device activation
4044     self.proc.LogStep(1, steps_total, "check device existence")
4045     info("checking volume groups")
4046     my_vg = cfg.GetVGName()
4047     results = rpc.call_vg_list([pri_node, new_node])
4048     if not results:
4049       raise errors.OpExecError("Can't list volume groups on the nodes")
4050     for node in pri_node, new_node:
4051       res = results.get(node, False)
4052       if not res or my_vg not in res:
4053         raise errors.OpExecError("Volume group '%s' not found on %s" %
4054                                  (my_vg, node))
4055     for dev in instance.disks:
4056       if not dev.iv_name in self.op.disks:
4057         continue
4058       info("checking %s on %s" % (dev.iv_name, pri_node))
4059       cfg.SetDiskID(dev, pri_node)
4060       if not rpc.call_blockdev_find(pri_node, dev):
4061         raise errors.OpExecError("Can't find device %s on node %s" %
4062                                  (dev.iv_name, pri_node))
4063
4064     # Step: check other node consistency
4065     self.proc.LogStep(2, steps_total, "check peer consistency")
4066     for dev in instance.disks:
4067       if not dev.iv_name in self.op.disks:
4068         continue
4069       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4070       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4071         raise errors.OpExecError("Primary node (%s) has degraded storage,"
4072                                  " unsafe to replace the secondary" %
4073                                  pri_node)
4074
4075     # Step: create new storage
4076     self.proc.LogStep(3, steps_total, "allocate new storage")
4077     for dev in instance.disks:
4078       size = dev.size
4079       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4080       # since we *always* want to create this LV, we use the
4081       # _Create...OnPrimary (which forces the creation), even if we
4082       # are talking about the secondary node
4083       for new_lv in dev.children:
4084         if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4085                                         _GetInstanceInfoText(instance)):
4086           raise errors.OpExecError("Failed to create new LV named '%s' on"
4087                                    " node '%s'" %
4088                                    (new_lv.logical_id[1], new_node))
4089
4090
4091     # Step 4: dbrd minors and drbd setups changes
4092     # after this, we must manually remove the drbd minors on both the
4093     # error and the success paths
4094     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4095                                    instance.name)
4096     logging.debug("Allocated minors %s" % (minors,))
4097     self.proc.LogStep(4, steps_total, "changing drbd configuration")
4098     for dev, new_minor in zip(instance.disks, minors):
4099       size = dev.size
4100       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4101       # create new devices on new_node
4102       if pri_node == dev.logical_id[0]:
4103         new_logical_id = (pri_node, new_node,
4104                           dev.logical_id[2], dev.logical_id[3], new_minor,
4105                           dev.logical_id[5])
4106       else:
4107         new_logical_id = (new_node, pri_node,
4108                           dev.logical_id[2], new_minor, dev.logical_id[4],
4109                           dev.logical_id[5])
4110       iv_names[dev.iv_name] = (dev, dev.children, new_logical_id)
4111       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4112                     new_logical_id)
4113       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4114                               logical_id=new_logical_id,
4115                               children=dev.children)
4116       if not _CreateBlockDevOnSecondary(self, new_node, instance,
4117                                         new_drbd, False,
4118                                         _GetInstanceInfoText(instance)):
4119         self.cfg.ReleaseDRBDMinors(instance.name)
4120         raise errors.OpExecError("Failed to create new DRBD on"
4121                                  " node '%s'" % new_node)
4122
4123     for dev in instance.disks:
4124       # we have new devices, shutdown the drbd on the old secondary
4125       info("shutting down drbd for %s on old node" % dev.iv_name)
4126       cfg.SetDiskID(dev, old_node)
4127       if not rpc.call_blockdev_shutdown(old_node, dev):
4128         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4129                 hint="Please cleanup this device manually as soon as possible")
4130
4131     info("detaching primary drbds from the network (=> standalone)")
4132     done = 0
4133     for dev in instance.disks:
4134       cfg.SetDiskID(dev, pri_node)
4135       # set the network part of the physical (unique in bdev terms) id
4136       # to None, meaning detach from network
4137       dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4138       # and 'find' the device, which will 'fix' it to match the
4139       # standalone state
4140       if rpc.call_blockdev_find(pri_node, dev):
4141         done += 1
4142       else:
4143         warning("Failed to detach drbd %s from network, unusual case" %
4144                 dev.iv_name)
4145
4146     if not done:
4147       # no detaches succeeded (very unlikely)
4148       self.cfg.ReleaseDRBDMinors(instance.name)
4149       raise errors.OpExecError("Can't detach at least one DRBD from old node")
4150
4151     # if we managed to detach at least one, we update all the disks of
4152     # the instance to point to the new secondary
4153     info("updating instance configuration")
4154     for dev, _, new_logical_id in iv_names.itervalues():
4155       dev.logical_id = new_logical_id
4156       cfg.SetDiskID(dev, pri_node)
4157     cfg.Update(instance)
4158     # we can remove now the temp minors as now the new values are
4159     # written to the config file (and therefore stable)
4160     self.cfg.ReleaseDRBDMinors(instance.name)
4161
4162     # and now perform the drbd attach
4163     info("attaching primary drbds to new secondary (standalone => connected)")
4164     failures = []
4165     for dev in instance.disks:
4166       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4167       # since the attach is smart, it's enough to 'find' the device,
4168       # it will automatically activate the network, if the physical_id
4169       # is correct
4170       cfg.SetDiskID(dev, pri_node)
4171       logging.debug("Disk to attach: %s", dev)
4172       if not rpc.call_blockdev_find(pri_node, dev):
4173         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4174                 "please do a gnt-instance info to see the status of disks")
4175
4176     # this can fail as the old devices are degraded and _WaitForSync
4177     # does a combined result over all disks, so we don't check its
4178     # return value
4179     self.proc.LogStep(5, steps_total, "sync devices")
4180     _WaitForSync(self, instance, unlock=True)
4181
4182     # so check manually all the devices
4183     for name, (dev, old_lvs, _) in iv_names.iteritems():
4184       cfg.SetDiskID(dev, pri_node)
4185       is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
4186       if is_degr:
4187         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4188
4189     self.proc.LogStep(6, steps_total, "removing old storage")
4190     for name, (dev, old_lvs, _) in iv_names.iteritems():
4191       info("remove logical volumes for %s" % name)
4192       for lv in old_lvs:
4193         cfg.SetDiskID(lv, old_node)
4194         if not rpc.call_blockdev_remove(old_node, lv):
4195           warning("Can't remove LV on old secondary",
4196                   hint="Cleanup stale volumes by hand")
4197
4198   def Exec(self, feedback_fn):
4199     """Execute disk replacement.
4200
4201     This dispatches the disk replacement to the appropriate handler.
4202
4203     """
4204     instance = self.instance
4205
4206     # Activate the instance disks if we're replacing them on a down instance
4207     if instance.status == "down":
4208       _StartInstanceDisks(self, instance, True)
4209
4210     if instance.disk_template == constants.DT_DRBD8:
4211       if self.op.remote_node is None:
4212         fn = self._ExecD8DiskOnly
4213       else:
4214         fn = self._ExecD8Secondary
4215     else:
4216       raise errors.ProgrammerError("Unhandled disk replacement case")
4217
4218     ret = fn(feedback_fn)
4219
4220     # Deactivate the instance disks if we're replacing them on a down instance
4221     if instance.status == "down":
4222       _SafeShutdownInstanceDisks(self, instance)
4223
4224     return ret
4225
4226
4227 class LUGrowDisk(LogicalUnit):
4228   """Grow a disk of an instance.
4229
4230   """
4231   HPATH = "disk-grow"
4232   HTYPE = constants.HTYPE_INSTANCE
4233   _OP_REQP = ["instance_name", "disk", "amount"]
4234   REQ_BGL = False
4235
4236   def ExpandNames(self):
4237     self._ExpandAndLockInstance()
4238     self.needed_locks[locking.LEVEL_NODE] = []
4239     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4240
4241   def DeclareLocks(self, level):
4242     if level == locking.LEVEL_NODE:
4243       self._LockInstancesNodes()
4244
4245   def BuildHooksEnv(self):
4246     """Build hooks env.
4247
4248     This runs on the master, the primary and all the secondaries.
4249
4250     """
4251     env = {
4252       "DISK": self.op.disk,
4253       "AMOUNT": self.op.amount,
4254       }
4255     env.update(_BuildInstanceHookEnvByObject(self.instance))
4256     nl = [
4257       self.cfg.GetMasterNode(),
4258       self.instance.primary_node,
4259       ]
4260     return env, nl, nl
4261
4262   def CheckPrereq(self):
4263     """Check prerequisites.
4264
4265     This checks that the instance is in the cluster.
4266
4267     """
4268     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4269     assert instance is not None, \
4270       "Cannot retrieve locked instance %s" % self.op.instance_name
4271
4272     self.instance = instance
4273
4274     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4275       raise errors.OpPrereqError("Instance's disk layout does not support"
4276                                  " growing.")
4277
4278     if instance.FindDisk(self.op.disk) is None:
4279       raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4280                                  (self.op.disk, instance.name))
4281
4282     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4283     nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4284                                   instance.hypervisor)
4285     for node in nodenames:
4286       info = nodeinfo.get(node, None)
4287       if not info:
4288         raise errors.OpPrereqError("Cannot get current information"
4289                                    " from node '%s'" % node)
4290       vg_free = info.get('vg_free', None)
4291       if not isinstance(vg_free, int):
4292         raise errors.OpPrereqError("Can't compute free disk space on"
4293                                    " node %s" % node)
4294       if self.op.amount > info['vg_free']:
4295         raise errors.OpPrereqError("Not enough disk space on target node %s:"
4296                                    " %d MiB available, %d MiB required" %
4297                                    (node, info['vg_free'], self.op.amount))
4298
4299   def Exec(self, feedback_fn):
4300     """Execute disk grow.
4301
4302     """
4303     instance = self.instance
4304     disk = instance.FindDisk(self.op.disk)
4305     for node in (instance.secondary_nodes + (instance.primary_node,)):
4306       self.cfg.SetDiskID(disk, node)
4307       result = rpc.call_blockdev_grow(node, disk, self.op.amount)
4308       if not result or not isinstance(result, (list, tuple)) or len(result) != 2:
4309         raise errors.OpExecError("grow request failed to node %s" % node)
4310       elif not result[0]:
4311         raise errors.OpExecError("grow request failed to node %s: %s" %
4312                                  (node, result[1]))
4313     disk.RecordGrow(self.op.amount)
4314     self.cfg.Update(instance)
4315     return
4316
4317
4318 class LUQueryInstanceData(NoHooksLU):
4319   """Query runtime instance data.
4320
4321   """
4322   _OP_REQP = ["instances"]
4323   REQ_BGL = False
4324
4325   def ExpandNames(self):
4326     self.needed_locks = {}
4327     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4328
4329     if not isinstance(self.op.instances, list):
4330       raise errors.OpPrereqError("Invalid argument type 'instances'")
4331
4332     if self.op.instances:
4333       self.wanted_names = []
4334       for name in self.op.instances:
4335         full_name = self.cfg.ExpandInstanceName(name)
4336         if full_name is None:
4337           raise errors.OpPrereqError("Instance '%s' not known" %
4338                                      self.op.instance_name)
4339         self.wanted_names.append(full_name)
4340       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4341     else:
4342       self.wanted_names = None
4343       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4344
4345     self.needed_locks[locking.LEVEL_NODE] = []
4346     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4347
4348   def DeclareLocks(self, level):
4349     if level == locking.LEVEL_NODE:
4350       self._LockInstancesNodes()
4351
4352   def CheckPrereq(self):
4353     """Check prerequisites.
4354
4355     This only checks the optional instance list against the existing names.
4356
4357     """
4358     if self.wanted_names is None:
4359       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4360
4361     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4362                              in self.wanted_names]
4363     return
4364
4365   def _ComputeDiskStatus(self, instance, snode, dev):
4366     """Compute block device status.
4367
4368     """
4369     self.cfg.SetDiskID(dev, instance.primary_node)
4370     dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4371     if dev.dev_type in constants.LDS_DRBD:
4372       # we change the snode then (otherwise we use the one passed in)
4373       if dev.logical_id[0] == instance.primary_node:
4374         snode = dev.logical_id[1]
4375       else:
4376         snode = dev.logical_id[0]
4377
4378     if snode:
4379       self.cfg.SetDiskID(dev, snode)
4380       dev_sstatus = rpc.call_blockdev_find(snode, dev)
4381     else:
4382       dev_sstatus = None
4383
4384     if dev.children:
4385       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4386                       for child in dev.children]
4387     else:
4388       dev_children = []
4389
4390     data = {
4391       "iv_name": dev.iv_name,
4392       "dev_type": dev.dev_type,
4393       "logical_id": dev.logical_id,
4394       "physical_id": dev.physical_id,
4395       "pstatus": dev_pstatus,
4396       "sstatus": dev_sstatus,
4397       "children": dev_children,
4398       }
4399
4400     return data
4401
4402   def Exec(self, feedback_fn):
4403     """Gather and return data"""
4404     result = {}
4405     for instance in self.wanted_instances:
4406       remote_info = rpc.call_instance_info(instance.primary_node,
4407                                            instance.name,
4408                                            instance.hypervisor)
4409       if remote_info and "state" in remote_info:
4410         remote_state = "up"
4411       else:
4412         remote_state = "down"
4413       if instance.status == "down":
4414         config_state = "down"
4415       else:
4416         config_state = "up"
4417
4418       disks = [self._ComputeDiskStatus(instance, None, device)
4419                for device in instance.disks]
4420
4421       idict = {
4422         "name": instance.name,
4423         "config_state": config_state,
4424         "run_state": remote_state,
4425         "pnode": instance.primary_node,
4426         "snodes": instance.secondary_nodes,
4427         "os": instance.os,
4428         "memory": instance.memory,
4429         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4430         "disks": disks,
4431         "vcpus": instance.vcpus,
4432         "hypervisor": instance.hypervisor,
4433         }
4434
4435       htkind = instance.hypervisor
4436       if htkind == constants.HT_XEN_PVM:
4437         idict["kernel_path"] = instance.kernel_path
4438         idict["initrd_path"] = instance.initrd_path
4439
4440       if htkind == constants.HT_XEN_HVM:
4441         idict["hvm_boot_order"] = instance.hvm_boot_order
4442         idict["hvm_acpi"] = instance.hvm_acpi
4443         idict["hvm_pae"] = instance.hvm_pae
4444         idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4445         idict["hvm_nic_type"] = instance.hvm_nic_type
4446         idict["hvm_disk_type"] = instance.hvm_disk_type
4447
4448       if htkind in constants.HTS_REQ_PORT:
4449         if instance.vnc_bind_address is None:
4450           vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4451         else:
4452           vnc_bind_address = instance.vnc_bind_address
4453         if instance.network_port is None:
4454           vnc_console_port = None
4455         elif vnc_bind_address == constants.BIND_ADDRESS_GLOBAL:
4456           vnc_console_port = "%s:%s" % (instance.primary_node,
4457                                        instance.network_port)
4458         elif vnc_bind_address == constants.LOCALHOST_IP_ADDRESS:
4459           vnc_console_port = "%s:%s on node %s" % (vnc_bind_address,
4460                                                    instance.network_port,
4461                                                    instance.primary_node)
4462         else:
4463           vnc_console_port = "%s:%s" % (instance.vnc_bind_address,
4464                                         instance.network_port)
4465         idict["vnc_console_port"] = vnc_console_port
4466         idict["vnc_bind_address"] = vnc_bind_address
4467         idict["network_port"] = instance.network_port
4468
4469       result[instance.name] = idict
4470
4471     return result
4472
4473
4474 class LUSetInstanceParams(LogicalUnit):
4475   """Modifies an instances's parameters.
4476
4477   """
4478   HPATH = "instance-modify"
4479   HTYPE = constants.HTYPE_INSTANCE
4480   _OP_REQP = ["instance_name"]
4481   REQ_BGL = False
4482
4483   def ExpandNames(self):
4484     self._ExpandAndLockInstance()
4485
4486   def BuildHooksEnv(self):
4487     """Build hooks env.
4488
4489     This runs on the master, primary and secondaries.
4490
4491     """
4492     args = dict()
4493     if self.mem:
4494       args['memory'] = self.mem
4495     if self.vcpus:
4496       args['vcpus'] = self.vcpus
4497     if self.do_ip or self.do_bridge or self.mac:
4498       if self.do_ip:
4499         ip = self.ip
4500       else:
4501         ip = self.instance.nics[0].ip
4502       if self.bridge:
4503         bridge = self.bridge
4504       else:
4505         bridge = self.instance.nics[0].bridge
4506       if self.mac:
4507         mac = self.mac
4508       else:
4509         mac = self.instance.nics[0].mac
4510       args['nics'] = [(ip, bridge, mac)]
4511     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4512     nl = [self.cfg.GetMasterNode(),
4513           self.instance.primary_node] + list(self.instance.secondary_nodes)
4514     return env, nl, nl
4515
4516   def CheckPrereq(self):
4517     """Check prerequisites.
4518
4519     This only checks the instance list against the existing names.
4520
4521     """
4522     # FIXME: all the parameters could be checked before, in ExpandNames, or in
4523     # a separate CheckArguments function, if we implement one, so the operation
4524     # can be aborted without waiting for any lock, should it have an error...
4525     self.mem = getattr(self.op, "mem", None)
4526     self.vcpus = getattr(self.op, "vcpus", None)
4527     self.ip = getattr(self.op, "ip", None)
4528     self.mac = getattr(self.op, "mac", None)
4529     self.bridge = getattr(self.op, "bridge", None)
4530     self.kernel_path = getattr(self.op, "kernel_path", None)
4531     self.initrd_path = getattr(self.op, "initrd_path", None)
4532     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4533     self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4534     self.hvm_pae = getattr(self.op, "hvm_pae", None)
4535     self.hvm_nic_type = getattr(self.op, "hvm_nic_type", None)
4536     self.hvm_disk_type = getattr(self.op, "hvm_disk_type", None)
4537     self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4538     self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4539     self.force = getattr(self.op, "force", None)
4540     all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4541                  self.kernel_path, self.initrd_path, self.hvm_boot_order,
4542                  self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4543                  self.vnc_bind_address, self.hvm_nic_type, self.hvm_disk_type]
4544     if all_parms.count(None) == len(all_parms):
4545       raise errors.OpPrereqError("No changes submitted")
4546     if self.mem is not None:
4547       try:
4548         self.mem = int(self.mem)
4549       except ValueError, err:
4550         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4551     if self.vcpus is not None:
4552       try:
4553         self.vcpus = int(self.vcpus)
4554       except ValueError, err:
4555         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4556     if self.ip is not None:
4557       self.do_ip = True
4558       if self.ip.lower() == "none":
4559         self.ip = None
4560       else:
4561         if not utils.IsValidIP(self.ip):
4562           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4563     else:
4564       self.do_ip = False
4565     self.do_bridge = (self.bridge is not None)
4566     if self.mac is not None:
4567       if self.cfg.IsMacInUse(self.mac):
4568         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4569                                    self.mac)
4570       if not utils.IsValidMac(self.mac):
4571         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4572
4573     if self.kernel_path is not None:
4574       self.do_kernel_path = True
4575       if self.kernel_path == constants.VALUE_NONE:
4576         raise errors.OpPrereqError("Can't set instance to no kernel")
4577
4578       if self.kernel_path != constants.VALUE_DEFAULT:
4579         if not os.path.isabs(self.kernel_path):
4580           raise errors.OpPrereqError("The kernel path must be an absolute"
4581                                     " filename")
4582     else:
4583       self.do_kernel_path = False
4584
4585     if self.initrd_path is not None:
4586       self.do_initrd_path = True
4587       if self.initrd_path not in (constants.VALUE_NONE,
4588                                   constants.VALUE_DEFAULT):
4589         if not os.path.isabs(self.initrd_path):
4590           raise errors.OpPrereqError("The initrd path must be an absolute"
4591                                     " filename")
4592     else:
4593       self.do_initrd_path = False
4594
4595     # boot order verification
4596     if self.hvm_boot_order is not None:
4597       if self.hvm_boot_order != constants.VALUE_DEFAULT:
4598         if len(self.hvm_boot_order.strip("acdn")) != 0:
4599           raise errors.OpPrereqError("invalid boot order specified,"
4600                                      " must be one or more of [acdn]"
4601                                      " or 'default'")
4602
4603     # hvm_cdrom_image_path verification
4604     if self.op.hvm_cdrom_image_path is not None:
4605       if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
4606               self.op.hvm_cdrom_image_path.lower() == "none"):
4607         raise errors.OpPrereqError("The path to the HVM CDROM image must"
4608                                    " be an absolute path or None, not %s" %
4609                                    self.op.hvm_cdrom_image_path)
4610       if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
4611               self.op.hvm_cdrom_image_path.lower() == "none"):
4612         raise errors.OpPrereqError("The HVM CDROM image must either be a"
4613                                    " regular file or a symlink pointing to"
4614                                    " an existing regular file, not %s" %
4615                                    self.op.hvm_cdrom_image_path)
4616
4617     # vnc_bind_address verification
4618     if self.op.vnc_bind_address is not None:
4619       if not utils.IsValidIP(self.op.vnc_bind_address):
4620         raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4621                                    " like a valid IP address" %
4622                                    self.op.vnc_bind_address)
4623
4624     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4625     assert self.instance is not None, \
4626       "Cannot retrieve locked instance %s" % self.op.instance_name
4627     self.warn = []
4628     if self.mem is not None and not self.force:
4629       pnode = self.instance.primary_node
4630       nodelist = [pnode]
4631       nodelist.extend(instance.secondary_nodes)
4632       instance_info = rpc.call_instance_info(pnode, instance.name,
4633                                              instance.hypervisor)
4634       nodeinfo = rpc.call_node_info(nodelist, self.cfg.GetVGName(),
4635                                     instance.hypervisor)
4636
4637       if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
4638         # Assume the primary node is unreachable and go ahead
4639         self.warn.append("Can't get info from primary node %s" % pnode)
4640       else:
4641         if instance_info:
4642           current_mem = instance_info['memory']
4643         else:
4644           # Assume instance not running
4645           # (there is a slight race condition here, but it's not very probable,
4646           # and we have no other way to check)
4647           current_mem = 0
4648         miss_mem = self.mem - current_mem - nodeinfo[pnode]['memory_free']
4649         if miss_mem > 0:
4650           raise errors.OpPrereqError("This change will prevent the instance"
4651                                      " from starting, due to %d MB of memory"
4652                                      " missing on its primary node" % miss_mem)
4653
4654       for node in instance.secondary_nodes:
4655         if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
4656           self.warn.append("Can't get info from secondary node %s" % node)
4657         elif self.mem > nodeinfo[node]['memory_free']:
4658           self.warn.append("Not enough memory to failover instance to secondary"
4659                            " node %s" % node)
4660
4661     # Xen HVM device type checks
4662     if instance.hypervisor == constants.HT_XEN_HVM:
4663       if self.op.hvm_nic_type is not None:
4664         if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
4665           raise errors.OpPrereqError("Invalid NIC type %s specified for Xen"
4666                                      " HVM  hypervisor" % self.op.hvm_nic_type)
4667       if self.op.hvm_disk_type is not None:
4668         if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
4669           raise errors.OpPrereqError("Invalid disk type %s specified for Xen"
4670                                      " HVM hypervisor" % self.op.hvm_disk_type)
4671
4672     return
4673
4674   def Exec(self, feedback_fn):
4675     """Modifies an instance.
4676
4677     All parameters take effect only at the next restart of the instance.
4678     """
4679     # Process here the warnings from CheckPrereq, as we don't have a
4680     # feedback_fn there.
4681     for warn in self.warn:
4682       feedback_fn("WARNING: %s" % warn)
4683
4684     result = []
4685     instance = self.instance
4686     if self.mem:
4687       instance.memory = self.mem
4688       result.append(("mem", self.mem))
4689     if self.vcpus:
4690       instance.vcpus = self.vcpus
4691       result.append(("vcpus",  self.vcpus))
4692     if self.do_ip:
4693       instance.nics[0].ip = self.ip
4694       result.append(("ip", self.ip))
4695     if self.bridge:
4696       instance.nics[0].bridge = self.bridge
4697       result.append(("bridge", self.bridge))
4698     if self.mac:
4699       instance.nics[0].mac = self.mac
4700       result.append(("mac", self.mac))
4701     if self.do_kernel_path:
4702       instance.kernel_path = self.kernel_path
4703       result.append(("kernel_path", self.kernel_path))
4704     if self.do_initrd_path:
4705       instance.initrd_path = self.initrd_path
4706       result.append(("initrd_path", self.initrd_path))
4707     if self.hvm_boot_order:
4708       if self.hvm_boot_order == constants.VALUE_DEFAULT:
4709         instance.hvm_boot_order = None
4710       else:
4711         instance.hvm_boot_order = self.hvm_boot_order
4712       result.append(("hvm_boot_order", self.hvm_boot_order))
4713     if self.hvm_acpi is not None:
4714       instance.hvm_acpi = self.hvm_acpi
4715       result.append(("hvm_acpi", self.hvm_acpi))
4716     if self.hvm_pae is not None:
4717       instance.hvm_pae = self.hvm_pae
4718       result.append(("hvm_pae", self.hvm_pae))
4719     if self.hvm_nic_type is not None:
4720       instance.hvm_nic_type = self.hvm_nic_type
4721       result.append(("hvm_nic_type", self.hvm_nic_type))
4722     if self.hvm_disk_type is not None:
4723       instance.hvm_disk_type = self.hvm_disk_type
4724       result.append(("hvm_disk_type", self.hvm_disk_type))
4725     if self.hvm_cdrom_image_path:
4726       if self.hvm_cdrom_image_path == constants.VALUE_NONE:
4727         instance.hvm_cdrom_image_path = None
4728       else:
4729         instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4730       result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4731     if self.vnc_bind_address:
4732       instance.vnc_bind_address = self.vnc_bind_address
4733       result.append(("vnc_bind_address", self.vnc_bind_address))
4734
4735     self.cfg.Update(instance)
4736
4737     return result
4738
4739
4740 class LUQueryExports(NoHooksLU):
4741   """Query the exports list
4742
4743   """
4744   _OP_REQP = ['nodes']
4745   REQ_BGL = False
4746
4747   def ExpandNames(self):
4748     self.needed_locks = {}
4749     self.share_locks[locking.LEVEL_NODE] = 1
4750     if not self.op.nodes:
4751       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4752     else:
4753       self.needed_locks[locking.LEVEL_NODE] = \
4754         _GetWantedNodes(self, self.op.nodes)
4755
4756   def CheckPrereq(self):
4757     """Check prerequisites.
4758
4759     """
4760     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
4761
4762   def Exec(self, feedback_fn):
4763     """Compute the list of all the exported system images.
4764
4765     Returns:
4766       a dictionary with the structure node->(export-list)
4767       where export-list is a list of the instances exported on
4768       that node.
4769
4770     """
4771     return rpc.call_export_list(self.nodes)
4772
4773
4774 class LUExportInstance(LogicalUnit):
4775   """Export an instance to an image in the cluster.
4776
4777   """
4778   HPATH = "instance-export"
4779   HTYPE = constants.HTYPE_INSTANCE
4780   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4781   REQ_BGL = False
4782
4783   def ExpandNames(self):
4784     self._ExpandAndLockInstance()
4785     # FIXME: lock only instance primary and destination node
4786     #
4787     # Sad but true, for now we have do lock all nodes, as we don't know where
4788     # the previous export might be, and and in this LU we search for it and
4789     # remove it from its current node. In the future we could fix this by:
4790     #  - making a tasklet to search (share-lock all), then create the new one,
4791     #    then one to remove, after
4792     #  - removing the removal operation altoghether
4793     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4794
4795   def DeclareLocks(self, level):
4796     """Last minute lock declaration."""
4797     # All nodes are locked anyway, so nothing to do here.
4798
4799   def BuildHooksEnv(self):
4800     """Build hooks env.
4801
4802     This will run on the master, primary node and target node.
4803
4804     """
4805     env = {
4806       "EXPORT_NODE": self.op.target_node,
4807       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4808       }
4809     env.update(_BuildInstanceHookEnvByObject(self.instance))
4810     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
4811           self.op.target_node]
4812     return env, nl, nl
4813
4814   def CheckPrereq(self):
4815     """Check prerequisites.
4816
4817     This checks that the instance and node names are valid.
4818
4819     """
4820     instance_name = self.op.instance_name
4821     self.instance = self.cfg.GetInstanceInfo(instance_name)
4822     assert self.instance is not None, \
4823           "Cannot retrieve locked instance %s" % self.op.instance_name
4824
4825     self.dst_node = self.cfg.GetNodeInfo(
4826       self.cfg.ExpandNodeName(self.op.target_node))
4827
4828     assert self.dst_node is not None, \
4829           "Cannot retrieve locked node %s" % self.op.target_node
4830
4831     # instance disk type verification
4832     for disk in self.instance.disks:
4833       if disk.dev_type == constants.LD_FILE:
4834         raise errors.OpPrereqError("Export not supported for instances with"
4835                                    " file-based disks")
4836
4837   def Exec(self, feedback_fn):
4838     """Export an instance to an image in the cluster.
4839
4840     """
4841     instance = self.instance
4842     dst_node = self.dst_node
4843     src_node = instance.primary_node
4844     if self.op.shutdown:
4845       # shutdown the instance, but not the disks
4846       if not rpc.call_instance_shutdown(src_node, instance):
4847         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4848                                  (instance.name, src_node))
4849
4850     vgname = self.cfg.GetVGName()
4851
4852     snap_disks = []
4853
4854     try:
4855       for disk in instance.disks:
4856         if disk.iv_name == "sda":
4857           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4858           new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4859
4860           if not new_dev_name:
4861             logger.Error("could not snapshot block device %s on node %s" %
4862                          (disk.logical_id[1], src_node))
4863           else:
4864             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4865                                       logical_id=(vgname, new_dev_name),
4866                                       physical_id=(vgname, new_dev_name),
4867                                       iv_name=disk.iv_name)
4868             snap_disks.append(new_dev)
4869
4870     finally:
4871       if self.op.shutdown and instance.status == "up":
4872         if not rpc.call_instance_start(src_node, instance, None):
4873           _ShutdownInstanceDisks(self, instance)
4874           raise errors.OpExecError("Could not start instance")
4875
4876     # TODO: check for size
4877
4878     cluster_name = self.cfg.GetClusterName()
4879     for dev in snap_disks:
4880       if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4881                                       instance, cluster_name):
4882         logger.Error("could not export block device %s from node %s to node %s"
4883                      % (dev.logical_id[1], src_node, dst_node.name))
4884       if not rpc.call_blockdev_remove(src_node, dev):
4885         logger.Error("could not remove snapshot block device %s from node %s" %
4886                      (dev.logical_id[1], src_node))
4887
4888     if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4889       logger.Error("could not finalize export for instance %s on node %s" %
4890                    (instance.name, dst_node.name))
4891
4892     nodelist = self.cfg.GetNodeList()
4893     nodelist.remove(dst_node.name)
4894
4895     # on one-node clusters nodelist will be empty after the removal
4896     # if we proceed the backup would be removed because OpQueryExports
4897     # substitutes an empty list with the full cluster node list.
4898     if nodelist:
4899       exportlist = rpc.call_export_list(nodelist)
4900       for node in exportlist:
4901         if instance.name in exportlist[node]:
4902           if not rpc.call_export_remove(node, instance.name):
4903             logger.Error("could not remove older export for instance %s"
4904                          " on node %s" % (instance.name, node))
4905
4906
4907 class LURemoveExport(NoHooksLU):
4908   """Remove exports related to the named instance.
4909
4910   """
4911   _OP_REQP = ["instance_name"]
4912   REQ_BGL = False
4913
4914   def ExpandNames(self):
4915     self.needed_locks = {}
4916     # We need all nodes to be locked in order for RemoveExport to work, but we
4917     # don't need to lock the instance itself, as nothing will happen to it (and
4918     # we can remove exports also for a removed instance)
4919     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4920
4921   def CheckPrereq(self):
4922     """Check prerequisites.
4923     """
4924     pass
4925
4926   def Exec(self, feedback_fn):
4927     """Remove any export.
4928
4929     """
4930     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4931     # If the instance was not found we'll try with the name that was passed in.
4932     # This will only work if it was an FQDN, though.
4933     fqdn_warn = False
4934     if not instance_name:
4935       fqdn_warn = True
4936       instance_name = self.op.instance_name
4937
4938     exportlist = rpc.call_export_list(self.acquired_locks[locking.LEVEL_NODE])
4939     found = False
4940     for node in exportlist:
4941       if instance_name in exportlist[node]:
4942         found = True
4943         if not rpc.call_export_remove(node, instance_name):
4944           logger.Error("could not remove export for instance %s"
4945                        " on node %s" % (instance_name, node))
4946
4947     if fqdn_warn and not found:
4948       feedback_fn("Export not found. If trying to remove an export belonging"
4949                   " to a deleted instance please use its Fully Qualified"
4950                   " Domain Name.")
4951
4952
4953 class TagsLU(NoHooksLU):
4954   """Generic tags LU.
4955
4956   This is an abstract class which is the parent of all the other tags LUs.
4957
4958   """
4959
4960   def ExpandNames(self):
4961     self.needed_locks = {}
4962     if self.op.kind == constants.TAG_NODE:
4963       name = self.cfg.ExpandNodeName(self.op.name)
4964       if name is None:
4965         raise errors.OpPrereqError("Invalid node name (%s)" %
4966                                    (self.op.name,))
4967       self.op.name = name
4968       self.needed_locks[locking.LEVEL_NODE] = name
4969     elif self.op.kind == constants.TAG_INSTANCE:
4970       name = self.cfg.ExpandInstanceName(self.op.name)
4971       if name is None:
4972         raise errors.OpPrereqError("Invalid instance name (%s)" %
4973                                    (self.op.name,))
4974       self.op.name = name
4975       self.needed_locks[locking.LEVEL_INSTANCE] = name
4976
4977   def CheckPrereq(self):
4978     """Check prerequisites.
4979
4980     """
4981     if self.op.kind == constants.TAG_CLUSTER:
4982       self.target = self.cfg.GetClusterInfo()
4983     elif self.op.kind == constants.TAG_NODE:
4984       self.target = self.cfg.GetNodeInfo(self.op.name)
4985     elif self.op.kind == constants.TAG_INSTANCE:
4986       self.target = self.cfg.GetInstanceInfo(self.op.name)
4987     else:
4988       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4989                                  str(self.op.kind))
4990
4991
4992 class LUGetTags(TagsLU):
4993   """Returns the tags of a given object.
4994
4995   """
4996   _OP_REQP = ["kind", "name"]
4997   REQ_BGL = False
4998
4999   def Exec(self, feedback_fn):
5000     """Returns the tag list.
5001
5002     """
5003     return list(self.target.GetTags())
5004
5005
5006 class LUSearchTags(NoHooksLU):
5007   """Searches the tags for a given pattern.
5008
5009   """
5010   _OP_REQP = ["pattern"]
5011   REQ_BGL = False
5012
5013   def ExpandNames(self):
5014     self.needed_locks = {}
5015
5016   def CheckPrereq(self):
5017     """Check prerequisites.
5018
5019     This checks the pattern passed for validity by compiling it.
5020
5021     """
5022     try:
5023       self.re = re.compile(self.op.pattern)
5024     except re.error, err:
5025       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5026                                  (self.op.pattern, err))
5027
5028   def Exec(self, feedback_fn):
5029     """Returns the tag list.
5030
5031     """
5032     cfg = self.cfg
5033     tgts = [("/cluster", cfg.GetClusterInfo())]
5034     ilist = cfg.GetAllInstancesInfo().values()
5035     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5036     nlist = cfg.GetAllNodesInfo().values()
5037     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5038     results = []
5039     for path, target in tgts:
5040       for tag in target.GetTags():
5041         if self.re.search(tag):
5042           results.append((path, tag))
5043     return results
5044
5045
5046 class LUAddTags(TagsLU):
5047   """Sets a tag on a given object.
5048
5049   """
5050   _OP_REQP = ["kind", "name", "tags"]
5051   REQ_BGL = False
5052
5053   def CheckPrereq(self):
5054     """Check prerequisites.
5055
5056     This checks the type and length of the tag name and value.
5057
5058     """
5059     TagsLU.CheckPrereq(self)
5060     for tag in self.op.tags:
5061       objects.TaggableObject.ValidateTag(tag)
5062
5063   def Exec(self, feedback_fn):
5064     """Sets the tag.
5065
5066     """
5067     try:
5068       for tag in self.op.tags:
5069         self.target.AddTag(tag)
5070     except errors.TagError, err:
5071       raise errors.OpExecError("Error while setting tag: %s" % str(err))
5072     try:
5073       self.cfg.Update(self.target)
5074     except errors.ConfigurationError:
5075       raise errors.OpRetryError("There has been a modification to the"
5076                                 " config file and the operation has been"
5077                                 " aborted. Please retry.")
5078
5079
5080 class LUDelTags(TagsLU):
5081   """Delete a list of tags from a given object.
5082
5083   """
5084   _OP_REQP = ["kind", "name", "tags"]
5085   REQ_BGL = False
5086
5087   def CheckPrereq(self):
5088     """Check prerequisites.
5089
5090     This checks that we have the given tag.
5091
5092     """
5093     TagsLU.CheckPrereq(self)
5094     for tag in self.op.tags:
5095       objects.TaggableObject.ValidateTag(tag)
5096     del_tags = frozenset(self.op.tags)
5097     cur_tags = self.target.GetTags()
5098     if not del_tags <= cur_tags:
5099       diff_tags = del_tags - cur_tags
5100       diff_names = ["'%s'" % tag for tag in diff_tags]
5101       diff_names.sort()
5102       raise errors.OpPrereqError("Tag(s) %s not found" %
5103                                  (",".join(diff_names)))
5104
5105   def Exec(self, feedback_fn):
5106     """Remove the tag from the object.
5107
5108     """
5109     for tag in self.op.tags:
5110       self.target.RemoveTag(tag)
5111     try:
5112       self.cfg.Update(self.target)
5113     except errors.ConfigurationError:
5114       raise errors.OpRetryError("There has been a modification to the"
5115                                 " config file and the operation has been"
5116                                 " aborted. Please retry.")
5117
5118
5119 class LUTestDelay(NoHooksLU):
5120   """Sleep for a specified amount of time.
5121
5122   This LU sleeps on the master and/or nodes for a specified amount of
5123   time.
5124
5125   """
5126   _OP_REQP = ["duration", "on_master", "on_nodes"]
5127   REQ_BGL = False
5128
5129   def ExpandNames(self):
5130     """Expand names and set required locks.
5131
5132     This expands the node list, if any.
5133
5134     """
5135     self.needed_locks = {}
5136     if self.op.on_nodes:
5137       # _GetWantedNodes can be used here, but is not always appropriate to use
5138       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
5139       # more information.
5140       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5141       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5142
5143   def CheckPrereq(self):
5144     """Check prerequisites.
5145
5146     """
5147
5148   def Exec(self, feedback_fn):
5149     """Do the actual sleep.
5150
5151     """
5152     if self.op.on_master:
5153       if not utils.TestDelay(self.op.duration):
5154         raise errors.OpExecError("Error during master delay test")
5155     if self.op.on_nodes:
5156       result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5157       if not result:
5158         raise errors.OpExecError("Complete failure from rpc call")
5159       for node, node_result in result.items():
5160         if not node_result:
5161           raise errors.OpExecError("Failure during rpc call to node %s,"
5162                                    " result: %s" % (node, node_result))
5163
5164
5165 class IAllocator(object):
5166   """IAllocator framework.
5167
5168   An IAllocator instance has three sets of attributes:
5169     - cfg that is needed to query the cluster
5170     - input data (all members of the _KEYS class attribute are required)
5171     - four buffer attributes (in|out_data|text), that represent the
5172       input (to the external script) in text and data structure format,
5173       and the output from it, again in two formats
5174     - the result variables from the script (success, info, nodes) for
5175       easy usage
5176
5177   """
5178   _ALLO_KEYS = [
5179     "mem_size", "disks", "disk_template",
5180     "os", "tags", "nics", "vcpus",
5181     ]
5182   _RELO_KEYS = [
5183     "relocate_from",
5184     ]
5185
5186   def __init__(self, cfg, mode, name, **kwargs):
5187     self.cfg = cfg
5188     # init buffer variables
5189     self.in_text = self.out_text = self.in_data = self.out_data = None
5190     # init all input fields so that pylint is happy
5191     self.mode = mode
5192     self.name = name
5193     self.mem_size = self.disks = self.disk_template = None
5194     self.os = self.tags = self.nics = self.vcpus = None
5195     self.relocate_from = None
5196     # computed fields
5197     self.required_nodes = None
5198     # init result fields
5199     self.success = self.info = self.nodes = None
5200     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5201       keyset = self._ALLO_KEYS
5202     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5203       keyset = self._RELO_KEYS
5204     else:
5205       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5206                                    " IAllocator" % self.mode)
5207     for key in kwargs:
5208       if key not in keyset:
5209         raise errors.ProgrammerError("Invalid input parameter '%s' to"
5210                                      " IAllocator" % key)
5211       setattr(self, key, kwargs[key])
5212     for key in keyset:
5213       if key not in kwargs:
5214         raise errors.ProgrammerError("Missing input parameter '%s' to"
5215                                      " IAllocator" % key)
5216     self._BuildInputData()
5217
5218   def _ComputeClusterData(self):
5219     """Compute the generic allocator input data.
5220
5221     This is the data that is independent of the actual operation.
5222
5223     """
5224     cfg = self.cfg
5225     cluster_info = cfg.GetClusterInfo()
5226     # cluster data
5227     data = {
5228       "version": 1,
5229       "cluster_name": self.cfg.GetClusterName(),
5230       "cluster_tags": list(cluster_info.GetTags()),
5231       "enable_hypervisors": list(cluster_info.enabled_hypervisors),
5232       # we don't have job IDs
5233       }
5234
5235     i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
5236
5237     # node data
5238     node_results = {}
5239     node_list = cfg.GetNodeList()
5240     # FIXME: here we have only one hypervisor information, but
5241     # instance can belong to different hypervisors
5242     node_data = rpc.call_node_info(node_list, cfg.GetVGName(),
5243                                    cfg.GetHypervisorType())
5244     for nname in node_list:
5245       ninfo = cfg.GetNodeInfo(nname)
5246       if nname not in node_data or not isinstance(node_data[nname], dict):
5247         raise errors.OpExecError("Can't get data for node %s" % nname)
5248       remote_info = node_data[nname]
5249       for attr in ['memory_total', 'memory_free', 'memory_dom0',
5250                    'vg_size', 'vg_free', 'cpu_total']:
5251         if attr not in remote_info:
5252           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5253                                    (nname, attr))
5254         try:
5255           remote_info[attr] = int(remote_info[attr])
5256         except ValueError, err:
5257           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5258                                    " %s" % (nname, attr, str(err)))
5259       # compute memory used by primary instances
5260       i_p_mem = i_p_up_mem = 0
5261       for iinfo in i_list:
5262         if iinfo.primary_node == nname:
5263           i_p_mem += iinfo.memory
5264           if iinfo.status == "up":
5265             i_p_up_mem += iinfo.memory
5266
5267       # compute memory used by instances
5268       pnr = {
5269         "tags": list(ninfo.GetTags()),
5270         "total_memory": remote_info['memory_total'],
5271         "reserved_memory": remote_info['memory_dom0'],
5272         "free_memory": remote_info['memory_free'],
5273         "i_pri_memory": i_p_mem,
5274         "i_pri_up_memory": i_p_up_mem,
5275         "total_disk": remote_info['vg_size'],
5276         "free_disk": remote_info['vg_free'],
5277         "primary_ip": ninfo.primary_ip,
5278         "secondary_ip": ninfo.secondary_ip,
5279         "total_cpus": remote_info['cpu_total'],
5280         }
5281       node_results[nname] = pnr
5282     data["nodes"] = node_results
5283
5284     # instance data
5285     instance_data = {}
5286     for iinfo in i_list:
5287       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5288                   for n in iinfo.nics]
5289       pir = {
5290         "tags": list(iinfo.GetTags()),
5291         "should_run": iinfo.status == "up",
5292         "vcpus": iinfo.vcpus,
5293         "memory": iinfo.memory,
5294         "os": iinfo.os,
5295         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5296         "nics": nic_data,
5297         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5298         "disk_template": iinfo.disk_template,
5299         "hypervisor": iinfo.hypervisor,
5300         }
5301       instance_data[iinfo.name] = pir
5302
5303     data["instances"] = instance_data
5304
5305     self.in_data = data
5306
5307   def _AddNewInstance(self):
5308     """Add new instance data to allocator structure.
5309
5310     This in combination with _AllocatorGetClusterData will create the
5311     correct structure needed as input for the allocator.
5312
5313     The checks for the completeness of the opcode must have already been
5314     done.
5315
5316     """
5317     data = self.in_data
5318     if len(self.disks) != 2:
5319       raise errors.OpExecError("Only two-disk configurations supported")
5320
5321     disk_space = _ComputeDiskSize(self.disk_template,
5322                                   self.disks[0]["size"], self.disks[1]["size"])
5323
5324     if self.disk_template in constants.DTS_NET_MIRROR:
5325       self.required_nodes = 2
5326     else:
5327       self.required_nodes = 1
5328     request = {
5329       "type": "allocate",
5330       "name": self.name,
5331       "disk_template": self.disk_template,
5332       "tags": self.tags,
5333       "os": self.os,
5334       "vcpus": self.vcpus,
5335       "memory": self.mem_size,
5336       "disks": self.disks,
5337       "disk_space_total": disk_space,
5338       "nics": self.nics,
5339       "required_nodes": self.required_nodes,
5340       }
5341     data["request"] = request
5342
5343   def _AddRelocateInstance(self):
5344     """Add relocate instance data to allocator structure.
5345
5346     This in combination with _IAllocatorGetClusterData will create the
5347     correct structure needed as input for the allocator.
5348
5349     The checks for the completeness of the opcode must have already been
5350     done.
5351
5352     """
5353     instance = self.cfg.GetInstanceInfo(self.name)
5354     if instance is None:
5355       raise errors.ProgrammerError("Unknown instance '%s' passed to"
5356                                    " IAllocator" % self.name)
5357
5358     if instance.disk_template not in constants.DTS_NET_MIRROR:
5359       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5360
5361     if len(instance.secondary_nodes) != 1:
5362       raise errors.OpPrereqError("Instance has not exactly one secondary node")
5363
5364     self.required_nodes = 1
5365
5366     disk_space = _ComputeDiskSize(instance.disk_template,
5367                                   instance.disks[0].size,
5368                                   instance.disks[1].size)
5369
5370     request = {
5371       "type": "relocate",
5372       "name": self.name,
5373       "disk_space_total": disk_space,
5374       "required_nodes": self.required_nodes,
5375       "relocate_from": self.relocate_from,
5376       }
5377     self.in_data["request"] = request
5378
5379   def _BuildInputData(self):
5380     """Build input data structures.
5381
5382     """
5383     self._ComputeClusterData()
5384
5385     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5386       self._AddNewInstance()
5387     else:
5388       self._AddRelocateInstance()
5389
5390     self.in_text = serializer.Dump(self.in_data)
5391
5392   def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
5393     """Run an instance allocator and return the results.
5394
5395     """
5396     data = self.in_text
5397
5398     result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
5399
5400     if not isinstance(result, (list, tuple)) or len(result) != 4:
5401       raise errors.OpExecError("Invalid result from master iallocator runner")
5402
5403     rcode, stdout, stderr, fail = result
5404
5405     if rcode == constants.IARUN_NOTFOUND:
5406       raise errors.OpExecError("Can't find allocator '%s'" % name)
5407     elif rcode == constants.IARUN_FAILURE:
5408       raise errors.OpExecError("Instance allocator call failed: %s,"
5409                                " output: %s" % (fail, stdout+stderr))
5410     self.out_text = stdout
5411     if validate:
5412       self._ValidateResult()
5413
5414   def _ValidateResult(self):
5415     """Process the allocator results.
5416
5417     This will process and if successful save the result in
5418     self.out_data and the other parameters.
5419
5420     """
5421     try:
5422       rdict = serializer.Load(self.out_text)
5423     except Exception, err:
5424       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5425
5426     if not isinstance(rdict, dict):
5427       raise errors.OpExecError("Can't parse iallocator results: not a dict")
5428
5429     for key in "success", "info", "nodes":
5430       if key not in rdict:
5431         raise errors.OpExecError("Can't parse iallocator results:"
5432                                  " missing key '%s'" % key)
5433       setattr(self, key, rdict[key])
5434
5435     if not isinstance(rdict["nodes"], list):
5436       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5437                                " is not a list")
5438     self.out_data = rdict
5439
5440
5441 class LUTestAllocator(NoHooksLU):
5442   """Run allocator tests.
5443
5444   This LU runs the allocator tests
5445
5446   """
5447   _OP_REQP = ["direction", "mode", "name"]
5448
5449   def CheckPrereq(self):
5450     """Check prerequisites.
5451
5452     This checks the opcode parameters depending on the director and mode test.
5453
5454     """
5455     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5456       for attr in ["name", "mem_size", "disks", "disk_template",
5457                    "os", "tags", "nics", "vcpus"]:
5458         if not hasattr(self.op, attr):
5459           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5460                                      attr)
5461       iname = self.cfg.ExpandInstanceName(self.op.name)
5462       if iname is not None:
5463         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5464                                    iname)
5465       if not isinstance(self.op.nics, list):
5466         raise errors.OpPrereqError("Invalid parameter 'nics'")
5467       for row in self.op.nics:
5468         if (not isinstance(row, dict) or
5469             "mac" not in row or
5470             "ip" not in row or
5471             "bridge" not in row):
5472           raise errors.OpPrereqError("Invalid contents of the"
5473                                      " 'nics' parameter")
5474       if not isinstance(self.op.disks, list):
5475         raise errors.OpPrereqError("Invalid parameter 'disks'")
5476       if len(self.op.disks) != 2:
5477         raise errors.OpPrereqError("Only two-disk configurations supported")
5478       for row in self.op.disks:
5479         if (not isinstance(row, dict) or
5480             "size" not in row or
5481             not isinstance(row["size"], int) or
5482             "mode" not in row or
5483             row["mode"] not in ['r', 'w']):
5484           raise errors.OpPrereqError("Invalid contents of the"
5485                                      " 'disks' parameter")
5486     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5487       if not hasattr(self.op, "name"):
5488         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5489       fname = self.cfg.ExpandInstanceName(self.op.name)
5490       if fname is None:
5491         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5492                                    self.op.name)
5493       self.op.name = fname
5494       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5495     else:
5496       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5497                                  self.op.mode)
5498
5499     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5500       if not hasattr(self.op, "allocator") or self.op.allocator is None:
5501         raise errors.OpPrereqError("Missing allocator name")
5502     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5503       raise errors.OpPrereqError("Wrong allocator test '%s'" %
5504                                  self.op.direction)
5505
5506   def Exec(self, feedback_fn):
5507     """Run the allocator test.
5508
5509     """
5510     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5511       ial = IAllocator(self.cfg,
5512                        mode=self.op.mode,
5513                        name=self.op.name,
5514                        mem_size=self.op.mem_size,
5515                        disks=self.op.disks,
5516                        disk_template=self.op.disk_template,
5517                        os=self.op.os,
5518                        tags=self.op.tags,
5519                        nics=self.op.nics,
5520                        vcpus=self.op.vcpus,
5521                        )
5522     else:
5523       ial = IAllocator(self.cfg,
5524                        mode=self.op.mode,
5525                        name=self.op.name,
5526                        relocate_from=list(self.relocate_from),
5527                        )
5528
5529     if self.op.direction == constants.IALLOCATOR_DIR_IN:
5530       result = ial.in_text
5531     else:
5532       ial.Run(self.op.allocator, validate=False)
5533       result = ial.out_text
5534     return result