Change over to beparams
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import logging
34 import copy
35
36 from ganeti import ssh
37 from ganeti import logger
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_MASTER: the LU needs to run on the master node
59         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60
61   Note that all commands require root permissions.
62
63   """
64   HPATH = None
65   HTYPE = None
66   _OP_REQP = []
67   REQ_MASTER = True
68   REQ_BGL = True
69
70   def __init__(self, processor, op, context, rpc):
71     """Constructor for LogicalUnit.
72
73     This needs to be overriden in derived classes in order to check op
74     validity.
75
76     """
77     self.proc = processor
78     self.op = op
79     self.cfg = context.cfg
80     self.context = context
81     self.rpc = rpc
82     # Dicts used to declare locking needs to mcpu
83     self.needed_locks = None
84     self.acquired_locks = {}
85     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
86     self.add_locks = {}
87     self.remove_locks = {}
88     # Used to force good behavior when calling helper functions
89     self.recalculate_locks = {}
90     self.__ssh = None
91
92     for attr_name in self._OP_REQP:
93       attr_val = getattr(op, attr_name, None)
94       if attr_val is None:
95         raise errors.OpPrereqError("Required parameter '%s' missing" %
96                                    attr_name)
97
98     if not self.cfg.IsCluster():
99       raise errors.OpPrereqError("Cluster not initialized yet,"
100                                  " use 'gnt-cluster init' first.")
101     if self.REQ_MASTER:
102       master = self.cfg.GetMasterNode()
103       if master != utils.HostInfo().name:
104         raise errors.OpPrereqError("Commands must be run on the master"
105                                    " node %s" % master)
106
107   def __GetSSH(self):
108     """Returns the SshRunner object
109
110     """
111     if not self.__ssh:
112       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
113     return self.__ssh
114
115   ssh = property(fget=__GetSSH)
116
117   def ExpandNames(self):
118     """Expand names for this LU.
119
120     This method is called before starting to execute the opcode, and it should
121     update all the parameters of the opcode to their canonical form (e.g. a
122     short node name must be fully expanded after this method has successfully
123     completed). This way locking, hooks, logging, ecc. can work correctly.
124
125     LUs which implement this method must also populate the self.needed_locks
126     member, as a dict with lock levels as keys, and a list of needed lock names
127     as values. Rules:
128       - Use an empty dict if you don't need any lock
129       - If you don't need any lock at a particular level omit that level
130       - Don't put anything for the BGL level
131       - If you want all locks at a level use locking.ALL_SET as a value
132
133     If you need to share locks (rather than acquire them exclusively) at one
134     level you can modify self.share_locks, setting a true value (usually 1) for
135     that level. By default locks are not shared.
136
137     Examples:
138     # Acquire all nodes and one instance
139     self.needed_locks = {
140       locking.LEVEL_NODE: locking.ALL_SET,
141       locking.LEVEL_INSTANCE: ['instance1.example.tld'],
142     }
143     # Acquire just two nodes
144     self.needed_locks = {
145       locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
146     }
147     # Acquire no locks
148     self.needed_locks = {} # No, you can't leave it to the default value None
149
150     """
151     # The implementation of this method is mandatory only if the new LU is
152     # concurrent, so that old LUs don't need to be changed all at the same
153     # time.
154     if self.REQ_BGL:
155       self.needed_locks = {} # Exclusive LUs don't need locks.
156     else:
157       raise NotImplementedError
158
159   def DeclareLocks(self, level):
160     """Declare LU locking needs for a level
161
162     While most LUs can just declare their locking needs at ExpandNames time,
163     sometimes there's the need to calculate some locks after having acquired
164     the ones before. This function is called just before acquiring locks at a
165     particular level, but after acquiring the ones at lower levels, and permits
166     such calculations. It can be used to modify self.needed_locks, and by
167     default it does nothing.
168
169     This function is only called if you have something already set in
170     self.needed_locks for the level.
171
172     @param level: Locking level which is going to be locked
173     @type level: member of ganeti.locking.LEVELS
174
175     """
176
177   def CheckPrereq(self):
178     """Check prerequisites for this LU.
179
180     This method should check that the prerequisites for the execution
181     of this LU are fulfilled. It can do internode communication, but
182     it should be idempotent - no cluster or system changes are
183     allowed.
184
185     The method should raise errors.OpPrereqError in case something is
186     not fulfilled. Its return value is ignored.
187
188     This method should also update all the parameters of the opcode to
189     their canonical form if it hasn't been done by ExpandNames before.
190
191     """
192     raise NotImplementedError
193
194   def Exec(self, feedback_fn):
195     """Execute the LU.
196
197     This method should implement the actual work. It should raise
198     errors.OpExecError for failures that are somewhat dealt with in
199     code, or expected.
200
201     """
202     raise NotImplementedError
203
204   def BuildHooksEnv(self):
205     """Build hooks environment for this LU.
206
207     This method should return a three-node tuple consisting of: a dict
208     containing the environment that will be used for running the
209     specific hook for this LU, a list of node names on which the hook
210     should run before the execution, and a list of node names on which
211     the hook should run after the execution.
212
213     The keys of the dict must not have 'GANETI_' prefixed as this will
214     be handled in the hooks runner. Also note additional keys will be
215     added by the hooks runner. If the LU doesn't define any
216     environment, an empty dict (and not None) should be returned.
217
218     No nodes should be returned as an empty list (and not None).
219
220     Note that if the HPATH for a LU class is None, this function will
221     not be called.
222
223     """
224     raise NotImplementedError
225
226   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
227     """Notify the LU about the results of its hooks.
228
229     This method is called every time a hooks phase is executed, and notifies
230     the Logical Unit about the hooks' result. The LU can then use it to alter
231     its result based on the hooks.  By default the method does nothing and the
232     previous result is passed back unchanged but any LU can define it if it
233     wants to use the local cluster hook-scripts somehow.
234
235     Args:
236       phase: the hooks phase that has just been run
237       hooks_results: the results of the multi-node hooks rpc call
238       feedback_fn: function to send feedback back to the caller
239       lu_result: the previous result this LU had, or None in the PRE phase.
240
241     """
242     return lu_result
243
244   def _ExpandAndLockInstance(self):
245     """Helper function to expand and lock an instance.
246
247     Many LUs that work on an instance take its name in self.op.instance_name
248     and need to expand it and then declare the expanded name for locking. This
249     function does it, and then updates self.op.instance_name to the expanded
250     name. It also initializes needed_locks as a dict, if this hasn't been done
251     before.
252
253     """
254     if self.needed_locks is None:
255       self.needed_locks = {}
256     else:
257       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
258         "_ExpandAndLockInstance called with instance-level locks set"
259     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
260     if expanded_name is None:
261       raise errors.OpPrereqError("Instance '%s' not known" %
262                                   self.op.instance_name)
263     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
264     self.op.instance_name = expanded_name
265
266   def _LockInstancesNodes(self, primary_only=False):
267     """Helper function to declare instances' nodes for locking.
268
269     This function should be called after locking one or more instances to lock
270     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
271     with all primary or secondary nodes for instances already locked and
272     present in self.needed_locks[locking.LEVEL_INSTANCE].
273
274     It should be called from DeclareLocks, and for safety only works if
275     self.recalculate_locks[locking.LEVEL_NODE] is set.
276
277     In the future it may grow parameters to just lock some instance's nodes, or
278     to just lock primaries or secondary nodes, if needed.
279
280     If should be called in DeclareLocks in a way similar to:
281
282     if level == locking.LEVEL_NODE:
283       self._LockInstancesNodes()
284
285     @type primary_only: boolean
286     @param primary_only: only lock primary nodes of locked instances
287
288     """
289     assert locking.LEVEL_NODE in self.recalculate_locks, \
290       "_LockInstancesNodes helper function called with no nodes to recalculate"
291
292     # TODO: check if we're really been called with the instance locks held
293
294     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
295     # future we might want to have different behaviors depending on the value
296     # of self.recalculate_locks[locking.LEVEL_NODE]
297     wanted_nodes = []
298     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
299       instance = self.context.cfg.GetInstanceInfo(instance_name)
300       wanted_nodes.append(instance.primary_node)
301       if not primary_only:
302         wanted_nodes.extend(instance.secondary_nodes)
303
304     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
305       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
306     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
307       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
308
309     del self.recalculate_locks[locking.LEVEL_NODE]
310
311
312 class NoHooksLU(LogicalUnit):
313   """Simple LU which runs no hooks.
314
315   This LU is intended as a parent for other LogicalUnits which will
316   run no hooks, in order to reduce duplicate code.
317
318   """
319   HPATH = None
320   HTYPE = None
321
322
323 def _GetWantedNodes(lu, nodes):
324   """Returns list of checked and expanded node names.
325
326   Args:
327     nodes: List of nodes (strings) or None for all
328
329   """
330   if not isinstance(nodes, list):
331     raise errors.OpPrereqError("Invalid argument type 'nodes'")
332
333   if not nodes:
334     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
335       " non-empty list of nodes whose name is to be expanded.")
336
337   wanted = []
338   for name in nodes:
339     node = lu.cfg.ExpandNodeName(name)
340     if node is None:
341       raise errors.OpPrereqError("No such node name '%s'" % name)
342     wanted.append(node)
343
344   return utils.NiceSort(wanted)
345
346
347 def _GetWantedInstances(lu, instances):
348   """Returns list of checked and expanded instance names.
349
350   Args:
351     instances: List of instances (strings) or None for all
352
353   """
354   if not isinstance(instances, list):
355     raise errors.OpPrereqError("Invalid argument type 'instances'")
356
357   if instances:
358     wanted = []
359
360     for name in instances:
361       instance = lu.cfg.ExpandInstanceName(name)
362       if instance is None:
363         raise errors.OpPrereqError("No such instance name '%s'" % name)
364       wanted.append(instance)
365
366   else:
367     wanted = lu.cfg.GetInstanceList()
368   return utils.NiceSort(wanted)
369
370
371 def _CheckOutputFields(static, dynamic, selected):
372   """Checks whether all selected fields are valid.
373
374   Args:
375     static: Static fields
376     dynamic: Dynamic fields
377
378   """
379   static_fields = frozenset(static)
380   dynamic_fields = frozenset(dynamic)
381
382   all_fields = static_fields | dynamic_fields
383
384   if not all_fields.issuperset(selected):
385     raise errors.OpPrereqError("Unknown output fields selected: %s"
386                                % ",".join(frozenset(selected).
387                                           difference(all_fields)))
388
389
390 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
391                           memory, vcpus, nics):
392   """Builds instance related env variables for hooks from single variables.
393
394   Args:
395     secondary_nodes: List of secondary nodes as strings
396   """
397   env = {
398     "OP_TARGET": name,
399     "INSTANCE_NAME": name,
400     "INSTANCE_PRIMARY": primary_node,
401     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
402     "INSTANCE_OS_TYPE": os_type,
403     "INSTANCE_STATUS": status,
404     "INSTANCE_MEMORY": memory,
405     "INSTANCE_VCPUS": vcpus,
406   }
407
408   if nics:
409     nic_count = len(nics)
410     for idx, (ip, bridge, mac) in enumerate(nics):
411       if ip is None:
412         ip = ""
413       env["INSTANCE_NIC%d_IP" % idx] = ip
414       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
415       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
416   else:
417     nic_count = 0
418
419   env["INSTANCE_NIC_COUNT"] = nic_count
420
421   return env
422
423
424 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
425   """Builds instance related env variables for hooks from an object.
426
427   Args:
428     instance: objects.Instance object of instance
429     override: dict of values to override
430   """
431   bep = lu.cfg.GetClusterInfo().FillBE(instance)
432   args = {
433     'name': instance.name,
434     'primary_node': instance.primary_node,
435     'secondary_nodes': instance.secondary_nodes,
436     'os_type': instance.os,
437     'status': instance.os,
438     'memory': bep[constants.BE_MEMORY],
439     'vcpus': bep[constants.BE_VCPUS],
440     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
441   }
442   if override:
443     args.update(override)
444   return _BuildInstanceHookEnv(**args)
445
446
447 def _CheckInstanceBridgesExist(lu, instance):
448   """Check that the brigdes needed by an instance exist.
449
450   """
451   # check bridges existance
452   brlist = [nic.bridge for nic in instance.nics]
453   if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
454     raise errors.OpPrereqError("one or more target bridges %s does not"
455                                " exist on destination node '%s'" %
456                                (brlist, instance.primary_node))
457
458
459 class LUDestroyCluster(NoHooksLU):
460   """Logical unit for destroying the cluster.
461
462   """
463   _OP_REQP = []
464
465   def CheckPrereq(self):
466     """Check prerequisites.
467
468     This checks whether the cluster is empty.
469
470     Any errors are signalled by raising errors.OpPrereqError.
471
472     """
473     master = self.cfg.GetMasterNode()
474
475     nodelist = self.cfg.GetNodeList()
476     if len(nodelist) != 1 or nodelist[0] != master:
477       raise errors.OpPrereqError("There are still %d node(s) in"
478                                  " this cluster." % (len(nodelist) - 1))
479     instancelist = self.cfg.GetInstanceList()
480     if instancelist:
481       raise errors.OpPrereqError("There are still %d instance(s) in"
482                                  " this cluster." % len(instancelist))
483
484   def Exec(self, feedback_fn):
485     """Destroys the cluster.
486
487     """
488     master = self.cfg.GetMasterNode()
489     if not self.rpc.call_node_stop_master(master, False):
490       raise errors.OpExecError("Could not disable the master role")
491     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
492     utils.CreateBackup(priv_key)
493     utils.CreateBackup(pub_key)
494     return master
495
496
497 class LUVerifyCluster(LogicalUnit):
498   """Verifies the cluster status.
499
500   """
501   HPATH = "cluster-verify"
502   HTYPE = constants.HTYPE_CLUSTER
503   _OP_REQP = ["skip_checks"]
504   REQ_BGL = False
505
506   def ExpandNames(self):
507     self.needed_locks = {
508       locking.LEVEL_NODE: locking.ALL_SET,
509       locking.LEVEL_INSTANCE: locking.ALL_SET,
510     }
511     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
512
513   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
514                   remote_version, feedback_fn):
515     """Run multiple tests against a node.
516
517     Test list:
518       - compares ganeti version
519       - checks vg existance and size > 20G
520       - checks config file checksum
521       - checks ssh to other nodes
522
523     Args:
524       node: name of the node to check
525       file_list: required list of files
526       local_cksum: dictionary of local files and their checksums
527
528     """
529     # compares ganeti version
530     local_version = constants.PROTOCOL_VERSION
531     if not remote_version:
532       feedback_fn("  - ERROR: connection to %s failed" % (node))
533       return True
534
535     if local_version != remote_version:
536       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
537                       (local_version, node, remote_version))
538       return True
539
540     # checks vg existance and size > 20G
541
542     bad = False
543     if not vglist:
544       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
545                       (node,))
546       bad = True
547     else:
548       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
549                                             constants.MIN_VG_SIZE)
550       if vgstatus:
551         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
552         bad = True
553
554     if not node_result:
555       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
556       return True
557
558     # checks config file checksum
559     # checks ssh to any
560
561     if 'filelist' not in node_result:
562       bad = True
563       feedback_fn("  - ERROR: node hasn't returned file checksum data")
564     else:
565       remote_cksum = node_result['filelist']
566       for file_name in file_list:
567         if file_name not in remote_cksum:
568           bad = True
569           feedback_fn("  - ERROR: file '%s' missing" % file_name)
570         elif remote_cksum[file_name] != local_cksum[file_name]:
571           bad = True
572           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
573
574     if 'nodelist' not in node_result:
575       bad = True
576       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
577     else:
578       if node_result['nodelist']:
579         bad = True
580         for node in node_result['nodelist']:
581           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
582                           (node, node_result['nodelist'][node]))
583     if 'node-net-test' not in node_result:
584       bad = True
585       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
586     else:
587       if node_result['node-net-test']:
588         bad = True
589         nlist = utils.NiceSort(node_result['node-net-test'].keys())
590         for node in nlist:
591           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
592                           (node, node_result['node-net-test'][node]))
593
594     hyp_result = node_result.get('hypervisor', None)
595     if isinstance(hyp_result, dict):
596       for hv_name, hv_result in hyp_result.iteritems():
597         if hv_result is not None:
598           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
599                       (hv_name, hv_result))
600     return bad
601
602   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
603                       node_instance, feedback_fn):
604     """Verify an instance.
605
606     This function checks to see if the required block devices are
607     available on the instance's node.
608
609     """
610     bad = False
611
612     node_current = instanceconfig.primary_node
613
614     node_vol_should = {}
615     instanceconfig.MapLVsByNode(node_vol_should)
616
617     for node in node_vol_should:
618       for volume in node_vol_should[node]:
619         if node not in node_vol_is or volume not in node_vol_is[node]:
620           feedback_fn("  - ERROR: volume %s missing on node %s" %
621                           (volume, node))
622           bad = True
623
624     if not instanceconfig.status == 'down':
625       if (node_current not in node_instance or
626           not instance in node_instance[node_current]):
627         feedback_fn("  - ERROR: instance %s not running on node %s" %
628                         (instance, node_current))
629         bad = True
630
631     for node in node_instance:
632       if (not node == node_current):
633         if instance in node_instance[node]:
634           feedback_fn("  - ERROR: instance %s should not run on node %s" %
635                           (instance, node))
636           bad = True
637
638     return bad
639
640   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
641     """Verify if there are any unknown volumes in the cluster.
642
643     The .os, .swap and backup volumes are ignored. All other volumes are
644     reported as unknown.
645
646     """
647     bad = False
648
649     for node in node_vol_is:
650       for volume in node_vol_is[node]:
651         if node not in node_vol_should or volume not in node_vol_should[node]:
652           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
653                       (volume, node))
654           bad = True
655     return bad
656
657   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
658     """Verify the list of running instances.
659
660     This checks what instances are running but unknown to the cluster.
661
662     """
663     bad = False
664     for node in node_instance:
665       for runninginstance in node_instance[node]:
666         if runninginstance not in instancelist:
667           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
668                           (runninginstance, node))
669           bad = True
670     return bad
671
672   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
673     """Verify N+1 Memory Resilience.
674
675     Check that if one single node dies we can still start all the instances it
676     was primary for.
677
678     """
679     bad = False
680
681     for node, nodeinfo in node_info.iteritems():
682       # This code checks that every node which is now listed as secondary has
683       # enough memory to host all instances it is supposed to should a single
684       # other node in the cluster fail.
685       # FIXME: not ready for failover to an arbitrary node
686       # FIXME: does not support file-backed instances
687       # WARNING: we currently take into account down instances as well as up
688       # ones, considering that even if they're down someone might want to start
689       # them even in the event of a node failure.
690       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
691         needed_mem = 0
692         for instance in instances:
693           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
694           needed_mem += bep[constants.BE_MEMORY]
695         if nodeinfo['mfree'] < needed_mem:
696           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
697                       " failovers should node %s fail" % (node, prinode))
698           bad = True
699     return bad
700
701   def CheckPrereq(self):
702     """Check prerequisites.
703
704     Transform the list of checks we're going to skip into a set and check that
705     all its members are valid.
706
707     """
708     self.skip_set = frozenset(self.op.skip_checks)
709     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
710       raise errors.OpPrereqError("Invalid checks to be skipped specified")
711
712   def BuildHooksEnv(self):
713     """Build hooks env.
714
715     Cluster-Verify hooks just rone in the post phase and their failure makes
716     the output be logged in the verify output and the verification to fail.
717
718     """
719     all_nodes = self.cfg.GetNodeList()
720     # TODO: populate the environment with useful information for verify hooks
721     env = {}
722     return env, [], all_nodes
723
724   def Exec(self, feedback_fn):
725     """Verify integrity of cluster, performing various test on nodes.
726
727     """
728     bad = False
729     feedback_fn("* Verifying global settings")
730     for msg in self.cfg.VerifyConfig():
731       feedback_fn("  - ERROR: %s" % msg)
732
733     vg_name = self.cfg.GetVGName()
734     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
735     nodelist = utils.NiceSort(self.cfg.GetNodeList())
736     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
737     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
738     i_non_redundant = [] # Non redundant instances
739     node_volume = {}
740     node_instance = {}
741     node_info = {}
742     instance_cfg = {}
743
744     # FIXME: verify OS list
745     # do local checksums
746     file_names = []
747     file_names.append(constants.SSL_CERT_FILE)
748     file_names.append(constants.CLUSTER_CONF_FILE)
749     local_checksums = utils.FingerprintFiles(file_names)
750
751     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
752     all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
753     all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
754     all_vglist = self.rpc.call_vg_list(nodelist)
755     node_verify_param = {
756       'filelist': file_names,
757       'nodelist': nodelist,
758       'hypervisor': hypervisors,
759       'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
760                         for node in nodeinfo]
761       }
762     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
763                                            self.cfg.GetClusterName())
764     all_rversion = self.rpc.call_version(nodelist)
765     all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
766                                         self.cfg.GetHypervisorType())
767
768     for node in nodelist:
769       feedback_fn("* Verifying node %s" % node)
770       result = self._VerifyNode(node, file_names, local_checksums,
771                                 all_vglist[node], all_nvinfo[node],
772                                 all_rversion[node], feedback_fn)
773       bad = bad or result
774
775       # node_volume
776       volumeinfo = all_volumeinfo[node]
777
778       if isinstance(volumeinfo, basestring):
779         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
780                     (node, volumeinfo[-400:].encode('string_escape')))
781         bad = True
782         node_volume[node] = {}
783       elif not isinstance(volumeinfo, dict):
784         feedback_fn("  - ERROR: connection to %s failed" % (node,))
785         bad = True
786         continue
787       else:
788         node_volume[node] = volumeinfo
789
790       # node_instance
791       nodeinstance = all_instanceinfo[node]
792       if type(nodeinstance) != list:
793         feedback_fn("  - ERROR: connection to %s failed" % (node,))
794         bad = True
795         continue
796
797       node_instance[node] = nodeinstance
798
799       # node_info
800       nodeinfo = all_ninfo[node]
801       if not isinstance(nodeinfo, dict):
802         feedback_fn("  - ERROR: connection to %s failed" % (node,))
803         bad = True
804         continue
805
806       try:
807         node_info[node] = {
808           "mfree": int(nodeinfo['memory_free']),
809           "dfree": int(nodeinfo['vg_free']),
810           "pinst": [],
811           "sinst": [],
812           # dictionary holding all instances this node is secondary for,
813           # grouped by their primary node. Each key is a cluster node, and each
814           # value is a list of instances which have the key as primary and the
815           # current node as secondary.  this is handy to calculate N+1 memory
816           # availability if you can only failover from a primary to its
817           # secondary.
818           "sinst-by-pnode": {},
819         }
820       except ValueError:
821         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
822         bad = True
823         continue
824
825     node_vol_should = {}
826
827     for instance in instancelist:
828       feedback_fn("* Verifying instance %s" % instance)
829       inst_config = self.cfg.GetInstanceInfo(instance)
830       result =  self._VerifyInstance(instance, inst_config, node_volume,
831                                      node_instance, feedback_fn)
832       bad = bad or result
833
834       inst_config.MapLVsByNode(node_vol_should)
835
836       instance_cfg[instance] = inst_config
837
838       pnode = inst_config.primary_node
839       if pnode in node_info:
840         node_info[pnode]['pinst'].append(instance)
841       else:
842         feedback_fn("  - ERROR: instance %s, connection to primary node"
843                     " %s failed" % (instance, pnode))
844         bad = True
845
846       # If the instance is non-redundant we cannot survive losing its primary
847       # node, so we are not N+1 compliant. On the other hand we have no disk
848       # templates with more than one secondary so that situation is not well
849       # supported either.
850       # FIXME: does not support file-backed instances
851       if len(inst_config.secondary_nodes) == 0:
852         i_non_redundant.append(instance)
853       elif len(inst_config.secondary_nodes) > 1:
854         feedback_fn("  - WARNING: multiple secondaries for instance %s"
855                     % instance)
856
857       for snode in inst_config.secondary_nodes:
858         if snode in node_info:
859           node_info[snode]['sinst'].append(instance)
860           if pnode not in node_info[snode]['sinst-by-pnode']:
861             node_info[snode]['sinst-by-pnode'][pnode] = []
862           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
863         else:
864           feedback_fn("  - ERROR: instance %s, connection to secondary node"
865                       " %s failed" % (instance, snode))
866
867     feedback_fn("* Verifying orphan volumes")
868     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
869                                        feedback_fn)
870     bad = bad or result
871
872     feedback_fn("* Verifying remaining instances")
873     result = self._VerifyOrphanInstances(instancelist, node_instance,
874                                          feedback_fn)
875     bad = bad or result
876
877     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
878       feedback_fn("* Verifying N+1 Memory redundancy")
879       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
880       bad = bad or result
881
882     feedback_fn("* Other Notes")
883     if i_non_redundant:
884       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
885                   % len(i_non_redundant))
886
887     return not bad
888
889   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
890     """Analize the post-hooks' result, handle it, and send some
891     nicely-formatted feedback back to the user.
892
893     Args:
894       phase: the hooks phase that has just been run
895       hooks_results: the results of the multi-node hooks rpc call
896       feedback_fn: function to send feedback back to the caller
897       lu_result: previous Exec result
898
899     """
900     # We only really run POST phase hooks, and are only interested in
901     # their results
902     if phase == constants.HOOKS_PHASE_POST:
903       # Used to change hooks' output to proper indentation
904       indent_re = re.compile('^', re.M)
905       feedback_fn("* Hooks Results")
906       if not hooks_results:
907         feedback_fn("  - ERROR: general communication failure")
908         lu_result = 1
909       else:
910         for node_name in hooks_results:
911           show_node_header = True
912           res = hooks_results[node_name]
913           if res is False or not isinstance(res, list):
914             feedback_fn("    Communication failure")
915             lu_result = 1
916             continue
917           for script, hkr, output in res:
918             if hkr == constants.HKR_FAIL:
919               # The node header is only shown once, if there are
920               # failing hooks on that node
921               if show_node_header:
922                 feedback_fn("  Node %s:" % node_name)
923                 show_node_header = False
924               feedback_fn("    ERROR: Script %s failed, output:" % script)
925               output = indent_re.sub('      ', output)
926               feedback_fn("%s" % output)
927               lu_result = 1
928
929       return lu_result
930
931
932 class LUVerifyDisks(NoHooksLU):
933   """Verifies the cluster disks status.
934
935   """
936   _OP_REQP = []
937   REQ_BGL = False
938
939   def ExpandNames(self):
940     self.needed_locks = {
941       locking.LEVEL_NODE: locking.ALL_SET,
942       locking.LEVEL_INSTANCE: locking.ALL_SET,
943     }
944     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
945
946   def CheckPrereq(self):
947     """Check prerequisites.
948
949     This has no prerequisites.
950
951     """
952     pass
953
954   def Exec(self, feedback_fn):
955     """Verify integrity of cluster disks.
956
957     """
958     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
959
960     vg_name = self.cfg.GetVGName()
961     nodes = utils.NiceSort(self.cfg.GetNodeList())
962     instances = [self.cfg.GetInstanceInfo(name)
963                  for name in self.cfg.GetInstanceList()]
964
965     nv_dict = {}
966     for inst in instances:
967       inst_lvs = {}
968       if (inst.status != "up" or
969           inst.disk_template not in constants.DTS_NET_MIRROR):
970         continue
971       inst.MapLVsByNode(inst_lvs)
972       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
973       for node, vol_list in inst_lvs.iteritems():
974         for vol in vol_list:
975           nv_dict[(node, vol)] = inst
976
977     if not nv_dict:
978       return result
979
980     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
981
982     to_act = set()
983     for node in nodes:
984       # node_volume
985       lvs = node_lvs[node]
986
987       if isinstance(lvs, basestring):
988         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
989         res_nlvm[node] = lvs
990       elif not isinstance(lvs, dict):
991         logger.Info("connection to node %s failed or invalid data returned" %
992                     (node,))
993         res_nodes.append(node)
994         continue
995
996       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
997         inst = nv_dict.pop((node, lv_name), None)
998         if (not lv_online and inst is not None
999             and inst.name not in res_instances):
1000           res_instances.append(inst.name)
1001
1002     # any leftover items in nv_dict are missing LVs, let's arrange the
1003     # data better
1004     for key, inst in nv_dict.iteritems():
1005       if inst.name not in res_missing:
1006         res_missing[inst.name] = []
1007       res_missing[inst.name].append(key)
1008
1009     return result
1010
1011
1012 class LURenameCluster(LogicalUnit):
1013   """Rename the cluster.
1014
1015   """
1016   HPATH = "cluster-rename"
1017   HTYPE = constants.HTYPE_CLUSTER
1018   _OP_REQP = ["name"]
1019
1020   def BuildHooksEnv(self):
1021     """Build hooks env.
1022
1023     """
1024     env = {
1025       "OP_TARGET": self.cfg.GetClusterName(),
1026       "NEW_NAME": self.op.name,
1027       }
1028     mn = self.cfg.GetMasterNode()
1029     return env, [mn], [mn]
1030
1031   def CheckPrereq(self):
1032     """Verify that the passed name is a valid one.
1033
1034     """
1035     hostname = utils.HostInfo(self.op.name)
1036
1037     new_name = hostname.name
1038     self.ip = new_ip = hostname.ip
1039     old_name = self.cfg.GetClusterName()
1040     old_ip = self.cfg.GetMasterIP()
1041     if new_name == old_name and new_ip == old_ip:
1042       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1043                                  " cluster has changed")
1044     if new_ip != old_ip:
1045       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1046         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1047                                    " reachable on the network. Aborting." %
1048                                    new_ip)
1049
1050     self.op.name = new_name
1051
1052   def Exec(self, feedback_fn):
1053     """Rename the cluster.
1054
1055     """
1056     clustername = self.op.name
1057     ip = self.ip
1058
1059     # shutdown the master IP
1060     master = self.cfg.GetMasterNode()
1061     if not self.rpc.call_node_stop_master(master, False):
1062       raise errors.OpExecError("Could not disable the master role")
1063
1064     try:
1065       # modify the sstore
1066       # TODO: sstore
1067       ss.SetKey(ss.SS_MASTER_IP, ip)
1068       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1069
1070       # Distribute updated ss config to all nodes
1071       myself = self.cfg.GetNodeInfo(master)
1072       dist_nodes = self.cfg.GetNodeList()
1073       if myself.name in dist_nodes:
1074         dist_nodes.remove(myself.name)
1075
1076       logger.Debug("Copying updated ssconf data to all nodes")
1077       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1078         fname = ss.KeyToFilename(keyname)
1079         result = self.rpc.call_upload_file(dist_nodes, fname)
1080         for to_node in dist_nodes:
1081           if not result[to_node]:
1082             logger.Error("copy of file %s to node %s failed" %
1083                          (fname, to_node))
1084     finally:
1085       if not self.rpc.call_node_start_master(master, False):
1086         logger.Error("Could not re-enable the master role on the master,"
1087                      " please restart manually.")
1088
1089
1090 def _RecursiveCheckIfLVMBased(disk):
1091   """Check if the given disk or its children are lvm-based.
1092
1093   Args:
1094     disk: ganeti.objects.Disk object
1095
1096   Returns:
1097     boolean indicating whether a LD_LV dev_type was found or not
1098
1099   """
1100   if disk.children:
1101     for chdisk in disk.children:
1102       if _RecursiveCheckIfLVMBased(chdisk):
1103         return True
1104   return disk.dev_type == constants.LD_LV
1105
1106
1107 class LUSetClusterParams(LogicalUnit):
1108   """Change the parameters of the cluster.
1109
1110   """
1111   HPATH = "cluster-modify"
1112   HTYPE = constants.HTYPE_CLUSTER
1113   _OP_REQP = []
1114   REQ_BGL = False
1115
1116   def ExpandNames(self):
1117     # FIXME: in the future maybe other cluster params won't require checking on
1118     # all nodes to be modified.
1119     self.needed_locks = {
1120       locking.LEVEL_NODE: locking.ALL_SET,
1121     }
1122     self.share_locks[locking.LEVEL_NODE] = 1
1123
1124   def BuildHooksEnv(self):
1125     """Build hooks env.
1126
1127     """
1128     env = {
1129       "OP_TARGET": self.cfg.GetClusterName(),
1130       "NEW_VG_NAME": self.op.vg_name,
1131       }
1132     mn = self.cfg.GetMasterNode()
1133     return env, [mn], [mn]
1134
1135   def CheckPrereq(self):
1136     """Check prerequisites.
1137
1138     This checks whether the given params don't conflict and
1139     if the given volume group is valid.
1140
1141     """
1142     # FIXME: This only works because there is only one parameter that can be
1143     # changed or removed.
1144     if not self.op.vg_name:
1145       instances = self.cfg.GetAllInstancesInfo().values()
1146       for inst in instances:
1147         for disk in inst.disks:
1148           if _RecursiveCheckIfLVMBased(disk):
1149             raise errors.OpPrereqError("Cannot disable lvm storage while"
1150                                        " lvm-based instances exist")
1151
1152     # if vg_name not None, checks given volume group on all nodes
1153     if self.op.vg_name:
1154       node_list = self.acquired_locks[locking.LEVEL_NODE]
1155       vglist = self.rpc.call_vg_list(node_list)
1156       for node in node_list:
1157         vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1158                                               constants.MIN_VG_SIZE)
1159         if vgstatus:
1160           raise errors.OpPrereqError("Error on node '%s': %s" %
1161                                      (node, vgstatus))
1162
1163   def Exec(self, feedback_fn):
1164     """Change the parameters of the cluster.
1165
1166     """
1167     if self.op.vg_name != self.cfg.GetVGName():
1168       self.cfg.SetVGName(self.op.vg_name)
1169     else:
1170       feedback_fn("Cluster LVM configuration already in desired"
1171                   " state, not changing")
1172
1173
1174 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1175   """Sleep and poll for an instance's disk to sync.
1176
1177   """
1178   if not instance.disks:
1179     return True
1180
1181   if not oneshot:
1182     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1183
1184   node = instance.primary_node
1185
1186   for dev in instance.disks:
1187     lu.cfg.SetDiskID(dev, node)
1188
1189   retries = 0
1190   while True:
1191     max_time = 0
1192     done = True
1193     cumul_degraded = False
1194     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1195     if not rstats:
1196       lu.proc.LogWarning("Can't get any data from node %s" % node)
1197       retries += 1
1198       if retries >= 10:
1199         raise errors.RemoteError("Can't contact node %s for mirror data,"
1200                                  " aborting." % node)
1201       time.sleep(6)
1202       continue
1203     retries = 0
1204     for i in range(len(rstats)):
1205       mstat = rstats[i]
1206       if mstat is None:
1207         lu.proc.LogWarning("Can't compute data for node %s/%s" %
1208                            (node, instance.disks[i].iv_name))
1209         continue
1210       # we ignore the ldisk parameter
1211       perc_done, est_time, is_degraded, _ = mstat
1212       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1213       if perc_done is not None:
1214         done = False
1215         if est_time is not None:
1216           rem_time = "%d estimated seconds remaining" % est_time
1217           max_time = est_time
1218         else:
1219           rem_time = "no time estimate"
1220         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1221                         (instance.disks[i].iv_name, perc_done, rem_time))
1222     if done or oneshot:
1223       break
1224
1225     time.sleep(min(60, max_time))
1226
1227   if done:
1228     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1229   return not cumul_degraded
1230
1231
1232 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1233   """Check that mirrors are not degraded.
1234
1235   The ldisk parameter, if True, will change the test from the
1236   is_degraded attribute (which represents overall non-ok status for
1237   the device(s)) to the ldisk (representing the local storage status).
1238
1239   """
1240   lu.cfg.SetDiskID(dev, node)
1241   if ldisk:
1242     idx = 6
1243   else:
1244     idx = 5
1245
1246   result = True
1247   if on_primary or dev.AssembleOnSecondary():
1248     rstats = lu.rpc.call_blockdev_find(node, dev)
1249     if not rstats:
1250       logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1251       result = False
1252     else:
1253       result = result and (not rstats[idx])
1254   if dev.children:
1255     for child in dev.children:
1256       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1257
1258   return result
1259
1260
1261 class LUDiagnoseOS(NoHooksLU):
1262   """Logical unit for OS diagnose/query.
1263
1264   """
1265   _OP_REQP = ["output_fields", "names"]
1266   REQ_BGL = False
1267
1268   def ExpandNames(self):
1269     if self.op.names:
1270       raise errors.OpPrereqError("Selective OS query not supported")
1271
1272     self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1273     _CheckOutputFields(static=[],
1274                        dynamic=self.dynamic_fields,
1275                        selected=self.op.output_fields)
1276
1277     # Lock all nodes, in shared mode
1278     self.needed_locks = {}
1279     self.share_locks[locking.LEVEL_NODE] = 1
1280     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1281
1282   def CheckPrereq(self):
1283     """Check prerequisites.
1284
1285     """
1286
1287   @staticmethod
1288   def _DiagnoseByOS(node_list, rlist):
1289     """Remaps a per-node return list into an a per-os per-node dictionary
1290
1291       Args:
1292         node_list: a list with the names of all nodes
1293         rlist: a map with node names as keys and OS objects as values
1294
1295       Returns:
1296         map: a map with osnames as keys and as value another map, with
1297              nodes as
1298              keys and list of OS objects as values
1299              e.g. {"debian-etch": {"node1": [<object>,...],
1300                                    "node2": [<object>,]}
1301                   }
1302
1303     """
1304     all_os = {}
1305     for node_name, nr in rlist.iteritems():
1306       if not nr:
1307         continue
1308       for os_obj in nr:
1309         if os_obj.name not in all_os:
1310           # build a list of nodes for this os containing empty lists
1311           # for each node in node_list
1312           all_os[os_obj.name] = {}
1313           for nname in node_list:
1314             all_os[os_obj.name][nname] = []
1315         all_os[os_obj.name][node_name].append(os_obj)
1316     return all_os
1317
1318   def Exec(self, feedback_fn):
1319     """Compute the list of OSes.
1320
1321     """
1322     node_list = self.acquired_locks[locking.LEVEL_NODE]
1323     node_data = self.rpc.call_os_diagnose(node_list)
1324     if node_data == False:
1325       raise errors.OpExecError("Can't gather the list of OSes")
1326     pol = self._DiagnoseByOS(node_list, node_data)
1327     output = []
1328     for os_name, os_data in pol.iteritems():
1329       row = []
1330       for field in self.op.output_fields:
1331         if field == "name":
1332           val = os_name
1333         elif field == "valid":
1334           val = utils.all([osl and osl[0] for osl in os_data.values()])
1335         elif field == "node_status":
1336           val = {}
1337           for node_name, nos_list in os_data.iteritems():
1338             val[node_name] = [(v.status, v.path) for v in nos_list]
1339         else:
1340           raise errors.ParameterError(field)
1341         row.append(val)
1342       output.append(row)
1343
1344     return output
1345
1346
1347 class LURemoveNode(LogicalUnit):
1348   """Logical unit for removing a node.
1349
1350   """
1351   HPATH = "node-remove"
1352   HTYPE = constants.HTYPE_NODE
1353   _OP_REQP = ["node_name"]
1354
1355   def BuildHooksEnv(self):
1356     """Build hooks env.
1357
1358     This doesn't run on the target node in the pre phase as a failed
1359     node would then be impossible to remove.
1360
1361     """
1362     env = {
1363       "OP_TARGET": self.op.node_name,
1364       "NODE_NAME": self.op.node_name,
1365       }
1366     all_nodes = self.cfg.GetNodeList()
1367     all_nodes.remove(self.op.node_name)
1368     return env, all_nodes, all_nodes
1369
1370   def CheckPrereq(self):
1371     """Check prerequisites.
1372
1373     This checks:
1374      - the node exists in the configuration
1375      - it does not have primary or secondary instances
1376      - it's not the master
1377
1378     Any errors are signalled by raising errors.OpPrereqError.
1379
1380     """
1381     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1382     if node is None:
1383       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1384
1385     instance_list = self.cfg.GetInstanceList()
1386
1387     masternode = self.cfg.GetMasterNode()
1388     if node.name == masternode:
1389       raise errors.OpPrereqError("Node is the master node,"
1390                                  " you need to failover first.")
1391
1392     for instance_name in instance_list:
1393       instance = self.cfg.GetInstanceInfo(instance_name)
1394       if node.name == instance.primary_node:
1395         raise errors.OpPrereqError("Instance %s still running on the node,"
1396                                    " please remove first." % instance_name)
1397       if node.name in instance.secondary_nodes:
1398         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1399                                    " please remove first." % instance_name)
1400     self.op.node_name = node.name
1401     self.node = node
1402
1403   def Exec(self, feedback_fn):
1404     """Removes the node from the cluster.
1405
1406     """
1407     node = self.node
1408     logger.Info("stopping the node daemon and removing configs from node %s" %
1409                 node.name)
1410
1411     self.context.RemoveNode(node.name)
1412
1413     self.rpc.call_node_leave_cluster(node.name)
1414
1415
1416 class LUQueryNodes(NoHooksLU):
1417   """Logical unit for querying nodes.
1418
1419   """
1420   _OP_REQP = ["output_fields", "names"]
1421   REQ_BGL = False
1422
1423   def ExpandNames(self):
1424     self.dynamic_fields = frozenset([
1425       "dtotal", "dfree",
1426       "mtotal", "mnode", "mfree",
1427       "bootid",
1428       "ctotal",
1429       ])
1430
1431     self.static_fields = frozenset([
1432       "name", "pinst_cnt", "sinst_cnt",
1433       "pinst_list", "sinst_list",
1434       "pip", "sip", "tags",
1435       "serial_no",
1436       ])
1437
1438     _CheckOutputFields(static=self.static_fields,
1439                        dynamic=self.dynamic_fields,
1440                        selected=self.op.output_fields)
1441
1442     self.needed_locks = {}
1443     self.share_locks[locking.LEVEL_NODE] = 1
1444
1445     if self.op.names:
1446       self.wanted = _GetWantedNodes(self, self.op.names)
1447     else:
1448       self.wanted = locking.ALL_SET
1449
1450     self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
1451     if self.do_locking:
1452       # if we don't request only static fields, we need to lock the nodes
1453       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1454
1455
1456   def CheckPrereq(self):
1457     """Check prerequisites.
1458
1459     """
1460     # The validation of the node list is done in the _GetWantedNodes,
1461     # if non empty, and if empty, there's no validation to do
1462     pass
1463
1464   def Exec(self, feedback_fn):
1465     """Computes the list of nodes and their attributes.
1466
1467     """
1468     all_info = self.cfg.GetAllNodesInfo()
1469     if self.do_locking:
1470       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1471     elif self.wanted != locking.ALL_SET:
1472       nodenames = self.wanted
1473       missing = set(nodenames).difference(all_info.keys())
1474       if missing:
1475         raise errors.OpExecError(
1476           "Some nodes were removed before retrieving their data: %s" % missing)
1477     else:
1478       nodenames = all_info.keys()
1479     nodelist = [all_info[name] for name in nodenames]
1480
1481     # begin data gathering
1482
1483     if self.dynamic_fields.intersection(self.op.output_fields):
1484       live_data = {}
1485       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1486                                           self.cfg.GetHypervisorType())
1487       for name in nodenames:
1488         nodeinfo = node_data.get(name, None)
1489         if nodeinfo:
1490           live_data[name] = {
1491             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1492             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1493             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1494             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1495             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1496             "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1497             "bootid": nodeinfo['bootid'],
1498             }
1499         else:
1500           live_data[name] = {}
1501     else:
1502       live_data = dict.fromkeys(nodenames, {})
1503
1504     node_to_primary = dict([(name, set()) for name in nodenames])
1505     node_to_secondary = dict([(name, set()) for name in nodenames])
1506
1507     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1508                              "sinst_cnt", "sinst_list"))
1509     if inst_fields & frozenset(self.op.output_fields):
1510       instancelist = self.cfg.GetInstanceList()
1511
1512       for instance_name in instancelist:
1513         inst = self.cfg.GetInstanceInfo(instance_name)
1514         if inst.primary_node in node_to_primary:
1515           node_to_primary[inst.primary_node].add(inst.name)
1516         for secnode in inst.secondary_nodes:
1517           if secnode in node_to_secondary:
1518             node_to_secondary[secnode].add(inst.name)
1519
1520     # end data gathering
1521
1522     output = []
1523     for node in nodelist:
1524       node_output = []
1525       for field in self.op.output_fields:
1526         if field == "name":
1527           val = node.name
1528         elif field == "pinst_list":
1529           val = list(node_to_primary[node.name])
1530         elif field == "sinst_list":
1531           val = list(node_to_secondary[node.name])
1532         elif field == "pinst_cnt":
1533           val = len(node_to_primary[node.name])
1534         elif field == "sinst_cnt":
1535           val = len(node_to_secondary[node.name])
1536         elif field == "pip":
1537           val = node.primary_ip
1538         elif field == "sip":
1539           val = node.secondary_ip
1540         elif field == "tags":
1541           val = list(node.GetTags())
1542         elif field == "serial_no":
1543           val = node.serial_no
1544         elif field in self.dynamic_fields:
1545           val = live_data[node.name].get(field, None)
1546         else:
1547           raise errors.ParameterError(field)
1548         node_output.append(val)
1549       output.append(node_output)
1550
1551     return output
1552
1553
1554 class LUQueryNodeVolumes(NoHooksLU):
1555   """Logical unit for getting volumes on node(s).
1556
1557   """
1558   _OP_REQP = ["nodes", "output_fields"]
1559   REQ_BGL = False
1560
1561   def ExpandNames(self):
1562     _CheckOutputFields(static=["node"],
1563                        dynamic=["phys", "vg", "name", "size", "instance"],
1564                        selected=self.op.output_fields)
1565
1566     self.needed_locks = {}
1567     self.share_locks[locking.LEVEL_NODE] = 1
1568     if not self.op.nodes:
1569       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1570     else:
1571       self.needed_locks[locking.LEVEL_NODE] = \
1572         _GetWantedNodes(self, self.op.nodes)
1573
1574   def CheckPrereq(self):
1575     """Check prerequisites.
1576
1577     This checks that the fields required are valid output fields.
1578
1579     """
1580     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1581
1582   def Exec(self, feedback_fn):
1583     """Computes the list of nodes and their attributes.
1584
1585     """
1586     nodenames = self.nodes
1587     volumes = self.rpc.call_node_volumes(nodenames)
1588
1589     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1590              in self.cfg.GetInstanceList()]
1591
1592     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1593
1594     output = []
1595     for node in nodenames:
1596       if node not in volumes or not volumes[node]:
1597         continue
1598
1599       node_vols = volumes[node][:]
1600       node_vols.sort(key=lambda vol: vol['dev'])
1601
1602       for vol in node_vols:
1603         node_output = []
1604         for field in self.op.output_fields:
1605           if field == "node":
1606             val = node
1607           elif field == "phys":
1608             val = vol['dev']
1609           elif field == "vg":
1610             val = vol['vg']
1611           elif field == "name":
1612             val = vol['name']
1613           elif field == "size":
1614             val = int(float(vol['size']))
1615           elif field == "instance":
1616             for inst in ilist:
1617               if node not in lv_by_node[inst]:
1618                 continue
1619               if vol['name'] in lv_by_node[inst][node]:
1620                 val = inst.name
1621                 break
1622             else:
1623               val = '-'
1624           else:
1625             raise errors.ParameterError(field)
1626           node_output.append(str(val))
1627
1628         output.append(node_output)
1629
1630     return output
1631
1632
1633 class LUAddNode(LogicalUnit):
1634   """Logical unit for adding node to the cluster.
1635
1636   """
1637   HPATH = "node-add"
1638   HTYPE = constants.HTYPE_NODE
1639   _OP_REQP = ["node_name"]
1640
1641   def BuildHooksEnv(self):
1642     """Build hooks env.
1643
1644     This will run on all nodes before, and on all nodes + the new node after.
1645
1646     """
1647     env = {
1648       "OP_TARGET": self.op.node_name,
1649       "NODE_NAME": self.op.node_name,
1650       "NODE_PIP": self.op.primary_ip,
1651       "NODE_SIP": self.op.secondary_ip,
1652       }
1653     nodes_0 = self.cfg.GetNodeList()
1654     nodes_1 = nodes_0 + [self.op.node_name, ]
1655     return env, nodes_0, nodes_1
1656
1657   def CheckPrereq(self):
1658     """Check prerequisites.
1659
1660     This checks:
1661      - the new node is not already in the config
1662      - it is resolvable
1663      - its parameters (single/dual homed) matches the cluster
1664
1665     Any errors are signalled by raising errors.OpPrereqError.
1666
1667     """
1668     node_name = self.op.node_name
1669     cfg = self.cfg
1670
1671     dns_data = utils.HostInfo(node_name)
1672
1673     node = dns_data.name
1674     primary_ip = self.op.primary_ip = dns_data.ip
1675     secondary_ip = getattr(self.op, "secondary_ip", None)
1676     if secondary_ip is None:
1677       secondary_ip = primary_ip
1678     if not utils.IsValidIP(secondary_ip):
1679       raise errors.OpPrereqError("Invalid secondary IP given")
1680     self.op.secondary_ip = secondary_ip
1681
1682     node_list = cfg.GetNodeList()
1683     if not self.op.readd and node in node_list:
1684       raise errors.OpPrereqError("Node %s is already in the configuration" %
1685                                  node)
1686     elif self.op.readd and node not in node_list:
1687       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1688
1689     for existing_node_name in node_list:
1690       existing_node = cfg.GetNodeInfo(existing_node_name)
1691
1692       if self.op.readd and node == existing_node_name:
1693         if (existing_node.primary_ip != primary_ip or
1694             existing_node.secondary_ip != secondary_ip):
1695           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1696                                      " address configuration as before")
1697         continue
1698
1699       if (existing_node.primary_ip == primary_ip or
1700           existing_node.secondary_ip == primary_ip or
1701           existing_node.primary_ip == secondary_ip or
1702           existing_node.secondary_ip == secondary_ip):
1703         raise errors.OpPrereqError("New node ip address(es) conflict with"
1704                                    " existing node %s" % existing_node.name)
1705
1706     # check that the type of the node (single versus dual homed) is the
1707     # same as for the master
1708     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1709     master_singlehomed = myself.secondary_ip == myself.primary_ip
1710     newbie_singlehomed = secondary_ip == primary_ip
1711     if master_singlehomed != newbie_singlehomed:
1712       if master_singlehomed:
1713         raise errors.OpPrereqError("The master has no private ip but the"
1714                                    " new node has one")
1715       else:
1716         raise errors.OpPrereqError("The master has a private ip but the"
1717                                    " new node doesn't have one")
1718
1719     # checks reachablity
1720     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1721       raise errors.OpPrereqError("Node not reachable by ping")
1722
1723     if not newbie_singlehomed:
1724       # check reachability from my secondary ip to newbie's secondary ip
1725       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1726                            source=myself.secondary_ip):
1727         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1728                                    " based ping to noded port")
1729
1730     self.new_node = objects.Node(name=node,
1731                                  primary_ip=primary_ip,
1732                                  secondary_ip=secondary_ip)
1733
1734   def Exec(self, feedback_fn):
1735     """Adds the new node to the cluster.
1736
1737     """
1738     new_node = self.new_node
1739     node = new_node.name
1740
1741     # check connectivity
1742     result = self.rpc.call_version([node])[node]
1743     if result:
1744       if constants.PROTOCOL_VERSION == result:
1745         logger.Info("communication to node %s fine, sw version %s match" %
1746                     (node, result))
1747       else:
1748         raise errors.OpExecError("Version mismatch master version %s,"
1749                                  " node version %s" %
1750                                  (constants.PROTOCOL_VERSION, result))
1751     else:
1752       raise errors.OpExecError("Cannot get version from the new node")
1753
1754     # setup ssh on node
1755     logger.Info("copy ssh key to node %s" % node)
1756     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1757     keyarray = []
1758     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1759                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1760                 priv_key, pub_key]
1761
1762     for i in keyfiles:
1763       f = open(i, 'r')
1764       try:
1765         keyarray.append(f.read())
1766       finally:
1767         f.close()
1768
1769     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
1770                                     keyarray[2],
1771                                     keyarray[3], keyarray[4], keyarray[5])
1772
1773     if not result:
1774       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1775
1776     # Add node to our /etc/hosts, and add key to known_hosts
1777     utils.AddHostToEtcHosts(new_node.name)
1778
1779     if new_node.secondary_ip != new_node.primary_ip:
1780       if not self.rpc.call_node_has_ip_address(new_node.name,
1781                                                new_node.secondary_ip):
1782         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1783                                  " you gave (%s). Please fix and re-run this"
1784                                  " command." % new_node.secondary_ip)
1785
1786     node_verify_list = [self.cfg.GetMasterNode()]
1787     node_verify_param = {
1788       'nodelist': [node],
1789       # TODO: do a node-net-test as well?
1790     }
1791
1792     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
1793                                        self.cfg.GetClusterName())
1794     for verifier in node_verify_list:
1795       if not result[verifier]:
1796         raise errors.OpExecError("Cannot communicate with %s's node daemon"
1797                                  " for remote verification" % verifier)
1798       if result[verifier]['nodelist']:
1799         for failed in result[verifier]['nodelist']:
1800           feedback_fn("ssh/hostname verification failed %s -> %s" %
1801                       (verifier, result[verifier]['nodelist'][failed]))
1802         raise errors.OpExecError("ssh/hostname verification failed.")
1803
1804     # Distribute updated /etc/hosts and known_hosts to all nodes,
1805     # including the node just added
1806     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
1807     dist_nodes = self.cfg.GetNodeList()
1808     if not self.op.readd:
1809       dist_nodes.append(node)
1810     if myself.name in dist_nodes:
1811       dist_nodes.remove(myself.name)
1812
1813     logger.Debug("Copying hosts and known_hosts to all nodes")
1814     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1815       result = self.rpc.call_upload_file(dist_nodes, fname)
1816       for to_node in dist_nodes:
1817         if not result[to_node]:
1818           logger.Error("copy of file %s to node %s failed" %
1819                        (fname, to_node))
1820
1821     to_copy = []
1822     if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
1823       to_copy.append(constants.VNC_PASSWORD_FILE)
1824     for fname in to_copy:
1825       result = self.rpc.call_upload_file([node], fname)
1826       if not result[node]:
1827         logger.Error("could not copy file %s to node %s" % (fname, node))
1828
1829     if self.op.readd:
1830       self.context.ReaddNode(new_node)
1831     else:
1832       self.context.AddNode(new_node)
1833
1834
1835 class LUQueryClusterInfo(NoHooksLU):
1836   """Query cluster configuration.
1837
1838   """
1839   _OP_REQP = []
1840   REQ_MASTER = False
1841   REQ_BGL = False
1842
1843   def ExpandNames(self):
1844     self.needed_locks = {}
1845
1846   def CheckPrereq(self):
1847     """No prerequsites needed for this LU.
1848
1849     """
1850     pass
1851
1852   def Exec(self, feedback_fn):
1853     """Return cluster config.
1854
1855     """
1856     result = {
1857       "name": self.cfg.GetClusterName(),
1858       "software_version": constants.RELEASE_VERSION,
1859       "protocol_version": constants.PROTOCOL_VERSION,
1860       "config_version": constants.CONFIG_VERSION,
1861       "os_api_version": constants.OS_API_VERSION,
1862       "export_version": constants.EXPORT_VERSION,
1863       "master": self.cfg.GetMasterNode(),
1864       "architecture": (platform.architecture()[0], platform.machine()),
1865       "hypervisor_type": self.cfg.GetHypervisorType(),
1866       "enabled_hypervisors": self.cfg.GetClusterInfo().enabled_hypervisors,
1867       }
1868
1869     return result
1870
1871
1872 class LUQueryConfigValues(NoHooksLU):
1873   """Return configuration values.
1874
1875   """
1876   _OP_REQP = []
1877   REQ_BGL = False
1878
1879   def ExpandNames(self):
1880     self.needed_locks = {}
1881
1882     static_fields = ["cluster_name", "master_node"]
1883     _CheckOutputFields(static=static_fields,
1884                        dynamic=[],
1885                        selected=self.op.output_fields)
1886
1887   def CheckPrereq(self):
1888     """No prerequisites.
1889
1890     """
1891     pass
1892
1893   def Exec(self, feedback_fn):
1894     """Dump a representation of the cluster config to the standard output.
1895
1896     """
1897     values = []
1898     for field in self.op.output_fields:
1899       if field == "cluster_name":
1900         values.append(self.cfg.GetClusterName())
1901       elif field == "master_node":
1902         values.append(self.cfg.GetMasterNode())
1903       else:
1904         raise errors.ParameterError(field)
1905     return values
1906
1907
1908 class LUActivateInstanceDisks(NoHooksLU):
1909   """Bring up an instance's disks.
1910
1911   """
1912   _OP_REQP = ["instance_name"]
1913   REQ_BGL = False
1914
1915   def ExpandNames(self):
1916     self._ExpandAndLockInstance()
1917     self.needed_locks[locking.LEVEL_NODE] = []
1918     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1919
1920   def DeclareLocks(self, level):
1921     if level == locking.LEVEL_NODE:
1922       self._LockInstancesNodes()
1923
1924   def CheckPrereq(self):
1925     """Check prerequisites.
1926
1927     This checks that the instance is in the cluster.
1928
1929     """
1930     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1931     assert self.instance is not None, \
1932       "Cannot retrieve locked instance %s" % self.op.instance_name
1933
1934   def Exec(self, feedback_fn):
1935     """Activate the disks.
1936
1937     """
1938     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
1939     if not disks_ok:
1940       raise errors.OpExecError("Cannot activate block devices")
1941
1942     return disks_info
1943
1944
1945 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
1946   """Prepare the block devices for an instance.
1947
1948   This sets up the block devices on all nodes.
1949
1950   Args:
1951     instance: a ganeti.objects.Instance object
1952     ignore_secondaries: if true, errors on secondary nodes won't result
1953                         in an error return from the function
1954
1955   Returns:
1956     false if the operation failed
1957     list of (host, instance_visible_name, node_visible_name) if the operation
1958          suceeded with the mapping from node devices to instance devices
1959   """
1960   device_info = []
1961   disks_ok = True
1962   iname = instance.name
1963   # With the two passes mechanism we try to reduce the window of
1964   # opportunity for the race condition of switching DRBD to primary
1965   # before handshaking occured, but we do not eliminate it
1966
1967   # The proper fix would be to wait (with some limits) until the
1968   # connection has been made and drbd transitions from WFConnection
1969   # into any other network-connected state (Connected, SyncTarget,
1970   # SyncSource, etc.)
1971
1972   # 1st pass, assemble on all nodes in secondary mode
1973   for inst_disk in instance.disks:
1974     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1975       lu.cfg.SetDiskID(node_disk, node)
1976       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
1977       if not result:
1978         logger.Error("could not prepare block device %s on node %s"
1979                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1980         if not ignore_secondaries:
1981           disks_ok = False
1982
1983   # FIXME: race condition on drbd migration to primary
1984
1985   # 2nd pass, do only the primary node
1986   for inst_disk in instance.disks:
1987     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1988       if node != instance.primary_node:
1989         continue
1990       lu.cfg.SetDiskID(node_disk, node)
1991       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
1992       if not result:
1993         logger.Error("could not prepare block device %s on node %s"
1994                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1995         disks_ok = False
1996     device_info.append((instance.primary_node, inst_disk.iv_name, result))
1997
1998   # leave the disks configured for the primary node
1999   # this is a workaround that would be fixed better by
2000   # improving the logical/physical id handling
2001   for disk in instance.disks:
2002     lu.cfg.SetDiskID(disk, instance.primary_node)
2003
2004   return disks_ok, device_info
2005
2006
2007 def _StartInstanceDisks(lu, instance, force):
2008   """Start the disks of an instance.
2009
2010   """
2011   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2012                                            ignore_secondaries=force)
2013   if not disks_ok:
2014     _ShutdownInstanceDisks(lu, instance)
2015     if force is not None and not force:
2016       logger.Error("If the message above refers to a secondary node,"
2017                    " you can retry the operation using '--force'.")
2018     raise errors.OpExecError("Disk consistency error")
2019
2020
2021 class LUDeactivateInstanceDisks(NoHooksLU):
2022   """Shutdown an instance's disks.
2023
2024   """
2025   _OP_REQP = ["instance_name"]
2026   REQ_BGL = False
2027
2028   def ExpandNames(self):
2029     self._ExpandAndLockInstance()
2030     self.needed_locks[locking.LEVEL_NODE] = []
2031     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2032
2033   def DeclareLocks(self, level):
2034     if level == locking.LEVEL_NODE:
2035       self._LockInstancesNodes()
2036
2037   def CheckPrereq(self):
2038     """Check prerequisites.
2039
2040     This checks that the instance is in the cluster.
2041
2042     """
2043     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2044     assert self.instance is not None, \
2045       "Cannot retrieve locked instance %s" % self.op.instance_name
2046
2047   def Exec(self, feedback_fn):
2048     """Deactivate the disks
2049
2050     """
2051     instance = self.instance
2052     _SafeShutdownInstanceDisks(self, instance)
2053
2054
2055 def _SafeShutdownInstanceDisks(lu, instance):
2056   """Shutdown block devices of an instance.
2057
2058   This function checks if an instance is running, before calling
2059   _ShutdownInstanceDisks.
2060
2061   """
2062   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2063                                       [instance.hypervisor])
2064   ins_l = ins_l[instance.primary_node]
2065   if not type(ins_l) is list:
2066     raise errors.OpExecError("Can't contact node '%s'" %
2067                              instance.primary_node)
2068
2069   if instance.name in ins_l:
2070     raise errors.OpExecError("Instance is running, can't shutdown"
2071                              " block devices.")
2072
2073   _ShutdownInstanceDisks(lu, instance)
2074
2075
2076 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2077   """Shutdown block devices of an instance.
2078
2079   This does the shutdown on all nodes of the instance.
2080
2081   If the ignore_primary is false, errors on the primary node are
2082   ignored.
2083
2084   """
2085   result = True
2086   for disk in instance.disks:
2087     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2088       lu.cfg.SetDiskID(top_disk, node)
2089       if not lu.rpc.call_blockdev_shutdown(node, top_disk):
2090         logger.Error("could not shutdown block device %s on node %s" %
2091                      (disk.iv_name, node))
2092         if not ignore_primary or node != instance.primary_node:
2093           result = False
2094   return result
2095
2096
2097 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2098   """Checks if a node has enough free memory.
2099
2100   This function check if a given node has the needed amount of free
2101   memory. In case the node has less memory or we cannot get the
2102   information from the node, this function raise an OpPrereqError
2103   exception.
2104
2105   @type lu: C{LogicalUnit}
2106   @param lu: a logical unit from which we get configuration data
2107   @type node: C{str}
2108   @param node: the node to check
2109   @type reason: C{str}
2110   @param reason: string to use in the error message
2111   @type requested: C{int}
2112   @param requested: the amount of memory in MiB to check for
2113   @type hypervisor: C{str}
2114   @param hypervisor: the hypervisor to ask for memory stats
2115   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2116       we cannot check the node
2117
2118   """
2119   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2120   if not nodeinfo or not isinstance(nodeinfo, dict):
2121     raise errors.OpPrereqError("Could not contact node %s for resource"
2122                              " information" % (node,))
2123
2124   free_mem = nodeinfo[node].get('memory_free')
2125   if not isinstance(free_mem, int):
2126     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2127                              " was '%s'" % (node, free_mem))
2128   if requested > free_mem:
2129     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2130                              " needed %s MiB, available %s MiB" %
2131                              (node, reason, requested, free_mem))
2132
2133
2134 class LUStartupInstance(LogicalUnit):
2135   """Starts an instance.
2136
2137   """
2138   HPATH = "instance-start"
2139   HTYPE = constants.HTYPE_INSTANCE
2140   _OP_REQP = ["instance_name", "force"]
2141   REQ_BGL = False
2142
2143   def ExpandNames(self):
2144     self._ExpandAndLockInstance()
2145     self.needed_locks[locking.LEVEL_NODE] = []
2146     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2147
2148   def DeclareLocks(self, level):
2149     if level == locking.LEVEL_NODE:
2150       self._LockInstancesNodes()
2151
2152   def BuildHooksEnv(self):
2153     """Build hooks env.
2154
2155     This runs on master, primary and secondary nodes of the instance.
2156
2157     """
2158     env = {
2159       "FORCE": self.op.force,
2160       }
2161     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2162     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2163           list(self.instance.secondary_nodes))
2164     return env, nl, nl
2165
2166   def CheckPrereq(self):
2167     """Check prerequisites.
2168
2169     This checks that the instance is in the cluster.
2170
2171     """
2172     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2173     assert self.instance is not None, \
2174       "Cannot retrieve locked instance %s" % self.op.instance_name
2175
2176     bep = self.cfg.GetClusterInfo().FillBE(instance)
2177     # check bridges existance
2178     _CheckInstanceBridgesExist(self, instance)
2179
2180     _CheckNodeFreeMemory(self, instance.primary_node,
2181                          "starting instance %s" % instance.name,
2182                          bep[constants.BE_MEMORY], instance.hypervisor)
2183
2184   def Exec(self, feedback_fn):
2185     """Start the instance.
2186
2187     """
2188     instance = self.instance
2189     force = self.op.force
2190     extra_args = getattr(self.op, "extra_args", "")
2191
2192     self.cfg.MarkInstanceUp(instance.name)
2193
2194     node_current = instance.primary_node
2195
2196     _StartInstanceDisks(self, instance, force)
2197
2198     if not self.rpc.call_instance_start(node_current, instance, extra_args):
2199       _ShutdownInstanceDisks(self, instance)
2200       raise errors.OpExecError("Could not start instance")
2201
2202
2203 class LURebootInstance(LogicalUnit):
2204   """Reboot an instance.
2205
2206   """
2207   HPATH = "instance-reboot"
2208   HTYPE = constants.HTYPE_INSTANCE
2209   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2210   REQ_BGL = False
2211
2212   def ExpandNames(self):
2213     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2214                                    constants.INSTANCE_REBOOT_HARD,
2215                                    constants.INSTANCE_REBOOT_FULL]:
2216       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2217                                   (constants.INSTANCE_REBOOT_SOFT,
2218                                    constants.INSTANCE_REBOOT_HARD,
2219                                    constants.INSTANCE_REBOOT_FULL))
2220     self._ExpandAndLockInstance()
2221     self.needed_locks[locking.LEVEL_NODE] = []
2222     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2223
2224   def DeclareLocks(self, level):
2225     if level == locking.LEVEL_NODE:
2226       primary_only = not constants.INSTANCE_REBOOT_FULL
2227       self._LockInstancesNodes(primary_only=primary_only)
2228
2229   def BuildHooksEnv(self):
2230     """Build hooks env.
2231
2232     This runs on master, primary and secondary nodes of the instance.
2233
2234     """
2235     env = {
2236       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2237       }
2238     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2239     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2240           list(self.instance.secondary_nodes))
2241     return env, nl, nl
2242
2243   def CheckPrereq(self):
2244     """Check prerequisites.
2245
2246     This checks that the instance is in the cluster.
2247
2248     """
2249     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2250     assert self.instance is not None, \
2251       "Cannot retrieve locked instance %s" % self.op.instance_name
2252
2253     # check bridges existance
2254     _CheckInstanceBridgesExist(self, instance)
2255
2256   def Exec(self, feedback_fn):
2257     """Reboot the instance.
2258
2259     """
2260     instance = self.instance
2261     ignore_secondaries = self.op.ignore_secondaries
2262     reboot_type = self.op.reboot_type
2263     extra_args = getattr(self.op, "extra_args", "")
2264
2265     node_current = instance.primary_node
2266
2267     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2268                        constants.INSTANCE_REBOOT_HARD]:
2269       if not self.rpc.call_instance_reboot(node_current, instance,
2270                                            reboot_type, extra_args):
2271         raise errors.OpExecError("Could not reboot instance")
2272     else:
2273       if not self.rpc.call_instance_shutdown(node_current, instance):
2274         raise errors.OpExecError("could not shutdown instance for full reboot")
2275       _ShutdownInstanceDisks(self, instance)
2276       _StartInstanceDisks(self, instance, ignore_secondaries)
2277       if not self.rpc.call_instance_start(node_current, instance, extra_args):
2278         _ShutdownInstanceDisks(self, instance)
2279         raise errors.OpExecError("Could not start instance for full reboot")
2280
2281     self.cfg.MarkInstanceUp(instance.name)
2282
2283
2284 class LUShutdownInstance(LogicalUnit):
2285   """Shutdown an instance.
2286
2287   """
2288   HPATH = "instance-stop"
2289   HTYPE = constants.HTYPE_INSTANCE
2290   _OP_REQP = ["instance_name"]
2291   REQ_BGL = False
2292
2293   def ExpandNames(self):
2294     self._ExpandAndLockInstance()
2295     self.needed_locks[locking.LEVEL_NODE] = []
2296     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2297
2298   def DeclareLocks(self, level):
2299     if level == locking.LEVEL_NODE:
2300       self._LockInstancesNodes()
2301
2302   def BuildHooksEnv(self):
2303     """Build hooks env.
2304
2305     This runs on master, primary and secondary nodes of the instance.
2306
2307     """
2308     env = _BuildInstanceHookEnvByObject(self, self.instance)
2309     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2310           list(self.instance.secondary_nodes))
2311     return env, nl, nl
2312
2313   def CheckPrereq(self):
2314     """Check prerequisites.
2315
2316     This checks that the instance is in the cluster.
2317
2318     """
2319     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2320     assert self.instance is not None, \
2321       "Cannot retrieve locked instance %s" % self.op.instance_name
2322
2323   def Exec(self, feedback_fn):
2324     """Shutdown the instance.
2325
2326     """
2327     instance = self.instance
2328     node_current = instance.primary_node
2329     self.cfg.MarkInstanceDown(instance.name)
2330     if not self.rpc.call_instance_shutdown(node_current, instance):
2331       logger.Error("could not shutdown instance")
2332
2333     _ShutdownInstanceDisks(self, instance)
2334
2335
2336 class LUReinstallInstance(LogicalUnit):
2337   """Reinstall an instance.
2338
2339   """
2340   HPATH = "instance-reinstall"
2341   HTYPE = constants.HTYPE_INSTANCE
2342   _OP_REQP = ["instance_name"]
2343   REQ_BGL = False
2344
2345   def ExpandNames(self):
2346     self._ExpandAndLockInstance()
2347     self.needed_locks[locking.LEVEL_NODE] = []
2348     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2349
2350   def DeclareLocks(self, level):
2351     if level == locking.LEVEL_NODE:
2352       self._LockInstancesNodes()
2353
2354   def BuildHooksEnv(self):
2355     """Build hooks env.
2356
2357     This runs on master, primary and secondary nodes of the instance.
2358
2359     """
2360     env = _BuildInstanceHookEnvByObject(self, self.instance)
2361     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2362           list(self.instance.secondary_nodes))
2363     return env, nl, nl
2364
2365   def CheckPrereq(self):
2366     """Check prerequisites.
2367
2368     This checks that the instance is in the cluster and is not running.
2369
2370     """
2371     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2372     assert instance is not None, \
2373       "Cannot retrieve locked instance %s" % self.op.instance_name
2374
2375     if instance.disk_template == constants.DT_DISKLESS:
2376       raise errors.OpPrereqError("Instance '%s' has no disks" %
2377                                  self.op.instance_name)
2378     if instance.status != "down":
2379       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2380                                  self.op.instance_name)
2381     remote_info = self.rpc.call_instance_info(instance.primary_node,
2382                                               instance.name,
2383                                               instance.hypervisor)
2384     if remote_info:
2385       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2386                                  (self.op.instance_name,
2387                                   instance.primary_node))
2388
2389     self.op.os_type = getattr(self.op, "os_type", None)
2390     if self.op.os_type is not None:
2391       # OS verification
2392       pnode = self.cfg.GetNodeInfo(
2393         self.cfg.ExpandNodeName(instance.primary_node))
2394       if pnode is None:
2395         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2396                                    self.op.pnode)
2397       os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
2398       if not os_obj:
2399         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2400                                    " primary node"  % self.op.os_type)
2401
2402     self.instance = instance
2403
2404   def Exec(self, feedback_fn):
2405     """Reinstall the instance.
2406
2407     """
2408     inst = self.instance
2409
2410     if self.op.os_type is not None:
2411       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2412       inst.os = self.op.os_type
2413       self.cfg.Update(inst)
2414
2415     _StartInstanceDisks(self, inst, None)
2416     try:
2417       feedback_fn("Running the instance OS create scripts...")
2418       if not self.rpc.call_instance_os_add(inst.primary_node, inst,
2419                                            "sda", "sdb"):
2420         raise errors.OpExecError("Could not install OS for instance %s"
2421                                  " on node %s" %
2422                                  (inst.name, inst.primary_node))
2423     finally:
2424       _ShutdownInstanceDisks(self, inst)
2425
2426
2427 class LURenameInstance(LogicalUnit):
2428   """Rename an instance.
2429
2430   """
2431   HPATH = "instance-rename"
2432   HTYPE = constants.HTYPE_INSTANCE
2433   _OP_REQP = ["instance_name", "new_name"]
2434
2435   def BuildHooksEnv(self):
2436     """Build hooks env.
2437
2438     This runs on master, primary and secondary nodes of the instance.
2439
2440     """
2441     env = _BuildInstanceHookEnvByObject(self, self.instance)
2442     env["INSTANCE_NEW_NAME"] = self.op.new_name
2443     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2444           list(self.instance.secondary_nodes))
2445     return env, nl, nl
2446
2447   def CheckPrereq(self):
2448     """Check prerequisites.
2449
2450     This checks that the instance is in the cluster and is not running.
2451
2452     """
2453     instance = self.cfg.GetInstanceInfo(
2454       self.cfg.ExpandInstanceName(self.op.instance_name))
2455     if instance is None:
2456       raise errors.OpPrereqError("Instance '%s' not known" %
2457                                  self.op.instance_name)
2458     if instance.status != "down":
2459       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2460                                  self.op.instance_name)
2461     remote_info = self.rpc.call_instance_info(instance.primary_node,
2462                                               instance.name,
2463                                               instance.hypervisor)
2464     if remote_info:
2465       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2466                                  (self.op.instance_name,
2467                                   instance.primary_node))
2468     self.instance = instance
2469
2470     # new name verification
2471     name_info = utils.HostInfo(self.op.new_name)
2472
2473     self.op.new_name = new_name = name_info.name
2474     instance_list = self.cfg.GetInstanceList()
2475     if new_name in instance_list:
2476       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2477                                  new_name)
2478
2479     if not getattr(self.op, "ignore_ip", False):
2480       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2481         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2482                                    (name_info.ip, new_name))
2483
2484
2485   def Exec(self, feedback_fn):
2486     """Reinstall the instance.
2487
2488     """
2489     inst = self.instance
2490     old_name = inst.name
2491
2492     if inst.disk_template == constants.DT_FILE:
2493       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2494
2495     self.cfg.RenameInstance(inst.name, self.op.new_name)
2496     # Change the instance lock. This is definitely safe while we hold the BGL
2497     self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2498     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2499
2500     # re-read the instance from the configuration after rename
2501     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2502
2503     if inst.disk_template == constants.DT_FILE:
2504       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2505       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2506                                                      old_file_storage_dir,
2507                                                      new_file_storage_dir)
2508
2509       if not result:
2510         raise errors.OpExecError("Could not connect to node '%s' to rename"
2511                                  " directory '%s' to '%s' (but the instance"
2512                                  " has been renamed in Ganeti)" % (
2513                                  inst.primary_node, old_file_storage_dir,
2514                                  new_file_storage_dir))
2515
2516       if not result[0]:
2517         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2518                                  " (but the instance has been renamed in"
2519                                  " Ganeti)" % (old_file_storage_dir,
2520                                                new_file_storage_dir))
2521
2522     _StartInstanceDisks(self, inst, None)
2523     try:
2524       if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
2525                                                old_name,
2526                                                "sda", "sdb"):
2527         msg = ("Could not run OS rename script for instance %s on node %s"
2528                " (but the instance has been renamed in Ganeti)" %
2529                (inst.name, inst.primary_node))
2530         logger.Error(msg)
2531     finally:
2532       _ShutdownInstanceDisks(self, inst)
2533
2534
2535 class LURemoveInstance(LogicalUnit):
2536   """Remove an instance.
2537
2538   """
2539   HPATH = "instance-remove"
2540   HTYPE = constants.HTYPE_INSTANCE
2541   _OP_REQP = ["instance_name", "ignore_failures"]
2542   REQ_BGL = False
2543
2544   def ExpandNames(self):
2545     self._ExpandAndLockInstance()
2546     self.needed_locks[locking.LEVEL_NODE] = []
2547     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2548
2549   def DeclareLocks(self, level):
2550     if level == locking.LEVEL_NODE:
2551       self._LockInstancesNodes()
2552
2553   def BuildHooksEnv(self):
2554     """Build hooks env.
2555
2556     This runs on master, primary and secondary nodes of the instance.
2557
2558     """
2559     env = _BuildInstanceHookEnvByObject(self, self.instance)
2560     nl = [self.cfg.GetMasterNode()]
2561     return env, nl, nl
2562
2563   def CheckPrereq(self):
2564     """Check prerequisites.
2565
2566     This checks that the instance is in the cluster.
2567
2568     """
2569     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2570     assert self.instance is not None, \
2571       "Cannot retrieve locked instance %s" % self.op.instance_name
2572
2573   def Exec(self, feedback_fn):
2574     """Remove the instance.
2575
2576     """
2577     instance = self.instance
2578     logger.Info("shutting down instance %s on node %s" %
2579                 (instance.name, instance.primary_node))
2580
2581     if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
2582       if self.op.ignore_failures:
2583         feedback_fn("Warning: can't shutdown instance")
2584       else:
2585         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2586                                  (instance.name, instance.primary_node))
2587
2588     logger.Info("removing block devices for instance %s" % instance.name)
2589
2590     if not _RemoveDisks(self, instance):
2591       if self.op.ignore_failures:
2592         feedback_fn("Warning: can't remove instance's disks")
2593       else:
2594         raise errors.OpExecError("Can't remove instance's disks")
2595
2596     logger.Info("removing instance %s out of cluster config" % instance.name)
2597
2598     self.cfg.RemoveInstance(instance.name)
2599     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2600
2601
2602 class LUQueryInstances(NoHooksLU):
2603   """Logical unit for querying instances.
2604
2605   """
2606   _OP_REQP = ["output_fields", "names"]
2607   REQ_BGL = False
2608
2609   def ExpandNames(self):
2610     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2611     hvp = ["hv/%s" % name for name in constants.HVS_PARAMETERS]
2612     bep = ["be/%s" % name for name in constants.BES_PARAMETERS]
2613     self.static_fields = frozenset([
2614       "name", "os", "pnode", "snodes",
2615       "admin_state", "admin_ram",
2616       "disk_template", "ip", "mac", "bridge",
2617       "sda_size", "sdb_size", "vcpus", "tags",
2618       "network_port",
2619       "serial_no", "hypervisor", "hvparams",
2620       ] + hvp + bep)
2621
2622     _CheckOutputFields(static=self.static_fields,
2623                        dynamic=self.dynamic_fields,
2624                        selected=self.op.output_fields)
2625
2626     self.needed_locks = {}
2627     self.share_locks[locking.LEVEL_INSTANCE] = 1
2628     self.share_locks[locking.LEVEL_NODE] = 1
2629
2630     if self.op.names:
2631       self.wanted = _GetWantedInstances(self, self.op.names)
2632     else:
2633       self.wanted = locking.ALL_SET
2634
2635     self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
2636     if self.do_locking:
2637       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2638       self.needed_locks[locking.LEVEL_NODE] = []
2639       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2640
2641   def DeclareLocks(self, level):
2642     if level == locking.LEVEL_NODE and self.do_locking:
2643       self._LockInstancesNodes()
2644
2645   def CheckPrereq(self):
2646     """Check prerequisites.
2647
2648     """
2649     pass
2650
2651   def Exec(self, feedback_fn):
2652     """Computes the list of nodes and their attributes.
2653
2654     """
2655     all_info = self.cfg.GetAllInstancesInfo()
2656     if self.do_locking:
2657       instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2658     elif self.wanted != locking.ALL_SET:
2659       instance_names = self.wanted
2660       missing = set(instance_names).difference(all_info.keys())
2661       if missing:
2662         raise errors.OpExecError(
2663           "Some instances were removed before retrieving their data: %s"
2664           % missing)
2665     else:
2666       instance_names = all_info.keys()
2667     instance_list = [all_info[iname] for iname in instance_names]
2668
2669     # begin data gathering
2670
2671     nodes = frozenset([inst.primary_node for inst in instance_list])
2672     hv_list = list(set([inst.hypervisor for inst in instance_list]))
2673
2674     bad_nodes = []
2675     if self.dynamic_fields.intersection(self.op.output_fields):
2676       live_data = {}
2677       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2678       for name in nodes:
2679         result = node_data[name]
2680         if result:
2681           live_data.update(result)
2682         elif result == False:
2683           bad_nodes.append(name)
2684         # else no instance is alive
2685     else:
2686       live_data = dict([(name, {}) for name in instance_names])
2687
2688     # end data gathering
2689
2690     HVPREFIX = "hv/"
2691     BEPREFIX = "be/"
2692     output = []
2693     for instance in instance_list:
2694       iout = []
2695       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
2696       i_be = self.cfg.GetClusterInfo().FillBE(instance)
2697       for field in self.op.output_fields:
2698         if field == "name":
2699           val = instance.name
2700         elif field == "os":
2701           val = instance.os
2702         elif field == "pnode":
2703           val = instance.primary_node
2704         elif field == "snodes":
2705           val = list(instance.secondary_nodes)
2706         elif field == "admin_state":
2707           val = (instance.status != "down")
2708         elif field == "oper_state":
2709           if instance.primary_node in bad_nodes:
2710             val = None
2711           else:
2712             val = bool(live_data.get(instance.name))
2713         elif field == "status":
2714           if instance.primary_node in bad_nodes:
2715             val = "ERROR_nodedown"
2716           else:
2717             running = bool(live_data.get(instance.name))
2718             if running:
2719               if instance.status != "down":
2720                 val = "running"
2721               else:
2722                 val = "ERROR_up"
2723             else:
2724               if instance.status != "down":
2725                 val = "ERROR_down"
2726               else:
2727                 val = "ADMIN_down"
2728         elif field == "oper_ram":
2729           if instance.primary_node in bad_nodes:
2730             val = None
2731           elif instance.name in live_data:
2732             val = live_data[instance.name].get("memory", "?")
2733           else:
2734             val = "-"
2735         elif field == "disk_template":
2736           val = instance.disk_template
2737         elif field == "ip":
2738           val = instance.nics[0].ip
2739         elif field == "bridge":
2740           val = instance.nics[0].bridge
2741         elif field == "mac":
2742           val = instance.nics[0].mac
2743         elif field == "sda_size" or field == "sdb_size":
2744           disk = instance.FindDisk(field[:3])
2745           if disk is None:
2746             val = None
2747           else:
2748             val = disk.size
2749         elif field == "tags":
2750           val = list(instance.GetTags())
2751         elif field == "serial_no":
2752           val = instance.serial_no
2753         elif field == "network_port":
2754           val = instance.network_port
2755         elif field == "hypervisor":
2756           val = instance.hypervisor
2757         elif field == "hvparams":
2758           val = i_hv
2759         elif (field.startswith(HVPREFIX) and
2760               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
2761           val = i_hv.get(field[len(HVPREFIX):], None)
2762         elif field == "beparams":
2763           val = i_be
2764         elif (field.startswith(BEPREFIX) and
2765               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
2766           val = i_be.get(field[len(BEPREFIX):], None)
2767         else:
2768           raise errors.ParameterError(field)
2769         iout.append(val)
2770       output.append(iout)
2771
2772     return output
2773
2774
2775 class LUFailoverInstance(LogicalUnit):
2776   """Failover an instance.
2777
2778   """
2779   HPATH = "instance-failover"
2780   HTYPE = constants.HTYPE_INSTANCE
2781   _OP_REQP = ["instance_name", "ignore_consistency"]
2782   REQ_BGL = False
2783
2784   def ExpandNames(self):
2785     self._ExpandAndLockInstance()
2786     self.needed_locks[locking.LEVEL_NODE] = []
2787     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2788
2789   def DeclareLocks(self, level):
2790     if level == locking.LEVEL_NODE:
2791       self._LockInstancesNodes()
2792
2793   def BuildHooksEnv(self):
2794     """Build hooks env.
2795
2796     This runs on master, primary and secondary nodes of the instance.
2797
2798     """
2799     env = {
2800       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2801       }
2802     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2803     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
2804     return env, nl, nl
2805
2806   def CheckPrereq(self):
2807     """Check prerequisites.
2808
2809     This checks that the instance is in the cluster.
2810
2811     """
2812     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2813     assert self.instance is not None, \
2814       "Cannot retrieve locked instance %s" % self.op.instance_name
2815
2816     bep = self.cfg.GetClusterInfo().FillBE(instance)
2817     if instance.disk_template not in constants.DTS_NET_MIRROR:
2818       raise errors.OpPrereqError("Instance's disk layout is not"
2819                                  " network mirrored, cannot failover.")
2820
2821     secondary_nodes = instance.secondary_nodes
2822     if not secondary_nodes:
2823       raise errors.ProgrammerError("no secondary node but using "
2824                                    "a mirrored disk template")
2825
2826     target_node = secondary_nodes[0]
2827     # check memory requirements on the secondary node
2828     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
2829                          instance.name, bep[constants.BE_MEMORY],
2830                          instance.hypervisor)
2831
2832     # check bridge existance
2833     brlist = [nic.bridge for nic in instance.nics]
2834     if not self.rpc.call_bridges_exist(target_node, brlist):
2835       raise errors.OpPrereqError("One or more target bridges %s does not"
2836                                  " exist on destination node '%s'" %
2837                                  (brlist, target_node))
2838
2839   def Exec(self, feedback_fn):
2840     """Failover an instance.
2841
2842     The failover is done by shutting it down on its present node and
2843     starting it on the secondary.
2844
2845     """
2846     instance = self.instance
2847
2848     source_node = instance.primary_node
2849     target_node = instance.secondary_nodes[0]
2850
2851     feedback_fn("* checking disk consistency between source and target")
2852     for dev in instance.disks:
2853       # for drbd, these are drbd over lvm
2854       if not _CheckDiskConsistency(self, dev, target_node, False):
2855         if instance.status == "up" and not self.op.ignore_consistency:
2856           raise errors.OpExecError("Disk %s is degraded on target node,"
2857                                    " aborting failover." % dev.iv_name)
2858
2859     feedback_fn("* shutting down instance on source node")
2860     logger.Info("Shutting down instance %s on node %s" %
2861                 (instance.name, source_node))
2862
2863     if not self.rpc.call_instance_shutdown(source_node, instance):
2864       if self.op.ignore_consistency:
2865         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2866                      " anyway. Please make sure node %s is down"  %
2867                      (instance.name, source_node, source_node))
2868       else:
2869         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2870                                  (instance.name, source_node))
2871
2872     feedback_fn("* deactivating the instance's disks on source node")
2873     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
2874       raise errors.OpExecError("Can't shut down the instance's disks.")
2875
2876     instance.primary_node = target_node
2877     # distribute new instance config to the other nodes
2878     self.cfg.Update(instance)
2879
2880     # Only start the instance if it's marked as up
2881     if instance.status == "up":
2882       feedback_fn("* activating the instance's disks on target node")
2883       logger.Info("Starting instance %s on node %s" %
2884                   (instance.name, target_node))
2885
2886       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
2887                                                ignore_secondaries=True)
2888       if not disks_ok:
2889         _ShutdownInstanceDisks(self, instance)
2890         raise errors.OpExecError("Can't activate the instance's disks")
2891
2892       feedback_fn("* starting the instance on the target node")
2893       if not self.rpc.call_instance_start(target_node, instance, None):
2894         _ShutdownInstanceDisks(self, instance)
2895         raise errors.OpExecError("Could not start instance %s on node %s." %
2896                                  (instance.name, target_node))
2897
2898
2899 def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
2900   """Create a tree of block devices on the primary node.
2901
2902   This always creates all devices.
2903
2904   """
2905   if device.children:
2906     for child in device.children:
2907       if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
2908         return False
2909
2910   lu.cfg.SetDiskID(device, node)
2911   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
2912                                        instance.name, True, info)
2913   if not new_id:
2914     return False
2915   if device.physical_id is None:
2916     device.physical_id = new_id
2917   return True
2918
2919
2920 def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
2921   """Create a tree of block devices on a secondary node.
2922
2923   If this device type has to be created on secondaries, create it and
2924   all its children.
2925
2926   If not, just recurse to children keeping the same 'force' value.
2927
2928   """
2929   if device.CreateOnSecondary():
2930     force = True
2931   if device.children:
2932     for child in device.children:
2933       if not _CreateBlockDevOnSecondary(lu, node, instance,
2934                                         child, force, info):
2935         return False
2936
2937   if not force:
2938     return True
2939   lu.cfg.SetDiskID(device, node)
2940   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
2941                                        instance.name, False, info)
2942   if not new_id:
2943     return False
2944   if device.physical_id is None:
2945     device.physical_id = new_id
2946   return True
2947
2948
2949 def _GenerateUniqueNames(lu, exts):
2950   """Generate a suitable LV name.
2951
2952   This will generate a logical volume name for the given instance.
2953
2954   """
2955   results = []
2956   for val in exts:
2957     new_id = lu.cfg.GenerateUniqueID()
2958     results.append("%s%s" % (new_id, val))
2959   return results
2960
2961
2962 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
2963                          p_minor, s_minor):
2964   """Generate a drbd8 device complete with its children.
2965
2966   """
2967   port = lu.cfg.AllocatePort()
2968   vgname = lu.cfg.GetVGName()
2969   shared_secret = lu.cfg.GenerateDRBDSecret()
2970   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2971                           logical_id=(vgname, names[0]))
2972   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2973                           logical_id=(vgname, names[1]))
2974   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2975                           logical_id=(primary, secondary, port,
2976                                       p_minor, s_minor,
2977                                       shared_secret),
2978                           children=[dev_data, dev_meta],
2979                           iv_name=iv_name)
2980   return drbd_dev
2981
2982
2983 def _GenerateDiskTemplate(lu, template_name,
2984                           instance_name, primary_node,
2985                           secondary_nodes, disk_sz, swap_sz,
2986                           file_storage_dir, file_driver):
2987   """Generate the entire disk layout for a given template type.
2988
2989   """
2990   #TODO: compute space requirements
2991
2992   vgname = lu.cfg.GetVGName()
2993   if template_name == constants.DT_DISKLESS:
2994     disks = []
2995   elif template_name == constants.DT_PLAIN:
2996     if len(secondary_nodes) != 0:
2997       raise errors.ProgrammerError("Wrong template configuration")
2998
2999     names = _GenerateUniqueNames(lu, [".sda", ".sdb"])
3000     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3001                            logical_id=(vgname, names[0]),
3002                            iv_name = "sda")
3003     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3004                            logical_id=(vgname, names[1]),
3005                            iv_name = "sdb")
3006     disks = [sda_dev, sdb_dev]
3007   elif template_name == constants.DT_DRBD8:
3008     if len(secondary_nodes) != 1:
3009       raise errors.ProgrammerError("Wrong template configuration")
3010     remote_node = secondary_nodes[0]
3011     (minor_pa, minor_pb,
3012      minor_sa, minor_sb) = lu.cfg.AllocateDRBDMinor(
3013       [primary_node, primary_node, remote_node, remote_node], instance_name)
3014
3015     names = _GenerateUniqueNames(lu, [".sda_data", ".sda_meta",
3016                                       ".sdb_data", ".sdb_meta"])
3017     drbd_sda_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3018                                         disk_sz, names[0:2], "sda",
3019                                         minor_pa, minor_sa)
3020     drbd_sdb_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3021                                         swap_sz, names[2:4], "sdb",
3022                                         minor_pb, minor_sb)
3023     disks = [drbd_sda_dev, drbd_sdb_dev]
3024   elif template_name == constants.DT_FILE:
3025     if len(secondary_nodes) != 0:
3026       raise errors.ProgrammerError("Wrong template configuration")
3027
3028     file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
3029                                 iv_name="sda", logical_id=(file_driver,
3030                                 "%s/sda" % file_storage_dir))
3031     file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
3032                                 iv_name="sdb", logical_id=(file_driver,
3033                                 "%s/sdb" % file_storage_dir))
3034     disks = [file_sda_dev, file_sdb_dev]
3035   else:
3036     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3037   return disks
3038
3039
3040 def _GetInstanceInfoText(instance):
3041   """Compute that text that should be added to the disk's metadata.
3042
3043   """
3044   return "originstname+%s" % instance.name
3045
3046
3047 def _CreateDisks(lu, instance):
3048   """Create all disks for an instance.
3049
3050   This abstracts away some work from AddInstance.
3051
3052   Args:
3053     instance: the instance object
3054
3055   Returns:
3056     True or False showing the success of the creation process
3057
3058   """
3059   info = _GetInstanceInfoText(instance)
3060
3061   if instance.disk_template == constants.DT_FILE:
3062     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3063     result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3064                                                  file_storage_dir)
3065
3066     if not result:
3067       logger.Error("Could not connect to node '%s'" % instance.primary_node)
3068       return False
3069
3070     if not result[0]:
3071       logger.Error("failed to create directory '%s'" % file_storage_dir)
3072       return False
3073
3074   for device in instance.disks:
3075     logger.Info("creating volume %s for instance %s" %
3076                 (device.iv_name, instance.name))
3077     #HARDCODE
3078     for secondary_node in instance.secondary_nodes:
3079       if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3080                                         device, False, info):
3081         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3082                      (device.iv_name, device, secondary_node))
3083         return False
3084     #HARDCODE
3085     if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3086                                     instance, device, info):
3087       logger.Error("failed to create volume %s on primary!" %
3088                    device.iv_name)
3089       return False
3090
3091   return True
3092
3093
3094 def _RemoveDisks(lu, instance):
3095   """Remove all disks for an instance.
3096
3097   This abstracts away some work from `AddInstance()` and
3098   `RemoveInstance()`. Note that in case some of the devices couldn't
3099   be removed, the removal will continue with the other ones (compare
3100   with `_CreateDisks()`).
3101
3102   Args:
3103     instance: the instance object
3104
3105   Returns:
3106     True or False showing the success of the removal proces
3107
3108   """
3109   logger.Info("removing block devices for instance %s" % instance.name)
3110
3111   result = True
3112   for device in instance.disks:
3113     for node, disk in device.ComputeNodeTree(instance.primary_node):
3114       lu.cfg.SetDiskID(disk, node)
3115       if not lu.rpc.call_blockdev_remove(node, disk):
3116         logger.Error("could not remove block device %s on node %s,"
3117                      " continuing anyway" %
3118                      (device.iv_name, node))
3119         result = False
3120
3121   if instance.disk_template == constants.DT_FILE:
3122     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3123     if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3124                                                file_storage_dir):
3125       logger.Error("could not remove directory '%s'" % file_storage_dir)
3126       result = False
3127
3128   return result
3129
3130
3131 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3132   """Compute disk size requirements in the volume group
3133
3134   This is currently hard-coded for the two-drive layout.
3135
3136   """
3137   # Required free disk space as a function of disk and swap space
3138   req_size_dict = {
3139     constants.DT_DISKLESS: None,
3140     constants.DT_PLAIN: disk_size + swap_size,
3141     # 256 MB are added for drbd metadata, 128MB for each drbd device
3142     constants.DT_DRBD8: disk_size + swap_size + 256,
3143     constants.DT_FILE: None,
3144   }
3145
3146   if disk_template not in req_size_dict:
3147     raise errors.ProgrammerError("Disk template '%s' size requirement"
3148                                  " is unknown" %  disk_template)
3149
3150   return req_size_dict[disk_template]
3151
3152
3153 def _CheckHVParams(lu, nodenames, hvname, hvparams):
3154   """Hypervisor parameter validation.
3155
3156   This function abstract the hypervisor parameter validation to be
3157   used in both instance create and instance modify.
3158
3159   @type lu: L{LogicalUnit}
3160   @param lu: the logical unit for which we check
3161   @type nodenames: list
3162   @param nodenames: the list of nodes on which we should check
3163   @type hvname: string
3164   @param hvname: the name of the hypervisor we should use
3165   @type hvparams: dict
3166   @param hvparams: the parameters which we need to check
3167   @raise errors.OpPrereqError: if the parameters are not valid
3168
3169   """
3170   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3171                                                   hvname,
3172                                                   hvparams)
3173   for node in nodenames:
3174     info = hvinfo.get(node, None)
3175     if not info or not isinstance(info, (tuple, list)):
3176       raise errors.OpPrereqError("Cannot get current information"
3177                                  " from node '%s' (%s)" % (node, info))
3178     if not info[0]:
3179       raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3180                                  " %s" % info[1])
3181
3182
3183 class LUCreateInstance(LogicalUnit):
3184   """Create an instance.
3185
3186   """
3187   HPATH = "instance-add"
3188   HTYPE = constants.HTYPE_INSTANCE
3189   _OP_REQP = ["instance_name", "disk_size",
3190               "disk_template", "swap_size", "mode", "start",
3191               "wait_for_sync", "ip_check", "mac",
3192               "hvparams", "beparams"]
3193   REQ_BGL = False
3194
3195   def _ExpandNode(self, node):
3196     """Expands and checks one node name.
3197
3198     """
3199     node_full = self.cfg.ExpandNodeName(node)
3200     if node_full is None:
3201       raise errors.OpPrereqError("Unknown node %s" % node)
3202     return node_full
3203
3204   def ExpandNames(self):
3205     """ExpandNames for CreateInstance.
3206
3207     Figure out the right locks for instance creation.
3208
3209     """
3210     self.needed_locks = {}
3211
3212     # set optional parameters to none if they don't exist
3213     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3214       if not hasattr(self.op, attr):
3215         setattr(self.op, attr, None)
3216
3217     # cheap checks, mostly valid constants given
3218
3219     # verify creation mode
3220     if self.op.mode not in (constants.INSTANCE_CREATE,
3221                             constants.INSTANCE_IMPORT):
3222       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3223                                  self.op.mode)
3224
3225     # disk template and mirror node verification
3226     if self.op.disk_template not in constants.DISK_TEMPLATES:
3227       raise errors.OpPrereqError("Invalid disk template name")
3228
3229     if self.op.hypervisor is None:
3230       self.op.hypervisor = self.cfg.GetHypervisorType()
3231
3232     cluster = self.cfg.GetClusterInfo()
3233     enabled_hvs = cluster.enabled_hypervisors
3234     if self.op.hypervisor not in enabled_hvs:
3235       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3236                                  " cluster (%s)" % (self.op.hypervisor,
3237                                   ",".join(enabled_hvs)))
3238
3239     # check hypervisor parameter syntax (locally)
3240
3241     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3242                                   self.op.hvparams)
3243     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3244     hv_type.CheckParameterSyntax(filled_hvp)
3245
3246     # fill and remember the beparams dict
3247     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3248                                     self.op.beparams)
3249
3250     #### instance parameters check
3251
3252     # instance name verification
3253     hostname1 = utils.HostInfo(self.op.instance_name)
3254     self.op.instance_name = instance_name = hostname1.name
3255
3256     # this is just a preventive check, but someone might still add this
3257     # instance in the meantime, and creation will fail at lock-add time
3258     if instance_name in self.cfg.GetInstanceList():
3259       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3260                                  instance_name)
3261
3262     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3263
3264     # ip validity checks
3265     ip = getattr(self.op, "ip", None)
3266     if ip is None or ip.lower() == "none":
3267       inst_ip = None
3268     elif ip.lower() == "auto":
3269       inst_ip = hostname1.ip
3270     else:
3271       if not utils.IsValidIP(ip):
3272         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3273                                    " like a valid IP" % ip)
3274       inst_ip = ip
3275     self.inst_ip = self.op.ip = inst_ip
3276     # used in CheckPrereq for ip ping check
3277     self.check_ip = hostname1.ip
3278
3279     # MAC address verification
3280     if self.op.mac != "auto":
3281       if not utils.IsValidMac(self.op.mac.lower()):
3282         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3283                                    self.op.mac)
3284
3285     # file storage checks
3286     if (self.op.file_driver and
3287         not self.op.file_driver in constants.FILE_DRIVER):
3288       raise errors.OpPrereqError("Invalid file driver name '%s'" %
3289                                  self.op.file_driver)
3290
3291     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3292       raise errors.OpPrereqError("File storage directory path not absolute")
3293
3294     ### Node/iallocator related checks
3295     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3296       raise errors.OpPrereqError("One and only one of iallocator and primary"
3297                                  " node must be given")
3298
3299     if self.op.iallocator:
3300       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3301     else:
3302       self.op.pnode = self._ExpandNode(self.op.pnode)
3303       nodelist = [self.op.pnode]
3304       if self.op.snode is not None:
3305         self.op.snode = self._ExpandNode(self.op.snode)
3306         nodelist.append(self.op.snode)
3307       self.needed_locks[locking.LEVEL_NODE] = nodelist
3308
3309     # in case of import lock the source node too
3310     if self.op.mode == constants.INSTANCE_IMPORT:
3311       src_node = getattr(self.op, "src_node", None)
3312       src_path = getattr(self.op, "src_path", None)
3313
3314       if src_node is None or src_path is None:
3315         raise errors.OpPrereqError("Importing an instance requires source"
3316                                    " node and path options")
3317
3318       if not os.path.isabs(src_path):
3319         raise errors.OpPrereqError("The source path must be absolute")
3320
3321       self.op.src_node = src_node = self._ExpandNode(src_node)
3322       if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3323         self.needed_locks[locking.LEVEL_NODE].append(src_node)
3324
3325     else: # INSTANCE_CREATE
3326       if getattr(self.op, "os_type", None) is None:
3327         raise errors.OpPrereqError("No guest OS specified")
3328
3329   def _RunAllocator(self):
3330     """Run the allocator based on input opcode.
3331
3332     """
3333     disks = [{"size": self.op.disk_size, "mode": "w"},
3334              {"size": self.op.swap_size, "mode": "w"}]
3335     nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3336              "bridge": self.op.bridge}]
3337     ial = IAllocator(self,
3338                      mode=constants.IALLOCATOR_MODE_ALLOC,
3339                      name=self.op.instance_name,
3340                      disk_template=self.op.disk_template,
3341                      tags=[],
3342                      os=self.op.os_type,
3343                      vcpus=self.be_full[constants.BE_VCPUS],
3344                      mem_size=self.be_full[constants.BE_MEMORY],
3345                      disks=disks,
3346                      nics=nics,
3347                      )
3348
3349     ial.Run(self.op.iallocator)
3350
3351     if not ial.success:
3352       raise errors.OpPrereqError("Can't compute nodes using"
3353                                  " iallocator '%s': %s" % (self.op.iallocator,
3354                                                            ial.info))
3355     if len(ial.nodes) != ial.required_nodes:
3356       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3357                                  " of nodes (%s), required %s" %
3358                                  (self.op.iallocator, len(ial.nodes),
3359                                   ial.required_nodes))
3360     self.op.pnode = ial.nodes[0]
3361     logger.ToStdout("Selected nodes for the instance: %s" %
3362                     (", ".join(ial.nodes),))
3363     logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3364                 (self.op.instance_name, self.op.iallocator, ial.nodes))
3365     if ial.required_nodes == 2:
3366       self.op.snode = ial.nodes[1]
3367
3368   def BuildHooksEnv(self):
3369     """Build hooks env.
3370
3371     This runs on master, primary and secondary nodes of the instance.
3372
3373     """
3374     env = {
3375       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3376       "INSTANCE_DISK_SIZE": self.op.disk_size,
3377       "INSTANCE_SWAP_SIZE": self.op.swap_size,
3378       "INSTANCE_ADD_MODE": self.op.mode,
3379       }
3380     if self.op.mode == constants.INSTANCE_IMPORT:
3381       env["INSTANCE_SRC_NODE"] = self.op.src_node
3382       env["INSTANCE_SRC_PATH"] = self.op.src_path
3383       env["INSTANCE_SRC_IMAGE"] = self.src_image
3384
3385     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3386       primary_node=self.op.pnode,
3387       secondary_nodes=self.secondaries,
3388       status=self.instance_status,
3389       os_type=self.op.os_type,
3390       memory=self.be_full[constants.BE_MEMORY],
3391       vcpus=self.be_full[constants.BE_VCPUS],
3392       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3393     ))
3394
3395     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3396           self.secondaries)
3397     return env, nl, nl
3398
3399
3400   def CheckPrereq(self):
3401     """Check prerequisites.
3402
3403     """
3404     if (not self.cfg.GetVGName() and
3405         self.op.disk_template not in constants.DTS_NOT_LVM):
3406       raise errors.OpPrereqError("Cluster does not support lvm-based"
3407                                  " instances")
3408
3409
3410     if self.op.mode == constants.INSTANCE_IMPORT:
3411       src_node = self.op.src_node
3412       src_path = self.op.src_path
3413
3414       export_info = self.rpc.call_export_info(src_node, src_path)
3415
3416       if not export_info:
3417         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3418
3419       if not export_info.has_section(constants.INISECT_EXP):
3420         raise errors.ProgrammerError("Corrupted export config")
3421
3422       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3423       if (int(ei_version) != constants.EXPORT_VERSION):
3424         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3425                                    (ei_version, constants.EXPORT_VERSION))
3426
3427       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3428         raise errors.OpPrereqError("Can't import instance with more than"
3429                                    " one data disk")
3430
3431       # FIXME: are the old os-es, disk sizes, etc. useful?
3432       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3433       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3434                                                          'disk0_dump'))
3435       self.src_image = diskimage
3436
3437     # ip ping checks (we use the same ip that was resolved in ExpandNames)
3438
3439     if self.op.start and not self.op.ip_check:
3440       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3441                                  " adding an instance in start mode")
3442
3443     if self.op.ip_check:
3444       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3445         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3446                                    (self.check_ip, self.op.instance_name))
3447
3448     # bridge verification
3449     bridge = getattr(self.op, "bridge", None)
3450     if bridge is None:
3451       self.op.bridge = self.cfg.GetDefBridge()
3452     else:
3453       self.op.bridge = bridge
3454
3455     #### allocator run
3456
3457     if self.op.iallocator is not None:
3458       self._RunAllocator()
3459
3460     #### node related checks
3461
3462     # check primary node
3463     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3464     assert self.pnode is not None, \
3465       "Cannot retrieve locked node %s" % self.op.pnode
3466     self.secondaries = []
3467
3468     # mirror node verification
3469     if self.op.disk_template in constants.DTS_NET_MIRROR:
3470       if self.op.snode is None:
3471         raise errors.OpPrereqError("The networked disk templates need"
3472                                    " a mirror node")
3473       if self.op.snode == pnode.name:
3474         raise errors.OpPrereqError("The secondary node cannot be"
3475                                    " the primary node.")
3476       self.secondaries.append(self.op.snode)
3477
3478     nodenames = [pnode.name] + self.secondaries
3479
3480     req_size = _ComputeDiskSize(self.op.disk_template,
3481                                 self.op.disk_size, self.op.swap_size)
3482
3483     # Check lv size requirements
3484     if req_size is not None:
3485       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3486                                          self.op.hypervisor)
3487       for node in nodenames:
3488         info = nodeinfo.get(node, None)
3489         if not info:
3490           raise errors.OpPrereqError("Cannot get current information"
3491                                      " from node '%s'" % node)
3492         vg_free = info.get('vg_free', None)
3493         if not isinstance(vg_free, int):
3494           raise errors.OpPrereqError("Can't compute free disk space on"
3495                                      " node %s" % node)
3496         if req_size > info['vg_free']:
3497           raise errors.OpPrereqError("Not enough disk space on target node %s."
3498                                      " %d MB available, %d MB required" %
3499                                      (node, info['vg_free'], req_size))
3500
3501     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
3502
3503     # os verification
3504     os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
3505     if not os_obj:
3506       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3507                                  " primary node"  % self.op.os_type)
3508
3509     # bridge check on primary node
3510     if not self.rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3511       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3512                                  " destination node '%s'" %
3513                                  (self.op.bridge, pnode.name))
3514
3515     # memory check on primary node
3516     if self.op.start:
3517       _CheckNodeFreeMemory(self, self.pnode.name,
3518                            "creating instance %s" % self.op.instance_name,
3519                            self.be_full[constants.BE_MEMORY],
3520                            self.op.hypervisor)
3521
3522     if self.op.start:
3523       self.instance_status = 'up'
3524     else:
3525       self.instance_status = 'down'
3526
3527   def Exec(self, feedback_fn):
3528     """Create and add the instance to the cluster.
3529
3530     """
3531     instance = self.op.instance_name
3532     pnode_name = self.pnode.name
3533
3534     if self.op.mac == "auto":
3535       mac_address = self.cfg.GenerateMAC()
3536     else:
3537       mac_address = self.op.mac
3538
3539     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3540     if self.inst_ip is not None:
3541       nic.ip = self.inst_ip
3542
3543     ht_kind = self.op.hypervisor
3544     if ht_kind in constants.HTS_REQ_PORT:
3545       network_port = self.cfg.AllocatePort()
3546     else:
3547       network_port = None
3548
3549     ##if self.op.vnc_bind_address is None:
3550     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3551
3552     # this is needed because os.path.join does not accept None arguments
3553     if self.op.file_storage_dir is None:
3554       string_file_storage_dir = ""
3555     else:
3556       string_file_storage_dir = self.op.file_storage_dir
3557
3558     # build the full file storage dir path
3559     file_storage_dir = os.path.normpath(os.path.join(
3560                                         self.cfg.GetFileStorageDir(),
3561                                         string_file_storage_dir, instance))
3562
3563
3564     disks = _GenerateDiskTemplate(self,
3565                                   self.op.disk_template,
3566                                   instance, pnode_name,
3567                                   self.secondaries, self.op.disk_size,
3568                                   self.op.swap_size,
3569                                   file_storage_dir,
3570                                   self.op.file_driver)
3571
3572     iobj = objects.Instance(name=instance, os=self.op.os_type,
3573                             primary_node=pnode_name,
3574                             nics=[nic], disks=disks,
3575                             disk_template=self.op.disk_template,
3576                             status=self.instance_status,
3577                             network_port=network_port,
3578                             beparams=self.op.beparams,
3579                             hvparams=self.op.hvparams,
3580                             hypervisor=self.op.hypervisor,
3581                             )
3582
3583     feedback_fn("* creating instance disks...")
3584     if not _CreateDisks(self, iobj):
3585       _RemoveDisks(self, iobj)
3586       self.cfg.ReleaseDRBDMinors(instance)
3587       raise errors.OpExecError("Device creation failed, reverting...")
3588
3589     feedback_fn("adding instance %s to cluster config" % instance)
3590
3591     self.cfg.AddInstance(iobj)
3592     # Declare that we don't want to remove the instance lock anymore, as we've
3593     # added the instance to the config
3594     del self.remove_locks[locking.LEVEL_INSTANCE]
3595     # Remove the temp. assignements for the instance's drbds
3596     self.cfg.ReleaseDRBDMinors(instance)
3597
3598     if self.op.wait_for_sync:
3599       disk_abort = not _WaitForSync(self, iobj)
3600     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3601       # make sure the disks are not degraded (still sync-ing is ok)
3602       time.sleep(15)
3603       feedback_fn("* checking mirrors status")
3604       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
3605     else:
3606       disk_abort = False
3607
3608     if disk_abort:
3609       _RemoveDisks(self, iobj)
3610       self.cfg.RemoveInstance(iobj.name)
3611       # Make sure the instance lock gets removed
3612       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
3613       raise errors.OpExecError("There are some degraded disks for"
3614                                " this instance")
3615
3616     feedback_fn("creating os for instance %s on node %s" %
3617                 (instance, pnode_name))
3618
3619     if iobj.disk_template != constants.DT_DISKLESS:
3620       if self.op.mode == constants.INSTANCE_CREATE:
3621         feedback_fn("* running the instance OS create scripts...")
3622         if not self.rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3623           raise errors.OpExecError("could not add os for instance %s"
3624                                    " on node %s" %
3625                                    (instance, pnode_name))
3626
3627       elif self.op.mode == constants.INSTANCE_IMPORT:
3628         feedback_fn("* running the instance OS import scripts...")
3629         src_node = self.op.src_node
3630         src_image = self.src_image
3631         cluster_name = self.cfg.GetClusterName()
3632         if not self.rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3633                                                 src_node, src_image,
3634                                                 cluster_name):
3635           raise errors.OpExecError("Could not import os for instance"
3636                                    " %s on node %s" %
3637                                    (instance, pnode_name))
3638       else:
3639         # also checked in the prereq part
3640         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3641                                      % self.op.mode)
3642
3643     if self.op.start:
3644       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3645       feedback_fn("* starting instance...")
3646       if not self.rpc.call_instance_start(pnode_name, iobj, None):
3647         raise errors.OpExecError("Could not start instance")
3648
3649
3650 class LUConnectConsole(NoHooksLU):
3651   """Connect to an instance's console.
3652
3653   This is somewhat special in that it returns the command line that
3654   you need to run on the master node in order to connect to the
3655   console.
3656
3657   """
3658   _OP_REQP = ["instance_name"]
3659   REQ_BGL = False
3660
3661   def ExpandNames(self):
3662     self._ExpandAndLockInstance()
3663
3664   def CheckPrereq(self):
3665     """Check prerequisites.
3666
3667     This checks that the instance is in the cluster.
3668
3669     """
3670     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3671     assert self.instance is not None, \
3672       "Cannot retrieve locked instance %s" % self.op.instance_name
3673
3674   def Exec(self, feedback_fn):
3675     """Connect to the console of an instance
3676
3677     """
3678     instance = self.instance
3679     node = instance.primary_node
3680
3681     node_insts = self.rpc.call_instance_list([node],
3682                                              [instance.hypervisor])[node]
3683     if node_insts is False:
3684       raise errors.OpExecError("Can't connect to node %s." % node)
3685
3686     if instance.name not in node_insts:
3687       raise errors.OpExecError("Instance %s is not running." % instance.name)
3688
3689     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3690
3691     hyper = hypervisor.GetHypervisor(instance.hypervisor)
3692     console_cmd = hyper.GetShellCommandForConsole(instance)
3693
3694     # build ssh cmdline
3695     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3696
3697
3698 class LUReplaceDisks(LogicalUnit):
3699   """Replace the disks of an instance.
3700
3701   """
3702   HPATH = "mirrors-replace"
3703   HTYPE = constants.HTYPE_INSTANCE
3704   _OP_REQP = ["instance_name", "mode", "disks"]
3705   REQ_BGL = False
3706
3707   def ExpandNames(self):
3708     self._ExpandAndLockInstance()
3709
3710     if not hasattr(self.op, "remote_node"):
3711       self.op.remote_node = None
3712
3713     ia_name = getattr(self.op, "iallocator", None)
3714     if ia_name is not None:
3715       if self.op.remote_node is not None:
3716         raise errors.OpPrereqError("Give either the iallocator or the new"
3717                                    " secondary, not both")
3718       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3719     elif self.op.remote_node is not None:
3720       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3721       if remote_node is None:
3722         raise errors.OpPrereqError("Node '%s' not known" %
3723                                    self.op.remote_node)
3724       self.op.remote_node = remote_node
3725       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3726       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3727     else:
3728       self.needed_locks[locking.LEVEL_NODE] = []
3729       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3730
3731   def DeclareLocks(self, level):
3732     # If we're not already locking all nodes in the set we have to declare the
3733     # instance's primary/secondary nodes.
3734     if (level == locking.LEVEL_NODE and
3735         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
3736       self._LockInstancesNodes()
3737
3738   def _RunAllocator(self):
3739     """Compute a new secondary node using an IAllocator.
3740
3741     """
3742     ial = IAllocator(self,
3743                      mode=constants.IALLOCATOR_MODE_RELOC,
3744                      name=self.op.instance_name,
3745                      relocate_from=[self.sec_node])
3746
3747     ial.Run(self.op.iallocator)
3748
3749     if not ial.success:
3750       raise errors.OpPrereqError("Can't compute nodes using"
3751                                  " iallocator '%s': %s" % (self.op.iallocator,
3752                                                            ial.info))
3753     if len(ial.nodes) != ial.required_nodes:
3754       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3755                                  " of nodes (%s), required %s" %
3756                                  (len(ial.nodes), ial.required_nodes))
3757     self.op.remote_node = ial.nodes[0]
3758     logger.ToStdout("Selected new secondary for the instance: %s" %
3759                     self.op.remote_node)
3760
3761   def BuildHooksEnv(self):
3762     """Build hooks env.
3763
3764     This runs on the master, the primary and all the secondaries.
3765
3766     """
3767     env = {
3768       "MODE": self.op.mode,
3769       "NEW_SECONDARY": self.op.remote_node,
3770       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3771       }
3772     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3773     nl = [
3774       self.cfg.GetMasterNode(),
3775       self.instance.primary_node,
3776       ]
3777     if self.op.remote_node is not None:
3778       nl.append(self.op.remote_node)
3779     return env, nl, nl
3780
3781   def CheckPrereq(self):
3782     """Check prerequisites.
3783
3784     This checks that the instance is in the cluster.
3785
3786     """
3787     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3788     assert instance is not None, \
3789       "Cannot retrieve locked instance %s" % self.op.instance_name
3790     self.instance = instance
3791
3792     if instance.disk_template not in constants.DTS_NET_MIRROR:
3793       raise errors.OpPrereqError("Instance's disk layout is not"
3794                                  " network mirrored.")
3795
3796     if len(instance.secondary_nodes) != 1:
3797       raise errors.OpPrereqError("The instance has a strange layout,"
3798                                  " expected one secondary but found %d" %
3799                                  len(instance.secondary_nodes))
3800
3801     self.sec_node = instance.secondary_nodes[0]
3802
3803     ia_name = getattr(self.op, "iallocator", None)
3804     if ia_name is not None:
3805       self._RunAllocator()
3806
3807     remote_node = self.op.remote_node
3808     if remote_node is not None:
3809       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3810       assert self.remote_node_info is not None, \
3811         "Cannot retrieve locked node %s" % remote_node
3812     else:
3813       self.remote_node_info = None
3814     if remote_node == instance.primary_node:
3815       raise errors.OpPrereqError("The specified node is the primary node of"
3816                                  " the instance.")
3817     elif remote_node == self.sec_node:
3818       if self.op.mode == constants.REPLACE_DISK_SEC:
3819         # this is for DRBD8, where we can't execute the same mode of
3820         # replacement as for drbd7 (no different port allocated)
3821         raise errors.OpPrereqError("Same secondary given, cannot execute"
3822                                    " replacement")
3823     if instance.disk_template == constants.DT_DRBD8:
3824       if (self.op.mode == constants.REPLACE_DISK_ALL and
3825           remote_node is not None):
3826         # switch to replace secondary mode
3827         self.op.mode = constants.REPLACE_DISK_SEC
3828
3829       if self.op.mode == constants.REPLACE_DISK_ALL:
3830         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3831                                    " secondary disk replacement, not"
3832                                    " both at once")
3833       elif self.op.mode == constants.REPLACE_DISK_PRI:
3834         if remote_node is not None:
3835           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3836                                      " the secondary while doing a primary"
3837                                      " node disk replacement")
3838         self.tgt_node = instance.primary_node
3839         self.oth_node = instance.secondary_nodes[0]
3840       elif self.op.mode == constants.REPLACE_DISK_SEC:
3841         self.new_node = remote_node # this can be None, in which case
3842                                     # we don't change the secondary
3843         self.tgt_node = instance.secondary_nodes[0]
3844         self.oth_node = instance.primary_node
3845       else:
3846         raise errors.ProgrammerError("Unhandled disk replace mode")
3847
3848     for name in self.op.disks:
3849       if instance.FindDisk(name) is None:
3850         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3851                                    (name, instance.name))
3852
3853   def _ExecD8DiskOnly(self, feedback_fn):
3854     """Replace a disk on the primary or secondary for dbrd8.
3855
3856     The algorithm for replace is quite complicated:
3857       - for each disk to be replaced:
3858         - create new LVs on the target node with unique names
3859         - detach old LVs from the drbd device
3860         - rename old LVs to name_replaced.<time_t>
3861         - rename new LVs to old LVs
3862         - attach the new LVs (with the old names now) to the drbd device
3863       - wait for sync across all devices
3864       - for each modified disk:
3865         - remove old LVs (which have the name name_replaces.<time_t>)
3866
3867     Failures are not very well handled.
3868
3869     """
3870     steps_total = 6
3871     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3872     instance = self.instance
3873     iv_names = {}
3874     vgname = self.cfg.GetVGName()
3875     # start of work
3876     cfg = self.cfg
3877     tgt_node = self.tgt_node
3878     oth_node = self.oth_node
3879
3880     # Step: check device activation
3881     self.proc.LogStep(1, steps_total, "check device existence")
3882     info("checking volume groups")
3883     my_vg = cfg.GetVGName()
3884     results = self.rpc.call_vg_list([oth_node, tgt_node])
3885     if not results:
3886       raise errors.OpExecError("Can't list volume groups on the nodes")
3887     for node in oth_node, tgt_node:
3888       res = results.get(node, False)
3889       if not res or my_vg not in res:
3890         raise errors.OpExecError("Volume group '%s' not found on %s" %
3891                                  (my_vg, node))
3892     for dev in instance.disks:
3893       if not dev.iv_name in self.op.disks:
3894         continue
3895       for node in tgt_node, oth_node:
3896         info("checking %s on %s" % (dev.iv_name, node))
3897         cfg.SetDiskID(dev, node)
3898         if not self.rpc.call_blockdev_find(node, dev):
3899           raise errors.OpExecError("Can't find device %s on node %s" %
3900                                    (dev.iv_name, node))
3901
3902     # Step: check other node consistency
3903     self.proc.LogStep(2, steps_total, "check peer consistency")
3904     for dev in instance.disks:
3905       if not dev.iv_name in self.op.disks:
3906         continue
3907       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3908       if not _CheckDiskConsistency(self, dev, oth_node,
3909                                    oth_node==instance.primary_node):
3910         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3911                                  " to replace disks on this node (%s)" %
3912                                  (oth_node, tgt_node))
3913
3914     # Step: create new storage
3915     self.proc.LogStep(3, steps_total, "allocate new storage")
3916     for dev in instance.disks:
3917       if not dev.iv_name in self.op.disks:
3918         continue
3919       size = dev.size
3920       cfg.SetDiskID(dev, tgt_node)
3921       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3922       names = _GenerateUniqueNames(self, lv_names)
3923       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3924                              logical_id=(vgname, names[0]))
3925       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3926                              logical_id=(vgname, names[1]))
3927       new_lvs = [lv_data, lv_meta]
3928       old_lvs = dev.children
3929       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3930       info("creating new local storage on %s for %s" %
3931            (tgt_node, dev.iv_name))
3932       # since we *always* want to create this LV, we use the
3933       # _Create...OnPrimary (which forces the creation), even if we
3934       # are talking about the secondary node
3935       for new_lv in new_lvs:
3936         if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
3937                                         _GetInstanceInfoText(instance)):
3938           raise errors.OpExecError("Failed to create new LV named '%s' on"
3939                                    " node '%s'" %
3940                                    (new_lv.logical_id[1], tgt_node))
3941
3942     # Step: for each lv, detach+rename*2+attach
3943     self.proc.LogStep(4, steps_total, "change drbd configuration")
3944     for dev, old_lvs, new_lvs in iv_names.itervalues():
3945       info("detaching %s drbd from local storage" % dev.iv_name)
3946       if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3947         raise errors.OpExecError("Can't detach drbd from local storage on node"
3948                                  " %s for device %s" % (tgt_node, dev.iv_name))
3949       #dev.children = []
3950       #cfg.Update(instance)
3951
3952       # ok, we created the new LVs, so now we know we have the needed
3953       # storage; as such, we proceed on the target node to rename
3954       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3955       # using the assumption that logical_id == physical_id (which in
3956       # turn is the unique_id on that node)
3957
3958       # FIXME(iustin): use a better name for the replaced LVs
3959       temp_suffix = int(time.time())
3960       ren_fn = lambda d, suff: (d.physical_id[0],
3961                                 d.physical_id[1] + "_replaced-%s" % suff)
3962       # build the rename list based on what LVs exist on the node
3963       rlist = []
3964       for to_ren in old_lvs:
3965         find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
3966         if find_res is not None: # device exists
3967           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3968
3969       info("renaming the old LVs on the target node")
3970       if not self.rpc.call_blockdev_rename(tgt_node, rlist):
3971         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3972       # now we rename the new LVs to the old LVs
3973       info("renaming the new LVs on the target node")
3974       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3975       if not self.rpc.call_blockdev_rename(tgt_node, rlist):
3976         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3977
3978       for old, new in zip(old_lvs, new_lvs):
3979         new.logical_id = old.logical_id
3980         cfg.SetDiskID(new, tgt_node)
3981
3982       for disk in old_lvs:
3983         disk.logical_id = ren_fn(disk, temp_suffix)
3984         cfg.SetDiskID(disk, tgt_node)
3985
3986       # now that the new lvs have the old name, we can add them to the device
3987       info("adding new mirror component on %s" % tgt_node)
3988       if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3989         for new_lv in new_lvs:
3990           if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
3991             warning("Can't rollback device %s", hint="manually cleanup unused"
3992                     " logical volumes")
3993         raise errors.OpExecError("Can't add local storage to drbd")
3994
3995       dev.children = new_lvs
3996       cfg.Update(instance)
3997
3998     # Step: wait for sync
3999
4000     # this can fail as the old devices are degraded and _WaitForSync
4001     # does a combined result over all disks, so we don't check its
4002     # return value
4003     self.proc.LogStep(5, steps_total, "sync devices")
4004     _WaitForSync(self, instance, unlock=True)
4005
4006     # so check manually all the devices
4007     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4008       cfg.SetDiskID(dev, instance.primary_node)
4009       is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
4010       if is_degr:
4011         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4012
4013     # Step: remove old storage
4014     self.proc.LogStep(6, steps_total, "removing old storage")
4015     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4016       info("remove logical volumes for %s" % name)
4017       for lv in old_lvs:
4018         cfg.SetDiskID(lv, tgt_node)
4019         if not self.rpc.call_blockdev_remove(tgt_node, lv):
4020           warning("Can't remove old LV", hint="manually remove unused LVs")
4021           continue
4022
4023   def _ExecD8Secondary(self, feedback_fn):
4024     """Replace the secondary node for drbd8.
4025
4026     The algorithm for replace is quite complicated:
4027       - for all disks of the instance:
4028         - create new LVs on the new node with same names
4029         - shutdown the drbd device on the old secondary
4030         - disconnect the drbd network on the primary
4031         - create the drbd device on the new secondary
4032         - network attach the drbd on the primary, using an artifice:
4033           the drbd code for Attach() will connect to the network if it
4034           finds a device which is connected to the good local disks but
4035           not network enabled
4036       - wait for sync across all devices
4037       - remove all disks from the old secondary
4038
4039     Failures are not very well handled.
4040
4041     """
4042     steps_total = 6
4043     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4044     instance = self.instance
4045     iv_names = {}
4046     vgname = self.cfg.GetVGName()
4047     # start of work
4048     cfg = self.cfg
4049     old_node = self.tgt_node
4050     new_node = self.new_node
4051     pri_node = instance.primary_node
4052
4053     # Step: check device activation
4054     self.proc.LogStep(1, steps_total, "check device existence")
4055     info("checking volume groups")
4056     my_vg = cfg.GetVGName()
4057     results = self.rpc.call_vg_list([pri_node, new_node])
4058     if not results:
4059       raise errors.OpExecError("Can't list volume groups on the nodes")
4060     for node in pri_node, new_node:
4061       res = results.get(node, False)
4062       if not res or my_vg not in res:
4063         raise errors.OpExecError("Volume group '%s' not found on %s" %
4064                                  (my_vg, node))
4065     for dev in instance.disks:
4066       if not dev.iv_name in self.op.disks:
4067         continue
4068       info("checking %s on %s" % (dev.iv_name, pri_node))
4069       cfg.SetDiskID(dev, pri_node)
4070       if not self.rpc.call_blockdev_find(pri_node, dev):
4071         raise errors.OpExecError("Can't find device %s on node %s" %
4072                                  (dev.iv_name, pri_node))
4073
4074     # Step: check other node consistency
4075     self.proc.LogStep(2, steps_total, "check peer consistency")
4076     for dev in instance.disks:
4077       if not dev.iv_name in self.op.disks:
4078         continue
4079       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4080       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4081         raise errors.OpExecError("Primary node (%s) has degraded storage,"
4082                                  " unsafe to replace the secondary" %
4083                                  pri_node)
4084
4085     # Step: create new storage
4086     self.proc.LogStep(3, steps_total, "allocate new storage")
4087     for dev in instance.disks:
4088       size = dev.size
4089       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4090       # since we *always* want to create this LV, we use the
4091       # _Create...OnPrimary (which forces the creation), even if we
4092       # are talking about the secondary node
4093       for new_lv in dev.children:
4094         if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4095                                         _GetInstanceInfoText(instance)):
4096           raise errors.OpExecError("Failed to create new LV named '%s' on"
4097                                    " node '%s'" %
4098                                    (new_lv.logical_id[1], new_node))
4099
4100
4101     # Step 4: dbrd minors and drbd setups changes
4102     # after this, we must manually remove the drbd minors on both the
4103     # error and the success paths
4104     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4105                                    instance.name)
4106     logging.debug("Allocated minors %s" % (minors,))
4107     self.proc.LogStep(4, steps_total, "changing drbd configuration")
4108     for dev, new_minor in zip(instance.disks, minors):
4109       size = dev.size
4110       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4111       # create new devices on new_node
4112       if pri_node == dev.logical_id[0]:
4113         new_logical_id = (pri_node, new_node,
4114                           dev.logical_id[2], dev.logical_id[3], new_minor,
4115                           dev.logical_id[5])
4116       else:
4117         new_logical_id = (new_node, pri_node,
4118                           dev.logical_id[2], new_minor, dev.logical_id[4],
4119                           dev.logical_id[5])
4120       iv_names[dev.iv_name] = (dev, dev.children, new_logical_id)
4121       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4122                     new_logical_id)
4123       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4124                               logical_id=new_logical_id,
4125                               children=dev.children)
4126       if not _CreateBlockDevOnSecondary(self, new_node, instance,
4127                                         new_drbd, False,
4128                                         _GetInstanceInfoText(instance)):
4129         self.cfg.ReleaseDRBDMinors(instance.name)
4130         raise errors.OpExecError("Failed to create new DRBD on"
4131                                  " node '%s'" % new_node)
4132
4133     for dev in instance.disks:
4134       # we have new devices, shutdown the drbd on the old secondary
4135       info("shutting down drbd for %s on old node" % dev.iv_name)
4136       cfg.SetDiskID(dev, old_node)
4137       if not self.rpc.call_blockdev_shutdown(old_node, dev):
4138         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4139                 hint="Please cleanup this device manually as soon as possible")
4140
4141     info("detaching primary drbds from the network (=> standalone)")
4142     done = 0
4143     for dev in instance.disks:
4144       cfg.SetDiskID(dev, pri_node)
4145       # set the network part of the physical (unique in bdev terms) id
4146       # to None, meaning detach from network
4147       dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4148       # and 'find' the device, which will 'fix' it to match the
4149       # standalone state
4150       if self.rpc.call_blockdev_find(pri_node, dev):
4151         done += 1
4152       else:
4153         warning("Failed to detach drbd %s from network, unusual case" %
4154                 dev.iv_name)
4155
4156     if not done:
4157       # no detaches succeeded (very unlikely)
4158       self.cfg.ReleaseDRBDMinors(instance.name)
4159       raise errors.OpExecError("Can't detach at least one DRBD from old node")
4160
4161     # if we managed to detach at least one, we update all the disks of
4162     # the instance to point to the new secondary
4163     info("updating instance configuration")
4164     for dev, _, new_logical_id in iv_names.itervalues():
4165       dev.logical_id = new_logical_id
4166       cfg.SetDiskID(dev, pri_node)
4167     cfg.Update(instance)
4168     # we can remove now the temp minors as now the new values are
4169     # written to the config file (and therefore stable)
4170     self.cfg.ReleaseDRBDMinors(instance.name)
4171
4172     # and now perform the drbd attach
4173     info("attaching primary drbds to new secondary (standalone => connected)")
4174     failures = []
4175     for dev in instance.disks:
4176       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4177       # since the attach is smart, it's enough to 'find' the device,
4178       # it will automatically activate the network, if the physical_id
4179       # is correct
4180       cfg.SetDiskID(dev, pri_node)
4181       logging.debug("Disk to attach: %s", dev)
4182       if not self.rpc.call_blockdev_find(pri_node, dev):
4183         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4184                 "please do a gnt-instance info to see the status of disks")
4185
4186     # this can fail as the old devices are degraded and _WaitForSync
4187     # does a combined result over all disks, so we don't check its
4188     # return value
4189     self.proc.LogStep(5, steps_total, "sync devices")
4190     _WaitForSync(self, instance, unlock=True)
4191
4192     # so check manually all the devices
4193     for name, (dev, old_lvs, _) in iv_names.iteritems():
4194       cfg.SetDiskID(dev, pri_node)
4195       is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
4196       if is_degr:
4197         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4198
4199     self.proc.LogStep(6, steps_total, "removing old storage")
4200     for name, (dev, old_lvs, _) in iv_names.iteritems():
4201       info("remove logical volumes for %s" % name)
4202       for lv in old_lvs:
4203         cfg.SetDiskID(lv, old_node)
4204         if not self.rpc.call_blockdev_remove(old_node, lv):
4205           warning("Can't remove LV on old secondary",
4206                   hint="Cleanup stale volumes by hand")
4207
4208   def Exec(self, feedback_fn):
4209     """Execute disk replacement.
4210
4211     This dispatches the disk replacement to the appropriate handler.
4212
4213     """
4214     instance = self.instance
4215
4216     # Activate the instance disks if we're replacing them on a down instance
4217     if instance.status == "down":
4218       _StartInstanceDisks(self, instance, True)
4219
4220     if instance.disk_template == constants.DT_DRBD8:
4221       if self.op.remote_node is None:
4222         fn = self._ExecD8DiskOnly
4223       else:
4224         fn = self._ExecD8Secondary
4225     else:
4226       raise errors.ProgrammerError("Unhandled disk replacement case")
4227
4228     ret = fn(feedback_fn)
4229
4230     # Deactivate the instance disks if we're replacing them on a down instance
4231     if instance.status == "down":
4232       _SafeShutdownInstanceDisks(self, instance)
4233
4234     return ret
4235
4236
4237 class LUGrowDisk(LogicalUnit):
4238   """Grow a disk of an instance.
4239
4240   """
4241   HPATH = "disk-grow"
4242   HTYPE = constants.HTYPE_INSTANCE
4243   _OP_REQP = ["instance_name", "disk", "amount"]
4244   REQ_BGL = False
4245
4246   def ExpandNames(self):
4247     self._ExpandAndLockInstance()
4248     self.needed_locks[locking.LEVEL_NODE] = []
4249     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4250
4251   def DeclareLocks(self, level):
4252     if level == locking.LEVEL_NODE:
4253       self._LockInstancesNodes()
4254
4255   def BuildHooksEnv(self):
4256     """Build hooks env.
4257
4258     This runs on the master, the primary and all the secondaries.
4259
4260     """
4261     env = {
4262       "DISK": self.op.disk,
4263       "AMOUNT": self.op.amount,
4264       }
4265     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4266     nl = [
4267       self.cfg.GetMasterNode(),
4268       self.instance.primary_node,
4269       ]
4270     return env, nl, nl
4271
4272   def CheckPrereq(self):
4273     """Check prerequisites.
4274
4275     This checks that the instance is in the cluster.
4276
4277     """
4278     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4279     assert instance is not None, \
4280       "Cannot retrieve locked instance %s" % self.op.instance_name
4281
4282     self.instance = instance
4283
4284     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4285       raise errors.OpPrereqError("Instance's disk layout does not support"
4286                                  " growing.")
4287
4288     if instance.FindDisk(self.op.disk) is None:
4289       raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4290                                  (self.op.disk, instance.name))
4291
4292     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4293     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4294                                        instance.hypervisor)
4295     for node in nodenames:
4296       info = nodeinfo.get(node, None)
4297       if not info:
4298         raise errors.OpPrereqError("Cannot get current information"
4299                                    " from node '%s'" % node)
4300       vg_free = info.get('vg_free', None)
4301       if not isinstance(vg_free, int):
4302         raise errors.OpPrereqError("Can't compute free disk space on"
4303                                    " node %s" % node)
4304       if self.op.amount > info['vg_free']:
4305         raise errors.OpPrereqError("Not enough disk space on target node %s:"
4306                                    " %d MiB available, %d MiB required" %
4307                                    (node, info['vg_free'], self.op.amount))
4308
4309   def Exec(self, feedback_fn):
4310     """Execute disk grow.
4311
4312     """
4313     instance = self.instance
4314     disk = instance.FindDisk(self.op.disk)
4315     for node in (instance.secondary_nodes + (instance.primary_node,)):
4316       self.cfg.SetDiskID(disk, node)
4317       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
4318       if (not result or not isinstance(result, (list, tuple)) or
4319           len(result) != 2):
4320         raise errors.OpExecError("grow request failed to node %s" % node)
4321       elif not result[0]:
4322         raise errors.OpExecError("grow request failed to node %s: %s" %
4323                                  (node, result[1]))
4324     disk.RecordGrow(self.op.amount)
4325     self.cfg.Update(instance)
4326     return
4327
4328
4329 class LUQueryInstanceData(NoHooksLU):
4330   """Query runtime instance data.
4331
4332   """
4333   _OP_REQP = ["instances", "static"]
4334   REQ_BGL = False
4335
4336   def ExpandNames(self):
4337     self.needed_locks = {}
4338     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4339
4340     if not isinstance(self.op.instances, list):
4341       raise errors.OpPrereqError("Invalid argument type 'instances'")
4342
4343     if self.op.instances:
4344       self.wanted_names = []
4345       for name in self.op.instances:
4346         full_name = self.cfg.ExpandInstanceName(name)
4347         if full_name is None:
4348           raise errors.OpPrereqError("Instance '%s' not known" %
4349                                      self.op.instance_name)
4350         self.wanted_names.append(full_name)
4351       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4352     else:
4353       self.wanted_names = None
4354       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4355
4356     self.needed_locks[locking.LEVEL_NODE] = []
4357     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4358
4359   def DeclareLocks(self, level):
4360     if level == locking.LEVEL_NODE:
4361       self._LockInstancesNodes()
4362
4363   def CheckPrereq(self):
4364     """Check prerequisites.
4365
4366     This only checks the optional instance list against the existing names.
4367
4368     """
4369     if self.wanted_names is None:
4370       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4371
4372     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4373                              in self.wanted_names]
4374     return
4375
4376   def _ComputeDiskStatus(self, instance, snode, dev):
4377     """Compute block device status.
4378
4379     """
4380     static = self.op.static
4381     if not static:
4382       self.cfg.SetDiskID(dev, instance.primary_node)
4383       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4384     else:
4385       dev_pstatus = None
4386
4387     if dev.dev_type in constants.LDS_DRBD:
4388       # we change the snode then (otherwise we use the one passed in)
4389       if dev.logical_id[0] == instance.primary_node:
4390         snode = dev.logical_id[1]
4391       else:
4392         snode = dev.logical_id[0]
4393
4394     if snode and not static:
4395       self.cfg.SetDiskID(dev, snode)
4396       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4397     else:
4398       dev_sstatus = None
4399
4400     if dev.children:
4401       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4402                       for child in dev.children]
4403     else:
4404       dev_children = []
4405
4406     data = {
4407       "iv_name": dev.iv_name,
4408       "dev_type": dev.dev_type,
4409       "logical_id": dev.logical_id,
4410       "physical_id": dev.physical_id,
4411       "pstatus": dev_pstatus,
4412       "sstatus": dev_sstatus,
4413       "children": dev_children,
4414       }
4415
4416     return data
4417
4418   def Exec(self, feedback_fn):
4419     """Gather and return data"""
4420     result = {}
4421
4422     cluster = self.cfg.GetClusterInfo()
4423
4424     for instance in self.wanted_instances:
4425       if not self.op.static:
4426         remote_info = self.rpc.call_instance_info(instance.primary_node,
4427                                                   instance.name,
4428                                                   instance.hypervisor)
4429         if remote_info and "state" in remote_info:
4430           remote_state = "up"
4431         else:
4432           remote_state = "down"
4433       else:
4434         remote_state = None
4435       if instance.status == "down":
4436         config_state = "down"
4437       else:
4438         config_state = "up"
4439
4440       disks = [self._ComputeDiskStatus(instance, None, device)
4441                for device in instance.disks]
4442
4443       idict = {
4444         "name": instance.name,
4445         "config_state": config_state,
4446         "run_state": remote_state,
4447         "pnode": instance.primary_node,
4448         "snodes": instance.secondary_nodes,
4449         "os": instance.os,
4450         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4451         "disks": disks,
4452         "hypervisor": instance.hypervisor,
4453         "network_port": instance.network_port,
4454         "hv_instance": instance.hvparams,
4455         "hv_actual": cluster.FillHV(instance),
4456         "be_instance": instance.beparams,
4457         "be_actual": cluster.FillBE(instance),
4458         }
4459
4460       result[instance.name] = idict
4461
4462     return result
4463
4464
4465 class LUSetInstanceParams(LogicalUnit):
4466   """Modifies an instances's parameters.
4467
4468   """
4469   HPATH = "instance-modify"
4470   HTYPE = constants.HTYPE_INSTANCE
4471   _OP_REQP = ["instance_name", "hvparams"]
4472   REQ_BGL = False
4473
4474   def ExpandNames(self):
4475     self._ExpandAndLockInstance()
4476     self.needed_locks[locking.LEVEL_NODE] = []
4477     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4478
4479
4480   def DeclareLocks(self, level):
4481     if level == locking.LEVEL_NODE:
4482       self._LockInstancesNodes()
4483
4484   def BuildHooksEnv(self):
4485     """Build hooks env.
4486
4487     This runs on the master, primary and secondaries.
4488
4489     """
4490     args = dict()
4491     if constants.BE_MEMORY in self.be_new:
4492       args['memory'] = self.be_new[constants.BE_MEMORY]
4493     if constants.BE_VCPUS in self.be_new:
4494       args['vcpus'] = self.be_bnew[constants.BE_VCPUS]
4495     if self.do_ip or self.do_bridge or self.mac:
4496       if self.do_ip:
4497         ip = self.ip
4498       else:
4499         ip = self.instance.nics[0].ip
4500       if self.bridge:
4501         bridge = self.bridge
4502       else:
4503         bridge = self.instance.nics[0].bridge
4504       if self.mac:
4505         mac = self.mac
4506       else:
4507         mac = self.instance.nics[0].mac
4508       args['nics'] = [(ip, bridge, mac)]
4509     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
4510     nl = [self.cfg.GetMasterNode(),
4511           self.instance.primary_node] + list(self.instance.secondary_nodes)
4512     return env, nl, nl
4513
4514   def CheckPrereq(self):
4515     """Check prerequisites.
4516
4517     This only checks the instance list against the existing names.
4518
4519     """
4520     # FIXME: all the parameters could be checked before, in ExpandNames, or in
4521     # a separate CheckArguments function, if we implement one, so the operation
4522     # can be aborted without waiting for any lock, should it have an error...
4523     self.ip = getattr(self.op, "ip", None)
4524     self.mac = getattr(self.op, "mac", None)
4525     self.bridge = getattr(self.op, "bridge", None)
4526     self.kernel_path = getattr(self.op, "kernel_path", None)
4527     self.initrd_path = getattr(self.op, "initrd_path", None)
4528     self.force = getattr(self.op, "force", None)
4529     all_parms = [self.ip, self.bridge, self.mac]
4530     if (all_parms.count(None) == len(all_parms) and
4531         not self.op.hvparams and
4532         not self.op.beparams):
4533       raise errors.OpPrereqError("No changes submitted")
4534     for item in (constants.BE_MEMORY, constants.BE_VCPUS):
4535       val = self.op.beparams.get(item, None)
4536       if val is not None:
4537         try:
4538           val = int(val)
4539         except ValueError, err:
4540           raise errors.OpPrereqError("Invalid %s size: %s" % (item, str(err)))
4541         self.op.beparams[item] = val
4542     if self.ip is not None:
4543       self.do_ip = True
4544       if self.ip.lower() == "none":
4545         self.ip = None
4546       else:
4547         if not utils.IsValidIP(self.ip):
4548           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4549     else:
4550       self.do_ip = False
4551     self.do_bridge = (self.bridge is not None)
4552     if self.mac is not None:
4553       if self.cfg.IsMacInUse(self.mac):
4554         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4555                                    self.mac)
4556       if not utils.IsValidMac(self.mac):
4557         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4558
4559     # checking the new params on the primary/secondary nodes
4560
4561     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4562     assert self.instance is not None, \
4563       "Cannot retrieve locked instance %s" % self.op.instance_name
4564     pnode = self.instance.primary_node
4565     nodelist = [pnode]
4566     nodelist.extend(instance.secondary_nodes)
4567
4568     # hvparams processing
4569     if self.op.hvparams:
4570       i_hvdict = copy.deepcopy(instance.hvparams)
4571       for key, val in self.op.hvparams.iteritems():
4572         if val is None:
4573           try:
4574             del i_hvdict[key]
4575           except KeyError:
4576             pass
4577         else:
4578           i_hvdict[key] = val
4579       cluster = self.cfg.GetClusterInfo()
4580       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
4581                                 i_hvdict)
4582       # local check
4583       hypervisor.GetHypervisor(
4584         instance.hypervisor).CheckParameterSyntax(hv_new)
4585       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
4586       self.hv_new = hv_new # the new actual values
4587       self.hv_inst = i_hvdict # the new dict (without defaults)
4588     else:
4589       self.hv_new = self.hv_inst = {}
4590
4591     # beparams processing
4592     if self.op.beparams:
4593       i_bedict = copy.deepcopy(instance.beparams)
4594       for key, val in self.op.beparams.iteritems():
4595         if val is None:
4596           try:
4597             del i_bedict[key]
4598           except KeyError:
4599             pass
4600         else:
4601           i_bedict[key] = val
4602       cluster = self.cfg.GetClusterInfo()
4603       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4604                                 i_bedict)
4605       self.be_new = be_new # the new actual values
4606       self.be_inst = i_bedict # the new dict (without defaults)
4607     else:
4608       self.hv_new = self.hv_inst = {}
4609
4610     self.warn = []
4611     if constants.BE_MEMORY in self.op.beparams and not self.force:
4612       instance_info = self.rpc.call_instance_info(pnode, instance.name,
4613                                                   instance.hypervisor)
4614       nodeinfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
4615                                          instance.hypervisor)
4616
4617       if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
4618         # Assume the primary node is unreachable and go ahead
4619         self.warn.append("Can't get info from primary node %s" % pnode)
4620       else:
4621         if instance_info:
4622           current_mem = instance_info['memory']
4623         else:
4624           # Assume instance not running
4625           # (there is a slight race condition here, but it's not very probable,
4626           # and we have no other way to check)
4627           current_mem = 0
4628         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
4629                     nodeinfo[pnode]['memory_free'])
4630         if miss_mem > 0:
4631           raise errors.OpPrereqError("This change will prevent the instance"
4632                                      " from starting, due to %d MB of memory"
4633                                      " missing on its primary node" % miss_mem)
4634
4635       for node in instance.secondary_nodes:
4636         if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
4637           self.warn.append("Can't get info from secondary node %s" % node)
4638         elif be_new[constants.BE_MEMORY] > nodeinfo[node]['memory_free']:
4639           self.warn.append("Not enough memory to failover instance to"
4640                            " secondary node %s" % node)
4641
4642     return
4643
4644   def Exec(self, feedback_fn):
4645     """Modifies an instance.
4646
4647     All parameters take effect only at the next restart of the instance.
4648     """
4649     # Process here the warnings from CheckPrereq, as we don't have a
4650     # feedback_fn there.
4651     for warn in self.warn:
4652       feedback_fn("WARNING: %s" % warn)
4653
4654     result = []
4655     instance = self.instance
4656     if self.do_ip:
4657       instance.nics[0].ip = self.ip
4658       result.append(("ip", self.ip))
4659     if self.bridge:
4660       instance.nics[0].bridge = self.bridge
4661       result.append(("bridge", self.bridge))
4662     if self.mac:
4663       instance.nics[0].mac = self.mac
4664       result.append(("mac", self.mac))
4665     if self.op.hvparams:
4666       instance.hvparams = self.hv_new
4667       for key, val in self.op.hvparams.iteritems():
4668         result.append(("hv/%s" % key, val))
4669     if self.op.beparams:
4670       instance.beparams = self.be_inst
4671       for key, val in self.op.beparams.iteritems():
4672         result.append(("be/%s" % key, val))
4673
4674     self.cfg.Update(instance)
4675
4676     return result
4677
4678
4679 class LUQueryExports(NoHooksLU):
4680   """Query the exports list
4681
4682   """
4683   _OP_REQP = ['nodes']
4684   REQ_BGL = False
4685
4686   def ExpandNames(self):
4687     self.needed_locks = {}
4688     self.share_locks[locking.LEVEL_NODE] = 1
4689     if not self.op.nodes:
4690       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4691     else:
4692       self.needed_locks[locking.LEVEL_NODE] = \
4693         _GetWantedNodes(self, self.op.nodes)
4694
4695   def CheckPrereq(self):
4696     """Check prerequisites.
4697
4698     """
4699     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
4700
4701   def Exec(self, feedback_fn):
4702     """Compute the list of all the exported system images.
4703
4704     Returns:
4705       a dictionary with the structure node->(export-list)
4706       where export-list is a list of the instances exported on
4707       that node.
4708
4709     """
4710     return self.rpc.call_export_list(self.nodes)
4711
4712
4713 class LUExportInstance(LogicalUnit):
4714   """Export an instance to an image in the cluster.
4715
4716   """
4717   HPATH = "instance-export"
4718   HTYPE = constants.HTYPE_INSTANCE
4719   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4720   REQ_BGL = False
4721
4722   def ExpandNames(self):
4723     self._ExpandAndLockInstance()
4724     # FIXME: lock only instance primary and destination node
4725     #
4726     # Sad but true, for now we have do lock all nodes, as we don't know where
4727     # the previous export might be, and and in this LU we search for it and
4728     # remove it from its current node. In the future we could fix this by:
4729     #  - making a tasklet to search (share-lock all), then create the new one,
4730     #    then one to remove, after
4731     #  - removing the removal operation altoghether
4732     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4733
4734   def DeclareLocks(self, level):
4735     """Last minute lock declaration."""
4736     # All nodes are locked anyway, so nothing to do here.
4737
4738   def BuildHooksEnv(self):
4739     """Build hooks env.
4740
4741     This will run on the master, primary node and target node.
4742
4743     """
4744     env = {
4745       "EXPORT_NODE": self.op.target_node,
4746       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4747       }
4748     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4749     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
4750           self.op.target_node]
4751     return env, nl, nl
4752
4753   def CheckPrereq(self):
4754     """Check prerequisites.
4755
4756     This checks that the instance and node names are valid.
4757
4758     """
4759     instance_name = self.op.instance_name
4760     self.instance = self.cfg.GetInstanceInfo(instance_name)
4761     assert self.instance is not None, \
4762           "Cannot retrieve locked instance %s" % self.op.instance_name
4763
4764     self.dst_node = self.cfg.GetNodeInfo(
4765       self.cfg.ExpandNodeName(self.op.target_node))
4766
4767     assert self.dst_node is not None, \
4768           "Cannot retrieve locked node %s" % self.op.target_node
4769
4770     # instance disk type verification
4771     for disk in self.instance.disks:
4772       if disk.dev_type == constants.LD_FILE:
4773         raise errors.OpPrereqError("Export not supported for instances with"
4774                                    " file-based disks")
4775
4776   def Exec(self, feedback_fn):
4777     """Export an instance to an image in the cluster.
4778
4779     """
4780     instance = self.instance
4781     dst_node = self.dst_node
4782     src_node = instance.primary_node
4783     if self.op.shutdown:
4784       # shutdown the instance, but not the disks
4785       if not self.rpc.call_instance_shutdown(src_node, instance):
4786         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4787                                  (instance.name, src_node))
4788
4789     vgname = self.cfg.GetVGName()
4790
4791     snap_disks = []
4792
4793     try:
4794       for disk in instance.disks:
4795         if disk.iv_name == "sda":
4796           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4797           new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
4798
4799           if not new_dev_name:
4800             logger.Error("could not snapshot block device %s on node %s" %
4801                          (disk.logical_id[1], src_node))
4802           else:
4803             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4804                                       logical_id=(vgname, new_dev_name),
4805                                       physical_id=(vgname, new_dev_name),
4806                                       iv_name=disk.iv_name)
4807             snap_disks.append(new_dev)
4808
4809     finally:
4810       if self.op.shutdown and instance.status == "up":
4811         if not self.rpc.call_instance_start(src_node, instance, None):
4812           _ShutdownInstanceDisks(self, instance)
4813           raise errors.OpExecError("Could not start instance")
4814
4815     # TODO: check for size
4816
4817     cluster_name = self.cfg.GetClusterName()
4818     for dev in snap_disks:
4819       if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
4820                                       instance, cluster_name):
4821         logger.Error("could not export block device %s from node %s to node %s"
4822                      % (dev.logical_id[1], src_node, dst_node.name))
4823       if not self.rpc.call_blockdev_remove(src_node, dev):
4824         logger.Error("could not remove snapshot block device %s from node %s" %
4825                      (dev.logical_id[1], src_node))
4826
4827     if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4828       logger.Error("could not finalize export for instance %s on node %s" %
4829                    (instance.name, dst_node.name))
4830
4831     nodelist = self.cfg.GetNodeList()
4832     nodelist.remove(dst_node.name)
4833
4834     # on one-node clusters nodelist will be empty after the removal
4835     # if we proceed the backup would be removed because OpQueryExports
4836     # substitutes an empty list with the full cluster node list.
4837     if nodelist:
4838       exportlist = self.rpc.call_export_list(nodelist)
4839       for node in exportlist:
4840         if instance.name in exportlist[node]:
4841           if not self.rpc.call_export_remove(node, instance.name):
4842             logger.Error("could not remove older export for instance %s"
4843                          " on node %s" % (instance.name, node))
4844
4845
4846 class LURemoveExport(NoHooksLU):
4847   """Remove exports related to the named instance.
4848
4849   """
4850   _OP_REQP = ["instance_name"]
4851   REQ_BGL = False
4852
4853   def ExpandNames(self):
4854     self.needed_locks = {}
4855     # We need all nodes to be locked in order for RemoveExport to work, but we
4856     # don't need to lock the instance itself, as nothing will happen to it (and
4857     # we can remove exports also for a removed instance)
4858     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4859
4860   def CheckPrereq(self):
4861     """Check prerequisites.
4862     """
4863     pass
4864
4865   def Exec(self, feedback_fn):
4866     """Remove any export.
4867
4868     """
4869     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4870     # If the instance was not found we'll try with the name that was passed in.
4871     # This will only work if it was an FQDN, though.
4872     fqdn_warn = False
4873     if not instance_name:
4874       fqdn_warn = True
4875       instance_name = self.op.instance_name
4876
4877     exportlist = self.rpc.call_export_list(self.acquired_locks[
4878       locking.LEVEL_NODE])
4879     found = False
4880     for node in exportlist:
4881       if instance_name in exportlist[node]:
4882         found = True
4883         if not self.rpc.call_export_remove(node, instance_name):
4884           logger.Error("could not remove export for instance %s"
4885                        " on node %s" % (instance_name, node))
4886
4887     if fqdn_warn and not found:
4888       feedback_fn("Export not found. If trying to remove an export belonging"
4889                   " to a deleted instance please use its Fully Qualified"
4890                   " Domain Name.")
4891
4892
4893 class TagsLU(NoHooksLU):
4894   """Generic tags LU.
4895
4896   This is an abstract class which is the parent of all the other tags LUs.
4897
4898   """
4899
4900   def ExpandNames(self):
4901     self.needed_locks = {}
4902     if self.op.kind == constants.TAG_NODE:
4903       name = self.cfg.ExpandNodeName(self.op.name)
4904       if name is None:
4905         raise errors.OpPrereqError("Invalid node name (%s)" %
4906                                    (self.op.name,))
4907       self.op.name = name
4908       self.needed_locks[locking.LEVEL_NODE] = name
4909     elif self.op.kind == constants.TAG_INSTANCE:
4910       name = self.cfg.ExpandInstanceName(self.op.name)
4911       if name is None:
4912         raise errors.OpPrereqError("Invalid instance name (%s)" %
4913                                    (self.op.name,))
4914       self.op.name = name
4915       self.needed_locks[locking.LEVEL_INSTANCE] = name
4916
4917   def CheckPrereq(self):
4918     """Check prerequisites.
4919
4920     """
4921     if self.op.kind == constants.TAG_CLUSTER:
4922       self.target = self.cfg.GetClusterInfo()
4923     elif self.op.kind == constants.TAG_NODE:
4924       self.target = self.cfg.GetNodeInfo(self.op.name)
4925     elif self.op.kind == constants.TAG_INSTANCE:
4926       self.target = self.cfg.GetInstanceInfo(self.op.name)
4927     else:
4928       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4929                                  str(self.op.kind))
4930
4931
4932 class LUGetTags(TagsLU):
4933   """Returns the tags of a given object.
4934
4935   """
4936   _OP_REQP = ["kind", "name"]
4937   REQ_BGL = False
4938
4939   def Exec(self, feedback_fn):
4940     """Returns the tag list.
4941
4942     """
4943     return list(self.target.GetTags())
4944
4945
4946 class LUSearchTags(NoHooksLU):
4947   """Searches the tags for a given pattern.
4948
4949   """
4950   _OP_REQP = ["pattern"]
4951   REQ_BGL = False
4952
4953   def ExpandNames(self):
4954     self.needed_locks = {}
4955
4956   def CheckPrereq(self):
4957     """Check prerequisites.
4958
4959     This checks the pattern passed for validity by compiling it.
4960
4961     """
4962     try:
4963       self.re = re.compile(self.op.pattern)
4964     except re.error, err:
4965       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4966                                  (self.op.pattern, err))
4967
4968   def Exec(self, feedback_fn):
4969     """Returns the tag list.
4970
4971     """
4972     cfg = self.cfg
4973     tgts = [("/cluster", cfg.GetClusterInfo())]
4974     ilist = cfg.GetAllInstancesInfo().values()
4975     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4976     nlist = cfg.GetAllNodesInfo().values()
4977     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4978     results = []
4979     for path, target in tgts:
4980       for tag in target.GetTags():
4981         if self.re.search(tag):
4982           results.append((path, tag))
4983     return results
4984
4985
4986 class LUAddTags(TagsLU):
4987   """Sets a tag on a given object.
4988
4989   """
4990   _OP_REQP = ["kind", "name", "tags"]
4991   REQ_BGL = False
4992
4993   def CheckPrereq(self):
4994     """Check prerequisites.
4995
4996     This checks the type and length of the tag name and value.
4997
4998     """
4999     TagsLU.CheckPrereq(self)
5000     for tag in self.op.tags:
5001       objects.TaggableObject.ValidateTag(tag)
5002
5003   def Exec(self, feedback_fn):
5004     """Sets the tag.
5005
5006     """
5007     try:
5008       for tag in self.op.tags:
5009         self.target.AddTag(tag)
5010     except errors.TagError, err:
5011       raise errors.OpExecError("Error while setting tag: %s" % str(err))
5012     try:
5013       self.cfg.Update(self.target)
5014     except errors.ConfigurationError:
5015       raise errors.OpRetryError("There has been a modification to the"
5016                                 " config file and the operation has been"
5017                                 " aborted. Please retry.")
5018
5019
5020 class LUDelTags(TagsLU):
5021   """Delete a list of tags from a given object.
5022
5023   """
5024   _OP_REQP = ["kind", "name", "tags"]
5025   REQ_BGL = False
5026
5027   def CheckPrereq(self):
5028     """Check prerequisites.
5029
5030     This checks that we have the given tag.
5031
5032     """
5033     TagsLU.CheckPrereq(self)
5034     for tag in self.op.tags:
5035       objects.TaggableObject.ValidateTag(tag)
5036     del_tags = frozenset(self.op.tags)
5037     cur_tags = self.target.GetTags()
5038     if not del_tags <= cur_tags:
5039       diff_tags = del_tags - cur_tags
5040       diff_names = ["'%s'" % tag for tag in diff_tags]
5041       diff_names.sort()
5042       raise errors.OpPrereqError("Tag(s) %s not found" %
5043                                  (",".join(diff_names)))
5044
5045   def Exec(self, feedback_fn):
5046     """Remove the tag from the object.
5047
5048     """
5049     for tag in self.op.tags:
5050       self.target.RemoveTag(tag)
5051     try:
5052       self.cfg.Update(self.target)
5053     except errors.ConfigurationError:
5054       raise errors.OpRetryError("There has been a modification to the"
5055                                 " config file and the operation has been"
5056                                 " aborted. Please retry.")
5057
5058
5059 class LUTestDelay(NoHooksLU):
5060   """Sleep for a specified amount of time.
5061
5062   This LU sleeps on the master and/or nodes for a specified amount of
5063   time.
5064
5065   """
5066   _OP_REQP = ["duration", "on_master", "on_nodes"]
5067   REQ_BGL = False
5068
5069   def ExpandNames(self):
5070     """Expand names and set required locks.
5071
5072     This expands the node list, if any.
5073
5074     """
5075     self.needed_locks = {}
5076     if self.op.on_nodes:
5077       # _GetWantedNodes can be used here, but is not always appropriate to use
5078       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
5079       # more information.
5080       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5081       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5082
5083   def CheckPrereq(self):
5084     """Check prerequisites.
5085
5086     """
5087
5088   def Exec(self, feedback_fn):
5089     """Do the actual sleep.
5090
5091     """
5092     if self.op.on_master:
5093       if not utils.TestDelay(self.op.duration):
5094         raise errors.OpExecError("Error during master delay test")
5095     if self.op.on_nodes:
5096       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5097       if not result:
5098         raise errors.OpExecError("Complete failure from rpc call")
5099       for node, node_result in result.items():
5100         if not node_result:
5101           raise errors.OpExecError("Failure during rpc call to node %s,"
5102                                    " result: %s" % (node, node_result))
5103
5104
5105 class IAllocator(object):
5106   """IAllocator framework.
5107
5108   An IAllocator instance has three sets of attributes:
5109     - cfg that is needed to query the cluster
5110     - input data (all members of the _KEYS class attribute are required)
5111     - four buffer attributes (in|out_data|text), that represent the
5112       input (to the external script) in text and data structure format,
5113       and the output from it, again in two formats
5114     - the result variables from the script (success, info, nodes) for
5115       easy usage
5116
5117   """
5118   _ALLO_KEYS = [
5119     "mem_size", "disks", "disk_template",
5120     "os", "tags", "nics", "vcpus",
5121     ]
5122   _RELO_KEYS = [
5123     "relocate_from",
5124     ]
5125
5126   def __init__(self, lu, mode, name, **kwargs):
5127     self.lu = lu
5128     # init buffer variables
5129     self.in_text = self.out_text = self.in_data = self.out_data = None
5130     # init all input fields so that pylint is happy
5131     self.mode = mode
5132     self.name = name
5133     self.mem_size = self.disks = self.disk_template = None
5134     self.os = self.tags = self.nics = self.vcpus = None
5135     self.relocate_from = None
5136     # computed fields
5137     self.required_nodes = None
5138     # init result fields
5139     self.success = self.info = self.nodes = None
5140     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5141       keyset = self._ALLO_KEYS
5142     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5143       keyset = self._RELO_KEYS
5144     else:
5145       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5146                                    " IAllocator" % self.mode)
5147     for key in kwargs:
5148       if key not in keyset:
5149         raise errors.ProgrammerError("Invalid input parameter '%s' to"
5150                                      " IAllocator" % key)
5151       setattr(self, key, kwargs[key])
5152     for key in keyset:
5153       if key not in kwargs:
5154         raise errors.ProgrammerError("Missing input parameter '%s' to"
5155                                      " IAllocator" % key)
5156     self._BuildInputData()
5157
5158   def _ComputeClusterData(self):
5159     """Compute the generic allocator input data.
5160
5161     This is the data that is independent of the actual operation.
5162
5163     """
5164     cfg = self.lu.cfg
5165     cluster_info = cfg.GetClusterInfo()
5166     # cluster data
5167     data = {
5168       "version": 1,
5169       "cluster_name": cfg.GetClusterName(),
5170       "cluster_tags": list(cluster_info.GetTags()),
5171       "enable_hypervisors": list(cluster_info.enabled_hypervisors),
5172       # we don't have job IDs
5173       }
5174
5175     i_list = []
5176     cluster = self.cfg.GetClusterInfo()
5177     for iname in cfg.GetInstanceList():
5178       i_obj = cfg.GetInstanceInfo(iname)
5179       i_list.append((i_obj, cluster.FillBE(i_obj)))
5180
5181     # node data
5182     node_results = {}
5183     node_list = cfg.GetNodeList()
5184     # FIXME: here we have only one hypervisor information, but
5185     # instance can belong to different hypervisors
5186     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
5187                                            cfg.GetHypervisorType())
5188     for nname in node_list:
5189       ninfo = cfg.GetNodeInfo(nname)
5190       if nname not in node_data or not isinstance(node_data[nname], dict):
5191         raise errors.OpExecError("Can't get data for node %s" % nname)
5192       remote_info = node_data[nname]
5193       for attr in ['memory_total', 'memory_free', 'memory_dom0',
5194                    'vg_size', 'vg_free', 'cpu_total']:
5195         if attr not in remote_info:
5196           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5197                                    (nname, attr))
5198         try:
5199           remote_info[attr] = int(remote_info[attr])
5200         except ValueError, err:
5201           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5202                                    " %s" % (nname, attr, str(err)))
5203       # compute memory used by primary instances
5204       i_p_mem = i_p_up_mem = 0
5205       for iinfo, beinfo in i_list:
5206         if iinfo.primary_node == nname:
5207           i_p_mem += beinfo[constants.BE_MEMORY]
5208           if iinfo.status == "up":
5209             i_p_up_mem += beinfo[constants.BE_MEMORY]
5210
5211       # compute memory used by instances
5212       pnr = {
5213         "tags": list(ninfo.GetTags()),
5214         "total_memory": remote_info['memory_total'],
5215         "reserved_memory": remote_info['memory_dom0'],
5216         "free_memory": remote_info['memory_free'],
5217         "i_pri_memory": i_p_mem,
5218         "i_pri_up_memory": i_p_up_mem,
5219         "total_disk": remote_info['vg_size'],
5220         "free_disk": remote_info['vg_free'],
5221         "primary_ip": ninfo.primary_ip,
5222         "secondary_ip": ninfo.secondary_ip,
5223         "total_cpus": remote_info['cpu_total'],
5224         }
5225       node_results[nname] = pnr
5226     data["nodes"] = node_results
5227
5228     # instance data
5229     instance_data = {}
5230     for iinfo, beinfo in i_list:
5231       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5232                   for n in iinfo.nics]
5233       pir = {
5234         "tags": list(iinfo.GetTags()),
5235         "should_run": iinfo.status == "up",
5236         "vcpus": beinfo[constants.BE_VCPUS],
5237         "memory": beinfo[constants.BE_MEMORY],
5238         "os": iinfo.os,
5239         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5240         "nics": nic_data,
5241         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5242         "disk_template": iinfo.disk_template,
5243         "hypervisor": iinfo.hypervisor,
5244         }
5245       instance_data[iinfo.name] = pir
5246
5247     data["instances"] = instance_data
5248
5249     self.in_data = data
5250
5251   def _AddNewInstance(self):
5252     """Add new instance data to allocator structure.
5253
5254     This in combination with _AllocatorGetClusterData will create the
5255     correct structure needed as input for the allocator.
5256
5257     The checks for the completeness of the opcode must have already been
5258     done.
5259
5260     """
5261     data = self.in_data
5262     if len(self.disks) != 2:
5263       raise errors.OpExecError("Only two-disk configurations supported")
5264
5265     disk_space = _ComputeDiskSize(self.disk_template,
5266                                   self.disks[0]["size"], self.disks[1]["size"])
5267
5268     if self.disk_template in constants.DTS_NET_MIRROR:
5269       self.required_nodes = 2
5270     else:
5271       self.required_nodes = 1
5272     request = {
5273       "type": "allocate",
5274       "name": self.name,
5275       "disk_template": self.disk_template,
5276       "tags": self.tags,
5277       "os": self.os,
5278       "vcpus": self.vcpus,
5279       "memory": self.mem_size,
5280       "disks": self.disks,
5281       "disk_space_total": disk_space,
5282       "nics": self.nics,
5283       "required_nodes": self.required_nodes,
5284       }
5285     data["request"] = request
5286
5287   def _AddRelocateInstance(self):
5288     """Add relocate instance data to allocator structure.
5289
5290     This in combination with _IAllocatorGetClusterData will create the
5291     correct structure needed as input for the allocator.
5292
5293     The checks for the completeness of the opcode must have already been
5294     done.
5295
5296     """
5297     instance = self.lu.cfg.GetInstanceInfo(self.name)
5298     if instance is None:
5299       raise errors.ProgrammerError("Unknown instance '%s' passed to"
5300                                    " IAllocator" % self.name)
5301
5302     if instance.disk_template not in constants.DTS_NET_MIRROR:
5303       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5304
5305     if len(instance.secondary_nodes) != 1:
5306       raise errors.OpPrereqError("Instance has not exactly one secondary node")
5307
5308     self.required_nodes = 1
5309
5310     disk_space = _ComputeDiskSize(instance.disk_template,
5311                                   instance.disks[0].size,
5312                                   instance.disks[1].size)
5313
5314     request = {
5315       "type": "relocate",
5316       "name": self.name,
5317       "disk_space_total": disk_space,
5318       "required_nodes": self.required_nodes,
5319       "relocate_from": self.relocate_from,
5320       }
5321     self.in_data["request"] = request
5322
5323   def _BuildInputData(self):
5324     """Build input data structures.
5325
5326     """
5327     self._ComputeClusterData()
5328
5329     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5330       self._AddNewInstance()
5331     else:
5332       self._AddRelocateInstance()
5333
5334     self.in_text = serializer.Dump(self.in_data)
5335
5336   def Run(self, name, validate=True, call_fn=None):
5337     """Run an instance allocator and return the results.
5338
5339     """
5340     if call_fn is None:
5341       call_fn = self.lu.rpc.call_iallocator_runner
5342     data = self.in_text
5343
5344     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
5345
5346     if not isinstance(result, (list, tuple)) or len(result) != 4:
5347       raise errors.OpExecError("Invalid result from master iallocator runner")
5348
5349     rcode, stdout, stderr, fail = result
5350
5351     if rcode == constants.IARUN_NOTFOUND:
5352       raise errors.OpExecError("Can't find allocator '%s'" % name)
5353     elif rcode == constants.IARUN_FAILURE:
5354       raise errors.OpExecError("Instance allocator call failed: %s,"
5355                                " output: %s" % (fail, stdout+stderr))
5356     self.out_text = stdout
5357     if validate:
5358       self._ValidateResult()
5359
5360   def _ValidateResult(self):
5361     """Process the allocator results.
5362
5363     This will process and if successful save the result in
5364     self.out_data and the other parameters.
5365
5366     """
5367     try:
5368       rdict = serializer.Load(self.out_text)
5369     except Exception, err:
5370       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5371
5372     if not isinstance(rdict, dict):
5373       raise errors.OpExecError("Can't parse iallocator results: not a dict")
5374
5375     for key in "success", "info", "nodes":
5376       if key not in rdict:
5377         raise errors.OpExecError("Can't parse iallocator results:"
5378                                  " missing key '%s'" % key)
5379       setattr(self, key, rdict[key])
5380
5381     if not isinstance(rdict["nodes"], list):
5382       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5383                                " is not a list")
5384     self.out_data = rdict
5385
5386
5387 class LUTestAllocator(NoHooksLU):
5388   """Run allocator tests.
5389
5390   This LU runs the allocator tests
5391
5392   """
5393   _OP_REQP = ["direction", "mode", "name"]
5394
5395   def CheckPrereq(self):
5396     """Check prerequisites.
5397
5398     This checks the opcode parameters depending on the director and mode test.
5399
5400     """
5401     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5402       for attr in ["name", "mem_size", "disks", "disk_template",
5403                    "os", "tags", "nics", "vcpus"]:
5404         if not hasattr(self.op, attr):
5405           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5406                                      attr)
5407       iname = self.cfg.ExpandInstanceName(self.op.name)
5408       if iname is not None:
5409         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5410                                    iname)
5411       if not isinstance(self.op.nics, list):
5412         raise errors.OpPrereqError("Invalid parameter 'nics'")
5413       for row in self.op.nics:
5414         if (not isinstance(row, dict) or
5415             "mac" not in row or
5416             "ip" not in row or
5417             "bridge" not in row):
5418           raise errors.OpPrereqError("Invalid contents of the"
5419                                      " 'nics' parameter")
5420       if not isinstance(self.op.disks, list):
5421         raise errors.OpPrereqError("Invalid parameter 'disks'")
5422       if len(self.op.disks) != 2:
5423         raise errors.OpPrereqError("Only two-disk configurations supported")
5424       for row in self.op.disks:
5425         if (not isinstance(row, dict) or
5426             "size" not in row or
5427             not isinstance(row["size"], int) or
5428             "mode" not in row or
5429             row["mode"] not in ['r', 'w']):
5430           raise errors.OpPrereqError("Invalid contents of the"
5431                                      " 'disks' parameter")
5432     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5433       if not hasattr(self.op, "name"):
5434         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5435       fname = self.cfg.ExpandInstanceName(self.op.name)
5436       if fname is None:
5437         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5438                                    self.op.name)
5439       self.op.name = fname
5440       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5441     else:
5442       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5443                                  self.op.mode)
5444
5445     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5446       if not hasattr(self.op, "allocator") or self.op.allocator is None:
5447         raise errors.OpPrereqError("Missing allocator name")
5448     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5449       raise errors.OpPrereqError("Wrong allocator test '%s'" %
5450                                  self.op.direction)
5451
5452   def Exec(self, feedback_fn):
5453     """Run the allocator test.
5454
5455     """
5456     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5457       ial = IAllocator(self,
5458                        mode=self.op.mode,
5459                        name=self.op.name,
5460                        mem_size=self.op.mem_size,
5461                        disks=self.op.disks,
5462                        disk_template=self.op.disk_template,
5463                        os=self.op.os,
5464                        tags=self.op.tags,
5465                        nics=self.op.nics,
5466                        vcpus=self.op.vcpus,
5467                        )
5468     else:
5469       ial = IAllocator(self,
5470                        mode=self.op.mode,
5471                        name=self.op.name,
5472                        relocate_from=list(self.relocate_from),
5473                        )
5474
5475     if self.op.direction == constants.IALLOCATOR_DIR_IN:
5476       result = ial.in_text
5477     else:
5478       ial.Run(self.op.allocator, validate=False)
5479       result = ial.out_text
5480     return result