Cleanup os_add/rename rpc for OS API 10
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import logging
34 import copy
35
36 from ganeti import ssh
37 from ganeti import logger
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_MASTER: the LU needs to run on the master node
59         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60
61   Note that all commands require root permissions.
62
63   """
64   HPATH = None
65   HTYPE = None
66   _OP_REQP = []
67   REQ_MASTER = True
68   REQ_BGL = True
69
70   def __init__(self, processor, op, context, rpc):
71     """Constructor for LogicalUnit.
72
73     This needs to be overriden in derived classes in order to check op
74     validity.
75
76     """
77     self.proc = processor
78     self.op = op
79     self.cfg = context.cfg
80     self.context = context
81     self.rpc = rpc
82     # Dicts used to declare locking needs to mcpu
83     self.needed_locks = None
84     self.acquired_locks = {}
85     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
86     self.add_locks = {}
87     self.remove_locks = {}
88     # Used to force good behavior when calling helper functions
89     self.recalculate_locks = {}
90     self.__ssh = None
91
92     for attr_name in self._OP_REQP:
93       attr_val = getattr(op, attr_name, None)
94       if attr_val is None:
95         raise errors.OpPrereqError("Required parameter '%s' missing" %
96                                    attr_name)
97
98     if not self.cfg.IsCluster():
99       raise errors.OpPrereqError("Cluster not initialized yet,"
100                                  " use 'gnt-cluster init' first.")
101     if self.REQ_MASTER:
102       master = self.cfg.GetMasterNode()
103       if master != utils.HostInfo().name:
104         raise errors.OpPrereqError("Commands must be run on the master"
105                                    " node %s" % master)
106
107   def __GetSSH(self):
108     """Returns the SshRunner object
109
110     """
111     if not self.__ssh:
112       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
113     return self.__ssh
114
115   ssh = property(fget=__GetSSH)
116
117   def ExpandNames(self):
118     """Expand names for this LU.
119
120     This method is called before starting to execute the opcode, and it should
121     update all the parameters of the opcode to their canonical form (e.g. a
122     short node name must be fully expanded after this method has successfully
123     completed). This way locking, hooks, logging, ecc. can work correctly.
124
125     LUs which implement this method must also populate the self.needed_locks
126     member, as a dict with lock levels as keys, and a list of needed lock names
127     as values. Rules:
128       - Use an empty dict if you don't need any lock
129       - If you don't need any lock at a particular level omit that level
130       - Don't put anything for the BGL level
131       - If you want all locks at a level use locking.ALL_SET as a value
132
133     If you need to share locks (rather than acquire them exclusively) at one
134     level you can modify self.share_locks, setting a true value (usually 1) for
135     that level. By default locks are not shared.
136
137     Examples:
138     # Acquire all nodes and one instance
139     self.needed_locks = {
140       locking.LEVEL_NODE: locking.ALL_SET,
141       locking.LEVEL_INSTANCE: ['instance1.example.tld'],
142     }
143     # Acquire just two nodes
144     self.needed_locks = {
145       locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
146     }
147     # Acquire no locks
148     self.needed_locks = {} # No, you can't leave it to the default value None
149
150     """
151     # The implementation of this method is mandatory only if the new LU is
152     # concurrent, so that old LUs don't need to be changed all at the same
153     # time.
154     if self.REQ_BGL:
155       self.needed_locks = {} # Exclusive LUs don't need locks.
156     else:
157       raise NotImplementedError
158
159   def DeclareLocks(self, level):
160     """Declare LU locking needs for a level
161
162     While most LUs can just declare their locking needs at ExpandNames time,
163     sometimes there's the need to calculate some locks after having acquired
164     the ones before. This function is called just before acquiring locks at a
165     particular level, but after acquiring the ones at lower levels, and permits
166     such calculations. It can be used to modify self.needed_locks, and by
167     default it does nothing.
168
169     This function is only called if you have something already set in
170     self.needed_locks for the level.
171
172     @param level: Locking level which is going to be locked
173     @type level: member of ganeti.locking.LEVELS
174
175     """
176
177   def CheckPrereq(self):
178     """Check prerequisites for this LU.
179
180     This method should check that the prerequisites for the execution
181     of this LU are fulfilled. It can do internode communication, but
182     it should be idempotent - no cluster or system changes are
183     allowed.
184
185     The method should raise errors.OpPrereqError in case something is
186     not fulfilled. Its return value is ignored.
187
188     This method should also update all the parameters of the opcode to
189     their canonical form if it hasn't been done by ExpandNames before.
190
191     """
192     raise NotImplementedError
193
194   def Exec(self, feedback_fn):
195     """Execute the LU.
196
197     This method should implement the actual work. It should raise
198     errors.OpExecError for failures that are somewhat dealt with in
199     code, or expected.
200
201     """
202     raise NotImplementedError
203
204   def BuildHooksEnv(self):
205     """Build hooks environment for this LU.
206
207     This method should return a three-node tuple consisting of: a dict
208     containing the environment that will be used for running the
209     specific hook for this LU, a list of node names on which the hook
210     should run before the execution, and a list of node names on which
211     the hook should run after the execution.
212
213     The keys of the dict must not have 'GANETI_' prefixed as this will
214     be handled in the hooks runner. Also note additional keys will be
215     added by the hooks runner. If the LU doesn't define any
216     environment, an empty dict (and not None) should be returned.
217
218     No nodes should be returned as an empty list (and not None).
219
220     Note that if the HPATH for a LU class is None, this function will
221     not be called.
222
223     """
224     raise NotImplementedError
225
226   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
227     """Notify the LU about the results of its hooks.
228
229     This method is called every time a hooks phase is executed, and notifies
230     the Logical Unit about the hooks' result. The LU can then use it to alter
231     its result based on the hooks.  By default the method does nothing and the
232     previous result is passed back unchanged but any LU can define it if it
233     wants to use the local cluster hook-scripts somehow.
234
235     Args:
236       phase: the hooks phase that has just been run
237       hooks_results: the results of the multi-node hooks rpc call
238       feedback_fn: function to send feedback back to the caller
239       lu_result: the previous result this LU had, or None in the PRE phase.
240
241     """
242     return lu_result
243
244   def _ExpandAndLockInstance(self):
245     """Helper function to expand and lock an instance.
246
247     Many LUs that work on an instance take its name in self.op.instance_name
248     and need to expand it and then declare the expanded name for locking. This
249     function does it, and then updates self.op.instance_name to the expanded
250     name. It also initializes needed_locks as a dict, if this hasn't been done
251     before.
252
253     """
254     if self.needed_locks is None:
255       self.needed_locks = {}
256     else:
257       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
258         "_ExpandAndLockInstance called with instance-level locks set"
259     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
260     if expanded_name is None:
261       raise errors.OpPrereqError("Instance '%s' not known" %
262                                   self.op.instance_name)
263     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
264     self.op.instance_name = expanded_name
265
266   def _LockInstancesNodes(self, primary_only=False):
267     """Helper function to declare instances' nodes for locking.
268
269     This function should be called after locking one or more instances to lock
270     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
271     with all primary or secondary nodes for instances already locked and
272     present in self.needed_locks[locking.LEVEL_INSTANCE].
273
274     It should be called from DeclareLocks, and for safety only works if
275     self.recalculate_locks[locking.LEVEL_NODE] is set.
276
277     In the future it may grow parameters to just lock some instance's nodes, or
278     to just lock primaries or secondary nodes, if needed.
279
280     If should be called in DeclareLocks in a way similar to:
281
282     if level == locking.LEVEL_NODE:
283       self._LockInstancesNodes()
284
285     @type primary_only: boolean
286     @param primary_only: only lock primary nodes of locked instances
287
288     """
289     assert locking.LEVEL_NODE in self.recalculate_locks, \
290       "_LockInstancesNodes helper function called with no nodes to recalculate"
291
292     # TODO: check if we're really been called with the instance locks held
293
294     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
295     # future we might want to have different behaviors depending on the value
296     # of self.recalculate_locks[locking.LEVEL_NODE]
297     wanted_nodes = []
298     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
299       instance = self.context.cfg.GetInstanceInfo(instance_name)
300       wanted_nodes.append(instance.primary_node)
301       if not primary_only:
302         wanted_nodes.extend(instance.secondary_nodes)
303
304     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
305       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
306     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
307       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
308
309     del self.recalculate_locks[locking.LEVEL_NODE]
310
311
312 class NoHooksLU(LogicalUnit):
313   """Simple LU which runs no hooks.
314
315   This LU is intended as a parent for other LogicalUnits which will
316   run no hooks, in order to reduce duplicate code.
317
318   """
319   HPATH = None
320   HTYPE = None
321
322
323 def _GetWantedNodes(lu, nodes):
324   """Returns list of checked and expanded node names.
325
326   Args:
327     nodes: List of nodes (strings) or None for all
328
329   """
330   if not isinstance(nodes, list):
331     raise errors.OpPrereqError("Invalid argument type 'nodes'")
332
333   if not nodes:
334     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
335       " non-empty list of nodes whose name is to be expanded.")
336
337   wanted = []
338   for name in nodes:
339     node = lu.cfg.ExpandNodeName(name)
340     if node is None:
341       raise errors.OpPrereqError("No such node name '%s'" % name)
342     wanted.append(node)
343
344   return utils.NiceSort(wanted)
345
346
347 def _GetWantedInstances(lu, instances):
348   """Returns list of checked and expanded instance names.
349
350   Args:
351     instances: List of instances (strings) or None for all
352
353   """
354   if not isinstance(instances, list):
355     raise errors.OpPrereqError("Invalid argument type 'instances'")
356
357   if instances:
358     wanted = []
359
360     for name in instances:
361       instance = lu.cfg.ExpandInstanceName(name)
362       if instance is None:
363         raise errors.OpPrereqError("No such instance name '%s'" % name)
364       wanted.append(instance)
365
366   else:
367     wanted = lu.cfg.GetInstanceList()
368   return utils.NiceSort(wanted)
369
370
371 def _CheckOutputFields(static, dynamic, selected):
372   """Checks whether all selected fields are valid.
373
374   Args:
375     static: Static fields
376     dynamic: Dynamic fields
377
378   """
379   static_fields = frozenset(static)
380   dynamic_fields = frozenset(dynamic)
381
382   all_fields = static_fields | dynamic_fields
383
384   if not all_fields.issuperset(selected):
385     raise errors.OpPrereqError("Unknown output fields selected: %s"
386                                % ",".join(frozenset(selected).
387                                           difference(all_fields)))
388
389
390 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
391                           memory, vcpus, nics):
392   """Builds instance related env variables for hooks from single variables.
393
394   Args:
395     secondary_nodes: List of secondary nodes as strings
396   """
397   env = {
398     "OP_TARGET": name,
399     "INSTANCE_NAME": name,
400     "INSTANCE_PRIMARY": primary_node,
401     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
402     "INSTANCE_OS_TYPE": os_type,
403     "INSTANCE_STATUS": status,
404     "INSTANCE_MEMORY": memory,
405     "INSTANCE_VCPUS": vcpus,
406   }
407
408   if nics:
409     nic_count = len(nics)
410     for idx, (ip, bridge, mac) in enumerate(nics):
411       if ip is None:
412         ip = ""
413       env["INSTANCE_NIC%d_IP" % idx] = ip
414       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
415       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
416   else:
417     nic_count = 0
418
419   env["INSTANCE_NIC_COUNT"] = nic_count
420
421   return env
422
423
424 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
425   """Builds instance related env variables for hooks from an object.
426
427   Args:
428     instance: objects.Instance object of instance
429     override: dict of values to override
430   """
431   bep = lu.cfg.GetClusterInfo().FillBE(instance)
432   args = {
433     'name': instance.name,
434     'primary_node': instance.primary_node,
435     'secondary_nodes': instance.secondary_nodes,
436     'os_type': instance.os,
437     'status': instance.os,
438     'memory': bep[constants.BE_MEMORY],
439     'vcpus': bep[constants.BE_VCPUS],
440     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
441   }
442   if override:
443     args.update(override)
444   return _BuildInstanceHookEnv(**args)
445
446
447 def _CheckInstanceBridgesExist(lu, instance):
448   """Check that the brigdes needed by an instance exist.
449
450   """
451   # check bridges existance
452   brlist = [nic.bridge for nic in instance.nics]
453   if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
454     raise errors.OpPrereqError("one or more target bridges %s does not"
455                                " exist on destination node '%s'" %
456                                (brlist, instance.primary_node))
457
458
459 class LUDestroyCluster(NoHooksLU):
460   """Logical unit for destroying the cluster.
461
462   """
463   _OP_REQP = []
464
465   def CheckPrereq(self):
466     """Check prerequisites.
467
468     This checks whether the cluster is empty.
469
470     Any errors are signalled by raising errors.OpPrereqError.
471
472     """
473     master = self.cfg.GetMasterNode()
474
475     nodelist = self.cfg.GetNodeList()
476     if len(nodelist) != 1 or nodelist[0] != master:
477       raise errors.OpPrereqError("There are still %d node(s) in"
478                                  " this cluster." % (len(nodelist) - 1))
479     instancelist = self.cfg.GetInstanceList()
480     if instancelist:
481       raise errors.OpPrereqError("There are still %d instance(s) in"
482                                  " this cluster." % len(instancelist))
483
484   def Exec(self, feedback_fn):
485     """Destroys the cluster.
486
487     """
488     master = self.cfg.GetMasterNode()
489     if not self.rpc.call_node_stop_master(master, False):
490       raise errors.OpExecError("Could not disable the master role")
491     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
492     utils.CreateBackup(priv_key)
493     utils.CreateBackup(pub_key)
494     return master
495
496
497 class LUVerifyCluster(LogicalUnit):
498   """Verifies the cluster status.
499
500   """
501   HPATH = "cluster-verify"
502   HTYPE = constants.HTYPE_CLUSTER
503   _OP_REQP = ["skip_checks"]
504   REQ_BGL = False
505
506   def ExpandNames(self):
507     self.needed_locks = {
508       locking.LEVEL_NODE: locking.ALL_SET,
509       locking.LEVEL_INSTANCE: locking.ALL_SET,
510     }
511     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
512
513   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
514                   remote_version, feedback_fn):
515     """Run multiple tests against a node.
516
517     Test list:
518       - compares ganeti version
519       - checks vg existance and size > 20G
520       - checks config file checksum
521       - checks ssh to other nodes
522
523     Args:
524       node: name of the node to check
525       file_list: required list of files
526       local_cksum: dictionary of local files and their checksums
527
528     """
529     # compares ganeti version
530     local_version = constants.PROTOCOL_VERSION
531     if not remote_version:
532       feedback_fn("  - ERROR: connection to %s failed" % (node))
533       return True
534
535     if local_version != remote_version:
536       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
537                       (local_version, node, remote_version))
538       return True
539
540     # checks vg existance and size > 20G
541
542     bad = False
543     if not vglist:
544       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
545                       (node,))
546       bad = True
547     else:
548       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
549                                             constants.MIN_VG_SIZE)
550       if vgstatus:
551         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
552         bad = True
553
554     if not node_result:
555       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
556       return True
557
558     # checks config file checksum
559     # checks ssh to any
560
561     if 'filelist' not in node_result:
562       bad = True
563       feedback_fn("  - ERROR: node hasn't returned file checksum data")
564     else:
565       remote_cksum = node_result['filelist']
566       for file_name in file_list:
567         if file_name not in remote_cksum:
568           bad = True
569           feedback_fn("  - ERROR: file '%s' missing" % file_name)
570         elif remote_cksum[file_name] != local_cksum[file_name]:
571           bad = True
572           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
573
574     if 'nodelist' not in node_result:
575       bad = True
576       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
577     else:
578       if node_result['nodelist']:
579         bad = True
580         for node in node_result['nodelist']:
581           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
582                           (node, node_result['nodelist'][node]))
583     if 'node-net-test' not in node_result:
584       bad = True
585       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
586     else:
587       if node_result['node-net-test']:
588         bad = True
589         nlist = utils.NiceSort(node_result['node-net-test'].keys())
590         for node in nlist:
591           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
592                           (node, node_result['node-net-test'][node]))
593
594     hyp_result = node_result.get('hypervisor', None)
595     if isinstance(hyp_result, dict):
596       for hv_name, hv_result in hyp_result.iteritems():
597         if hv_result is not None:
598           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
599                       (hv_name, hv_result))
600     return bad
601
602   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
603                       node_instance, feedback_fn):
604     """Verify an instance.
605
606     This function checks to see if the required block devices are
607     available on the instance's node.
608
609     """
610     bad = False
611
612     node_current = instanceconfig.primary_node
613
614     node_vol_should = {}
615     instanceconfig.MapLVsByNode(node_vol_should)
616
617     for node in node_vol_should:
618       for volume in node_vol_should[node]:
619         if node not in node_vol_is or volume not in node_vol_is[node]:
620           feedback_fn("  - ERROR: volume %s missing on node %s" %
621                           (volume, node))
622           bad = True
623
624     if not instanceconfig.status == 'down':
625       if (node_current not in node_instance or
626           not instance in node_instance[node_current]):
627         feedback_fn("  - ERROR: instance %s not running on node %s" %
628                         (instance, node_current))
629         bad = True
630
631     for node in node_instance:
632       if (not node == node_current):
633         if instance in node_instance[node]:
634           feedback_fn("  - ERROR: instance %s should not run on node %s" %
635                           (instance, node))
636           bad = True
637
638     return bad
639
640   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
641     """Verify if there are any unknown volumes in the cluster.
642
643     The .os, .swap and backup volumes are ignored. All other volumes are
644     reported as unknown.
645
646     """
647     bad = False
648
649     for node in node_vol_is:
650       for volume in node_vol_is[node]:
651         if node not in node_vol_should or volume not in node_vol_should[node]:
652           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
653                       (volume, node))
654           bad = True
655     return bad
656
657   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
658     """Verify the list of running instances.
659
660     This checks what instances are running but unknown to the cluster.
661
662     """
663     bad = False
664     for node in node_instance:
665       for runninginstance in node_instance[node]:
666         if runninginstance not in instancelist:
667           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
668                           (runninginstance, node))
669           bad = True
670     return bad
671
672   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
673     """Verify N+1 Memory Resilience.
674
675     Check that if one single node dies we can still start all the instances it
676     was primary for.
677
678     """
679     bad = False
680
681     for node, nodeinfo in node_info.iteritems():
682       # This code checks that every node which is now listed as secondary has
683       # enough memory to host all instances it is supposed to should a single
684       # other node in the cluster fail.
685       # FIXME: not ready for failover to an arbitrary node
686       # FIXME: does not support file-backed instances
687       # WARNING: we currently take into account down instances as well as up
688       # ones, considering that even if they're down someone might want to start
689       # them even in the event of a node failure.
690       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
691         needed_mem = 0
692         for instance in instances:
693           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
694           if bep[constants.BE_AUTO_BALANCE]:
695             needed_mem += bep[constants.BE_MEMORY]
696         if nodeinfo['mfree'] < needed_mem:
697           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
698                       " failovers should node %s fail" % (node, prinode))
699           bad = True
700     return bad
701
702   def CheckPrereq(self):
703     """Check prerequisites.
704
705     Transform the list of checks we're going to skip into a set and check that
706     all its members are valid.
707
708     """
709     self.skip_set = frozenset(self.op.skip_checks)
710     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
711       raise errors.OpPrereqError("Invalid checks to be skipped specified")
712
713   def BuildHooksEnv(self):
714     """Build hooks env.
715
716     Cluster-Verify hooks just rone in the post phase and their failure makes
717     the output be logged in the verify output and the verification to fail.
718
719     """
720     all_nodes = self.cfg.GetNodeList()
721     # TODO: populate the environment with useful information for verify hooks
722     env = {}
723     return env, [], all_nodes
724
725   def Exec(self, feedback_fn):
726     """Verify integrity of cluster, performing various test on nodes.
727
728     """
729     bad = False
730     feedback_fn("* Verifying global settings")
731     for msg in self.cfg.VerifyConfig():
732       feedback_fn("  - ERROR: %s" % msg)
733
734     vg_name = self.cfg.GetVGName()
735     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
736     nodelist = utils.NiceSort(self.cfg.GetNodeList())
737     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
738     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
739     i_non_redundant = [] # Non redundant instances
740     i_non_a_balanced = [] # Non auto-balanced instances
741     node_volume = {}
742     node_instance = {}
743     node_info = {}
744     instance_cfg = {}
745
746     # FIXME: verify OS list
747     # do local checksums
748     file_names = []
749     file_names.append(constants.SSL_CERT_FILE)
750     file_names.append(constants.CLUSTER_CONF_FILE)
751     local_checksums = utils.FingerprintFiles(file_names)
752
753     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
754     all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
755     all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
756     all_vglist = self.rpc.call_vg_list(nodelist)
757     node_verify_param = {
758       'filelist': file_names,
759       'nodelist': nodelist,
760       'hypervisor': hypervisors,
761       'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
762                         for node in nodeinfo]
763       }
764     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
765                                            self.cfg.GetClusterName())
766     all_rversion = self.rpc.call_version(nodelist)
767     all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
768                                         self.cfg.GetHypervisorType())
769
770     cluster = self.cfg.GetClusterInfo()
771     for node in nodelist:
772       feedback_fn("* Verifying node %s" % node)
773       result = self._VerifyNode(node, file_names, local_checksums,
774                                 all_vglist[node], all_nvinfo[node],
775                                 all_rversion[node], feedback_fn)
776       bad = bad or result
777
778       # node_volume
779       volumeinfo = all_volumeinfo[node]
780
781       if isinstance(volumeinfo, basestring):
782         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
783                     (node, volumeinfo[-400:].encode('string_escape')))
784         bad = True
785         node_volume[node] = {}
786       elif not isinstance(volumeinfo, dict):
787         feedback_fn("  - ERROR: connection to %s failed" % (node,))
788         bad = True
789         continue
790       else:
791         node_volume[node] = volumeinfo
792
793       # node_instance
794       nodeinstance = all_instanceinfo[node]
795       if type(nodeinstance) != list:
796         feedback_fn("  - ERROR: connection to %s failed" % (node,))
797         bad = True
798         continue
799
800       node_instance[node] = nodeinstance
801
802       # node_info
803       nodeinfo = all_ninfo[node]
804       if not isinstance(nodeinfo, dict):
805         feedback_fn("  - ERROR: connection to %s failed" % (node,))
806         bad = True
807         continue
808
809       try:
810         node_info[node] = {
811           "mfree": int(nodeinfo['memory_free']),
812           "dfree": int(nodeinfo['vg_free']),
813           "pinst": [],
814           "sinst": [],
815           # dictionary holding all instances this node is secondary for,
816           # grouped by their primary node. Each key is a cluster node, and each
817           # value is a list of instances which have the key as primary and the
818           # current node as secondary.  this is handy to calculate N+1 memory
819           # availability if you can only failover from a primary to its
820           # secondary.
821           "sinst-by-pnode": {},
822         }
823       except ValueError:
824         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
825         bad = True
826         continue
827
828     node_vol_should = {}
829
830     for instance in instancelist:
831       feedback_fn("* Verifying instance %s" % instance)
832       inst_config = self.cfg.GetInstanceInfo(instance)
833       result =  self._VerifyInstance(instance, inst_config, node_volume,
834                                      node_instance, feedback_fn)
835       bad = bad or result
836
837       inst_config.MapLVsByNode(node_vol_should)
838
839       instance_cfg[instance] = inst_config
840
841       pnode = inst_config.primary_node
842       if pnode in node_info:
843         node_info[pnode]['pinst'].append(instance)
844       else:
845         feedback_fn("  - ERROR: instance %s, connection to primary node"
846                     " %s failed" % (instance, pnode))
847         bad = True
848
849       # If the instance is non-redundant we cannot survive losing its primary
850       # node, so we are not N+1 compliant. On the other hand we have no disk
851       # templates with more than one secondary so that situation is not well
852       # supported either.
853       # FIXME: does not support file-backed instances
854       if len(inst_config.secondary_nodes) == 0:
855         i_non_redundant.append(instance)
856       elif len(inst_config.secondary_nodes) > 1:
857         feedback_fn("  - WARNING: multiple secondaries for instance %s"
858                     % instance)
859
860       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
861         i_non_a_balanced.append(instance)
862
863       for snode in inst_config.secondary_nodes:
864         if snode in node_info:
865           node_info[snode]['sinst'].append(instance)
866           if pnode not in node_info[snode]['sinst-by-pnode']:
867             node_info[snode]['sinst-by-pnode'][pnode] = []
868           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
869         else:
870           feedback_fn("  - ERROR: instance %s, connection to secondary node"
871                       " %s failed" % (instance, snode))
872
873     feedback_fn("* Verifying orphan volumes")
874     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
875                                        feedback_fn)
876     bad = bad or result
877
878     feedback_fn("* Verifying remaining instances")
879     result = self._VerifyOrphanInstances(instancelist, node_instance,
880                                          feedback_fn)
881     bad = bad or result
882
883     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
884       feedback_fn("* Verifying N+1 Memory redundancy")
885       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
886       bad = bad or result
887
888     feedback_fn("* Other Notes")
889     if i_non_redundant:
890       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
891                   % len(i_non_redundant))
892
893     if i_non_a_balanced:
894       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
895                   % len(i_non_a_balanced))
896
897     return not bad
898
899   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
900     """Analize the post-hooks' result, handle it, and send some
901     nicely-formatted feedback back to the user.
902
903     Args:
904       phase: the hooks phase that has just been run
905       hooks_results: the results of the multi-node hooks rpc call
906       feedback_fn: function to send feedback back to the caller
907       lu_result: previous Exec result
908
909     """
910     # We only really run POST phase hooks, and are only interested in
911     # their results
912     if phase == constants.HOOKS_PHASE_POST:
913       # Used to change hooks' output to proper indentation
914       indent_re = re.compile('^', re.M)
915       feedback_fn("* Hooks Results")
916       if not hooks_results:
917         feedback_fn("  - ERROR: general communication failure")
918         lu_result = 1
919       else:
920         for node_name in hooks_results:
921           show_node_header = True
922           res = hooks_results[node_name]
923           if res is False or not isinstance(res, list):
924             feedback_fn("    Communication failure")
925             lu_result = 1
926             continue
927           for script, hkr, output in res:
928             if hkr == constants.HKR_FAIL:
929               # The node header is only shown once, if there are
930               # failing hooks on that node
931               if show_node_header:
932                 feedback_fn("  Node %s:" % node_name)
933                 show_node_header = False
934               feedback_fn("    ERROR: Script %s failed, output:" % script)
935               output = indent_re.sub('      ', output)
936               feedback_fn("%s" % output)
937               lu_result = 1
938
939       return lu_result
940
941
942 class LUVerifyDisks(NoHooksLU):
943   """Verifies the cluster disks status.
944
945   """
946   _OP_REQP = []
947   REQ_BGL = False
948
949   def ExpandNames(self):
950     self.needed_locks = {
951       locking.LEVEL_NODE: locking.ALL_SET,
952       locking.LEVEL_INSTANCE: locking.ALL_SET,
953     }
954     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
955
956   def CheckPrereq(self):
957     """Check prerequisites.
958
959     This has no prerequisites.
960
961     """
962     pass
963
964   def Exec(self, feedback_fn):
965     """Verify integrity of cluster disks.
966
967     """
968     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
969
970     vg_name = self.cfg.GetVGName()
971     nodes = utils.NiceSort(self.cfg.GetNodeList())
972     instances = [self.cfg.GetInstanceInfo(name)
973                  for name in self.cfg.GetInstanceList()]
974
975     nv_dict = {}
976     for inst in instances:
977       inst_lvs = {}
978       if (inst.status != "up" or
979           inst.disk_template not in constants.DTS_NET_MIRROR):
980         continue
981       inst.MapLVsByNode(inst_lvs)
982       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
983       for node, vol_list in inst_lvs.iteritems():
984         for vol in vol_list:
985           nv_dict[(node, vol)] = inst
986
987     if not nv_dict:
988       return result
989
990     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
991
992     to_act = set()
993     for node in nodes:
994       # node_volume
995       lvs = node_lvs[node]
996
997       if isinstance(lvs, basestring):
998         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
999         res_nlvm[node] = lvs
1000       elif not isinstance(lvs, dict):
1001         logger.Info("connection to node %s failed or invalid data returned" %
1002                     (node,))
1003         res_nodes.append(node)
1004         continue
1005
1006       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1007         inst = nv_dict.pop((node, lv_name), None)
1008         if (not lv_online and inst is not None
1009             and inst.name not in res_instances):
1010           res_instances.append(inst.name)
1011
1012     # any leftover items in nv_dict are missing LVs, let's arrange the
1013     # data better
1014     for key, inst in nv_dict.iteritems():
1015       if inst.name not in res_missing:
1016         res_missing[inst.name] = []
1017       res_missing[inst.name].append(key)
1018
1019     return result
1020
1021
1022 class LURenameCluster(LogicalUnit):
1023   """Rename the cluster.
1024
1025   """
1026   HPATH = "cluster-rename"
1027   HTYPE = constants.HTYPE_CLUSTER
1028   _OP_REQP = ["name"]
1029
1030   def BuildHooksEnv(self):
1031     """Build hooks env.
1032
1033     """
1034     env = {
1035       "OP_TARGET": self.cfg.GetClusterName(),
1036       "NEW_NAME": self.op.name,
1037       }
1038     mn = self.cfg.GetMasterNode()
1039     return env, [mn], [mn]
1040
1041   def CheckPrereq(self):
1042     """Verify that the passed name is a valid one.
1043
1044     """
1045     hostname = utils.HostInfo(self.op.name)
1046
1047     new_name = hostname.name
1048     self.ip = new_ip = hostname.ip
1049     old_name = self.cfg.GetClusterName()
1050     old_ip = self.cfg.GetMasterIP()
1051     if new_name == old_name and new_ip == old_ip:
1052       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1053                                  " cluster has changed")
1054     if new_ip != old_ip:
1055       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1056         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1057                                    " reachable on the network. Aborting." %
1058                                    new_ip)
1059
1060     self.op.name = new_name
1061
1062   def Exec(self, feedback_fn):
1063     """Rename the cluster.
1064
1065     """
1066     clustername = self.op.name
1067     ip = self.ip
1068
1069     # shutdown the master IP
1070     master = self.cfg.GetMasterNode()
1071     if not self.rpc.call_node_stop_master(master, False):
1072       raise errors.OpExecError("Could not disable the master role")
1073
1074     try:
1075       # modify the sstore
1076       # TODO: sstore
1077       ss.SetKey(ss.SS_MASTER_IP, ip)
1078       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1079
1080       # Distribute updated ss config to all nodes
1081       myself = self.cfg.GetNodeInfo(master)
1082       dist_nodes = self.cfg.GetNodeList()
1083       if myself.name in dist_nodes:
1084         dist_nodes.remove(myself.name)
1085
1086       logger.Debug("Copying updated ssconf data to all nodes")
1087       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1088         fname = ss.KeyToFilename(keyname)
1089         result = self.rpc.call_upload_file(dist_nodes, fname)
1090         for to_node in dist_nodes:
1091           if not result[to_node]:
1092             logger.Error("copy of file %s to node %s failed" %
1093                          (fname, to_node))
1094     finally:
1095       if not self.rpc.call_node_start_master(master, False):
1096         logger.Error("Could not re-enable the master role on the master,"
1097                      " please restart manually.")
1098
1099
1100 def _RecursiveCheckIfLVMBased(disk):
1101   """Check if the given disk or its children are lvm-based.
1102
1103   Args:
1104     disk: ganeti.objects.Disk object
1105
1106   Returns:
1107     boolean indicating whether a LD_LV dev_type was found or not
1108
1109   """
1110   if disk.children:
1111     for chdisk in disk.children:
1112       if _RecursiveCheckIfLVMBased(chdisk):
1113         return True
1114   return disk.dev_type == constants.LD_LV
1115
1116
1117 class LUSetClusterParams(LogicalUnit):
1118   """Change the parameters of the cluster.
1119
1120   """
1121   HPATH = "cluster-modify"
1122   HTYPE = constants.HTYPE_CLUSTER
1123   _OP_REQP = []
1124   REQ_BGL = False
1125
1126   def ExpandNames(self):
1127     # FIXME: in the future maybe other cluster params won't require checking on
1128     # all nodes to be modified.
1129     self.needed_locks = {
1130       locking.LEVEL_NODE: locking.ALL_SET,
1131     }
1132     self.share_locks[locking.LEVEL_NODE] = 1
1133
1134   def BuildHooksEnv(self):
1135     """Build hooks env.
1136
1137     """
1138     env = {
1139       "OP_TARGET": self.cfg.GetClusterName(),
1140       "NEW_VG_NAME": self.op.vg_name,
1141       }
1142     mn = self.cfg.GetMasterNode()
1143     return env, [mn], [mn]
1144
1145   def CheckPrereq(self):
1146     """Check prerequisites.
1147
1148     This checks whether the given params don't conflict and
1149     if the given volume group is valid.
1150
1151     """
1152     # FIXME: This only works because there is only one parameter that can be
1153     # changed or removed.
1154     if self.op.vg_name is not None and not self.op.vg_name:
1155       instances = self.cfg.GetAllInstancesInfo().values()
1156       for inst in instances:
1157         for disk in inst.disks:
1158           if _RecursiveCheckIfLVMBased(disk):
1159             raise errors.OpPrereqError("Cannot disable lvm storage while"
1160                                        " lvm-based instances exist")
1161
1162     node_list = self.acquired_locks[locking.LEVEL_NODE]
1163
1164     # if vg_name not None, checks given volume group on all nodes
1165     if self.op.vg_name:
1166       vglist = self.rpc.call_vg_list(node_list)
1167       for node in node_list:
1168         vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1169                                               constants.MIN_VG_SIZE)
1170         if vgstatus:
1171           raise errors.OpPrereqError("Error on node '%s': %s" %
1172                                      (node, vgstatus))
1173
1174     self.cluster = cluster = self.cfg.GetClusterInfo()
1175     # beparams changes do not need validation (we can't validate?),
1176     # but we still process here
1177     if self.op.beparams:
1178       self.new_beparams = cluster.FillDict(
1179         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1180
1181     # hypervisor list/parameters
1182     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1183     if self.op.hvparams:
1184       if not isinstance(self.op.hvparams, dict):
1185         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1186       for hv_name, hv_dict in self.op.hvparams.items():
1187         if hv_name not in self.new_hvparams:
1188           self.new_hvparams[hv_name] = hv_dict
1189         else:
1190           self.new_hvparams[hv_name].update(hv_dict)
1191
1192     if self.op.enabled_hypervisors is not None:
1193       self.hv_list = self.op.enabled_hypervisors
1194     else:
1195       self.hv_list = cluster.enabled_hypervisors
1196
1197     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1198       # either the enabled list has changed, or the parameters have, validate
1199       for hv_name, hv_params in self.new_hvparams.items():
1200         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1201             (self.op.enabled_hypervisors and
1202              hv_name in self.op.enabled_hypervisors)):
1203           # either this is a new hypervisor, or its parameters have changed
1204           hv_class = hypervisor.GetHypervisor(hv_name)
1205           hv_class.CheckParameterSyntax(hv_params)
1206           _CheckHVParams(self, node_list, hv_name, hv_params)
1207
1208   def Exec(self, feedback_fn):
1209     """Change the parameters of the cluster.
1210
1211     """
1212     if self.op.vg_name is not None:
1213       if self.op.vg_name != self.cfg.GetVGName():
1214         self.cfg.SetVGName(self.op.vg_name)
1215       else:
1216         feedback_fn("Cluster LVM configuration already in desired"
1217                     " state, not changing")
1218     if self.op.hvparams:
1219       self.cluster.hvparams = self.new_hvparams
1220     if self.op.enabled_hypervisors is not None:
1221       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1222     if self.op.beparams:
1223       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1224     self.cfg.Update(self.cluster)
1225
1226
1227 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1228   """Sleep and poll for an instance's disk to sync.
1229
1230   """
1231   if not instance.disks:
1232     return True
1233
1234   if not oneshot:
1235     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1236
1237   node = instance.primary_node
1238
1239   for dev in instance.disks:
1240     lu.cfg.SetDiskID(dev, node)
1241
1242   retries = 0
1243   while True:
1244     max_time = 0
1245     done = True
1246     cumul_degraded = False
1247     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1248     if not rstats:
1249       lu.proc.LogWarning("Can't get any data from node %s" % node)
1250       retries += 1
1251       if retries >= 10:
1252         raise errors.RemoteError("Can't contact node %s for mirror data,"
1253                                  " aborting." % node)
1254       time.sleep(6)
1255       continue
1256     retries = 0
1257     for i in range(len(rstats)):
1258       mstat = rstats[i]
1259       if mstat is None:
1260         lu.proc.LogWarning("Can't compute data for node %s/%s" %
1261                            (node, instance.disks[i].iv_name))
1262         continue
1263       # we ignore the ldisk parameter
1264       perc_done, est_time, is_degraded, _ = mstat
1265       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1266       if perc_done is not None:
1267         done = False
1268         if est_time is not None:
1269           rem_time = "%d estimated seconds remaining" % est_time
1270           max_time = est_time
1271         else:
1272           rem_time = "no time estimate"
1273         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1274                         (instance.disks[i].iv_name, perc_done, rem_time))
1275     if done or oneshot:
1276       break
1277
1278     time.sleep(min(60, max_time))
1279
1280   if done:
1281     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1282   return not cumul_degraded
1283
1284
1285 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1286   """Check that mirrors are not degraded.
1287
1288   The ldisk parameter, if True, will change the test from the
1289   is_degraded attribute (which represents overall non-ok status for
1290   the device(s)) to the ldisk (representing the local storage status).
1291
1292   """
1293   lu.cfg.SetDiskID(dev, node)
1294   if ldisk:
1295     idx = 6
1296   else:
1297     idx = 5
1298
1299   result = True
1300   if on_primary or dev.AssembleOnSecondary():
1301     rstats = lu.rpc.call_blockdev_find(node, dev)
1302     if not rstats:
1303       logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1304       result = False
1305     else:
1306       result = result and (not rstats[idx])
1307   if dev.children:
1308     for child in dev.children:
1309       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1310
1311   return result
1312
1313
1314 class LUDiagnoseOS(NoHooksLU):
1315   """Logical unit for OS diagnose/query.
1316
1317   """
1318   _OP_REQP = ["output_fields", "names"]
1319   REQ_BGL = False
1320
1321   def ExpandNames(self):
1322     if self.op.names:
1323       raise errors.OpPrereqError("Selective OS query not supported")
1324
1325     self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1326     _CheckOutputFields(static=[],
1327                        dynamic=self.dynamic_fields,
1328                        selected=self.op.output_fields)
1329
1330     # Lock all nodes, in shared mode
1331     self.needed_locks = {}
1332     self.share_locks[locking.LEVEL_NODE] = 1
1333     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1334
1335   def CheckPrereq(self):
1336     """Check prerequisites.
1337
1338     """
1339
1340   @staticmethod
1341   def _DiagnoseByOS(node_list, rlist):
1342     """Remaps a per-node return list into an a per-os per-node dictionary
1343
1344       Args:
1345         node_list: a list with the names of all nodes
1346         rlist: a map with node names as keys and OS objects as values
1347
1348       Returns:
1349         map: a map with osnames as keys and as value another map, with
1350              nodes as
1351              keys and list of OS objects as values
1352              e.g. {"debian-etch": {"node1": [<object>,...],
1353                                    "node2": [<object>,]}
1354                   }
1355
1356     """
1357     all_os = {}
1358     for node_name, nr in rlist.iteritems():
1359       if not nr:
1360         continue
1361       for os_obj in nr:
1362         if os_obj.name not in all_os:
1363           # build a list of nodes for this os containing empty lists
1364           # for each node in node_list
1365           all_os[os_obj.name] = {}
1366           for nname in node_list:
1367             all_os[os_obj.name][nname] = []
1368         all_os[os_obj.name][node_name].append(os_obj)
1369     return all_os
1370
1371   def Exec(self, feedback_fn):
1372     """Compute the list of OSes.
1373
1374     """
1375     node_list = self.acquired_locks[locking.LEVEL_NODE]
1376     node_data = self.rpc.call_os_diagnose(node_list)
1377     if node_data == False:
1378       raise errors.OpExecError("Can't gather the list of OSes")
1379     pol = self._DiagnoseByOS(node_list, node_data)
1380     output = []
1381     for os_name, os_data in pol.iteritems():
1382       row = []
1383       for field in self.op.output_fields:
1384         if field == "name":
1385           val = os_name
1386         elif field == "valid":
1387           val = utils.all([osl and osl[0] for osl in os_data.values()])
1388         elif field == "node_status":
1389           val = {}
1390           for node_name, nos_list in os_data.iteritems():
1391             val[node_name] = [(v.status, v.path) for v in nos_list]
1392         else:
1393           raise errors.ParameterError(field)
1394         row.append(val)
1395       output.append(row)
1396
1397     return output
1398
1399
1400 class LURemoveNode(LogicalUnit):
1401   """Logical unit for removing a node.
1402
1403   """
1404   HPATH = "node-remove"
1405   HTYPE = constants.HTYPE_NODE
1406   _OP_REQP = ["node_name"]
1407
1408   def BuildHooksEnv(self):
1409     """Build hooks env.
1410
1411     This doesn't run on the target node in the pre phase as a failed
1412     node would then be impossible to remove.
1413
1414     """
1415     env = {
1416       "OP_TARGET": self.op.node_name,
1417       "NODE_NAME": self.op.node_name,
1418       }
1419     all_nodes = self.cfg.GetNodeList()
1420     all_nodes.remove(self.op.node_name)
1421     return env, all_nodes, all_nodes
1422
1423   def CheckPrereq(self):
1424     """Check prerequisites.
1425
1426     This checks:
1427      - the node exists in the configuration
1428      - it does not have primary or secondary instances
1429      - it's not the master
1430
1431     Any errors are signalled by raising errors.OpPrereqError.
1432
1433     """
1434     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1435     if node is None:
1436       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1437
1438     instance_list = self.cfg.GetInstanceList()
1439
1440     masternode = self.cfg.GetMasterNode()
1441     if node.name == masternode:
1442       raise errors.OpPrereqError("Node is the master node,"
1443                                  " you need to failover first.")
1444
1445     for instance_name in instance_list:
1446       instance = self.cfg.GetInstanceInfo(instance_name)
1447       if node.name == instance.primary_node:
1448         raise errors.OpPrereqError("Instance %s still running on the node,"
1449                                    " please remove first." % instance_name)
1450       if node.name in instance.secondary_nodes:
1451         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1452                                    " please remove first." % instance_name)
1453     self.op.node_name = node.name
1454     self.node = node
1455
1456   def Exec(self, feedback_fn):
1457     """Removes the node from the cluster.
1458
1459     """
1460     node = self.node
1461     logger.Info("stopping the node daemon and removing configs from node %s" %
1462                 node.name)
1463
1464     self.context.RemoveNode(node.name)
1465
1466     self.rpc.call_node_leave_cluster(node.name)
1467
1468
1469 class LUQueryNodes(NoHooksLU):
1470   """Logical unit for querying nodes.
1471
1472   """
1473   _OP_REQP = ["output_fields", "names"]
1474   REQ_BGL = False
1475
1476   def ExpandNames(self):
1477     self.dynamic_fields = frozenset([
1478       "dtotal", "dfree",
1479       "mtotal", "mnode", "mfree",
1480       "bootid",
1481       "ctotal",
1482       ])
1483
1484     self.static_fields = frozenset([
1485       "name", "pinst_cnt", "sinst_cnt",
1486       "pinst_list", "sinst_list",
1487       "pip", "sip", "tags",
1488       "serial_no",
1489       ])
1490
1491     _CheckOutputFields(static=self.static_fields,
1492                        dynamic=self.dynamic_fields,
1493                        selected=self.op.output_fields)
1494
1495     self.needed_locks = {}
1496     self.share_locks[locking.LEVEL_NODE] = 1
1497
1498     if self.op.names:
1499       self.wanted = _GetWantedNodes(self, self.op.names)
1500     else:
1501       self.wanted = locking.ALL_SET
1502
1503     self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
1504     if self.do_locking:
1505       # if we don't request only static fields, we need to lock the nodes
1506       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1507
1508
1509   def CheckPrereq(self):
1510     """Check prerequisites.
1511
1512     """
1513     # The validation of the node list is done in the _GetWantedNodes,
1514     # if non empty, and if empty, there's no validation to do
1515     pass
1516
1517   def Exec(self, feedback_fn):
1518     """Computes the list of nodes and their attributes.
1519
1520     """
1521     all_info = self.cfg.GetAllNodesInfo()
1522     if self.do_locking:
1523       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1524     elif self.wanted != locking.ALL_SET:
1525       nodenames = self.wanted
1526       missing = set(nodenames).difference(all_info.keys())
1527       if missing:
1528         raise errors.OpExecError(
1529           "Some nodes were removed before retrieving their data: %s" % missing)
1530     else:
1531       nodenames = all_info.keys()
1532
1533     nodenames = utils.NiceSort(nodenames)
1534     nodelist = [all_info[name] for name in nodenames]
1535
1536     # begin data gathering
1537
1538     if self.dynamic_fields.intersection(self.op.output_fields):
1539       live_data = {}
1540       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1541                                           self.cfg.GetHypervisorType())
1542       for name in nodenames:
1543         nodeinfo = node_data.get(name, None)
1544         if nodeinfo:
1545           live_data[name] = {
1546             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1547             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1548             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1549             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1550             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1551             "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1552             "bootid": nodeinfo['bootid'],
1553             }
1554         else:
1555           live_data[name] = {}
1556     else:
1557       live_data = dict.fromkeys(nodenames, {})
1558
1559     node_to_primary = dict([(name, set()) for name in nodenames])
1560     node_to_secondary = dict([(name, set()) for name in nodenames])
1561
1562     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1563                              "sinst_cnt", "sinst_list"))
1564     if inst_fields & frozenset(self.op.output_fields):
1565       instancelist = self.cfg.GetInstanceList()
1566
1567       for instance_name in instancelist:
1568         inst = self.cfg.GetInstanceInfo(instance_name)
1569         if inst.primary_node in node_to_primary:
1570           node_to_primary[inst.primary_node].add(inst.name)
1571         for secnode in inst.secondary_nodes:
1572           if secnode in node_to_secondary:
1573             node_to_secondary[secnode].add(inst.name)
1574
1575     # end data gathering
1576
1577     output = []
1578     for node in nodelist:
1579       node_output = []
1580       for field in self.op.output_fields:
1581         if field == "name":
1582           val = node.name
1583         elif field == "pinst_list":
1584           val = list(node_to_primary[node.name])
1585         elif field == "sinst_list":
1586           val = list(node_to_secondary[node.name])
1587         elif field == "pinst_cnt":
1588           val = len(node_to_primary[node.name])
1589         elif field == "sinst_cnt":
1590           val = len(node_to_secondary[node.name])
1591         elif field == "pip":
1592           val = node.primary_ip
1593         elif field == "sip":
1594           val = node.secondary_ip
1595         elif field == "tags":
1596           val = list(node.GetTags())
1597         elif field == "serial_no":
1598           val = node.serial_no
1599         elif field in self.dynamic_fields:
1600           val = live_data[node.name].get(field, None)
1601         else:
1602           raise errors.ParameterError(field)
1603         node_output.append(val)
1604       output.append(node_output)
1605
1606     return output
1607
1608
1609 class LUQueryNodeVolumes(NoHooksLU):
1610   """Logical unit for getting volumes on node(s).
1611
1612   """
1613   _OP_REQP = ["nodes", "output_fields"]
1614   REQ_BGL = False
1615
1616   def ExpandNames(self):
1617     _CheckOutputFields(static=["node"],
1618                        dynamic=["phys", "vg", "name", "size", "instance"],
1619                        selected=self.op.output_fields)
1620
1621     self.needed_locks = {}
1622     self.share_locks[locking.LEVEL_NODE] = 1
1623     if not self.op.nodes:
1624       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1625     else:
1626       self.needed_locks[locking.LEVEL_NODE] = \
1627         _GetWantedNodes(self, self.op.nodes)
1628
1629   def CheckPrereq(self):
1630     """Check prerequisites.
1631
1632     This checks that the fields required are valid output fields.
1633
1634     """
1635     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1636
1637   def Exec(self, feedback_fn):
1638     """Computes the list of nodes and their attributes.
1639
1640     """
1641     nodenames = self.nodes
1642     volumes = self.rpc.call_node_volumes(nodenames)
1643
1644     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1645              in self.cfg.GetInstanceList()]
1646
1647     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1648
1649     output = []
1650     for node in nodenames:
1651       if node not in volumes or not volumes[node]:
1652         continue
1653
1654       node_vols = volumes[node][:]
1655       node_vols.sort(key=lambda vol: vol['dev'])
1656
1657       for vol in node_vols:
1658         node_output = []
1659         for field in self.op.output_fields:
1660           if field == "node":
1661             val = node
1662           elif field == "phys":
1663             val = vol['dev']
1664           elif field == "vg":
1665             val = vol['vg']
1666           elif field == "name":
1667             val = vol['name']
1668           elif field == "size":
1669             val = int(float(vol['size']))
1670           elif field == "instance":
1671             for inst in ilist:
1672               if node not in lv_by_node[inst]:
1673                 continue
1674               if vol['name'] in lv_by_node[inst][node]:
1675                 val = inst.name
1676                 break
1677             else:
1678               val = '-'
1679           else:
1680             raise errors.ParameterError(field)
1681           node_output.append(str(val))
1682
1683         output.append(node_output)
1684
1685     return output
1686
1687
1688 class LUAddNode(LogicalUnit):
1689   """Logical unit for adding node to the cluster.
1690
1691   """
1692   HPATH = "node-add"
1693   HTYPE = constants.HTYPE_NODE
1694   _OP_REQP = ["node_name"]
1695
1696   def BuildHooksEnv(self):
1697     """Build hooks env.
1698
1699     This will run on all nodes before, and on all nodes + the new node after.
1700
1701     """
1702     env = {
1703       "OP_TARGET": self.op.node_name,
1704       "NODE_NAME": self.op.node_name,
1705       "NODE_PIP": self.op.primary_ip,
1706       "NODE_SIP": self.op.secondary_ip,
1707       }
1708     nodes_0 = self.cfg.GetNodeList()
1709     nodes_1 = nodes_0 + [self.op.node_name, ]
1710     return env, nodes_0, nodes_1
1711
1712   def CheckPrereq(self):
1713     """Check prerequisites.
1714
1715     This checks:
1716      - the new node is not already in the config
1717      - it is resolvable
1718      - its parameters (single/dual homed) matches the cluster
1719
1720     Any errors are signalled by raising errors.OpPrereqError.
1721
1722     """
1723     node_name = self.op.node_name
1724     cfg = self.cfg
1725
1726     dns_data = utils.HostInfo(node_name)
1727
1728     node = dns_data.name
1729     primary_ip = self.op.primary_ip = dns_data.ip
1730     secondary_ip = getattr(self.op, "secondary_ip", None)
1731     if secondary_ip is None:
1732       secondary_ip = primary_ip
1733     if not utils.IsValidIP(secondary_ip):
1734       raise errors.OpPrereqError("Invalid secondary IP given")
1735     self.op.secondary_ip = secondary_ip
1736
1737     node_list = cfg.GetNodeList()
1738     if not self.op.readd and node in node_list:
1739       raise errors.OpPrereqError("Node %s is already in the configuration" %
1740                                  node)
1741     elif self.op.readd and node not in node_list:
1742       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1743
1744     for existing_node_name in node_list:
1745       existing_node = cfg.GetNodeInfo(existing_node_name)
1746
1747       if self.op.readd and node == existing_node_name:
1748         if (existing_node.primary_ip != primary_ip or
1749             existing_node.secondary_ip != secondary_ip):
1750           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1751                                      " address configuration as before")
1752         continue
1753
1754       if (existing_node.primary_ip == primary_ip or
1755           existing_node.secondary_ip == primary_ip or
1756           existing_node.primary_ip == secondary_ip or
1757           existing_node.secondary_ip == secondary_ip):
1758         raise errors.OpPrereqError("New node ip address(es) conflict with"
1759                                    " existing node %s" % existing_node.name)
1760
1761     # check that the type of the node (single versus dual homed) is the
1762     # same as for the master
1763     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1764     master_singlehomed = myself.secondary_ip == myself.primary_ip
1765     newbie_singlehomed = secondary_ip == primary_ip
1766     if master_singlehomed != newbie_singlehomed:
1767       if master_singlehomed:
1768         raise errors.OpPrereqError("The master has no private ip but the"
1769                                    " new node has one")
1770       else:
1771         raise errors.OpPrereqError("The master has a private ip but the"
1772                                    " new node doesn't have one")
1773
1774     # checks reachablity
1775     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1776       raise errors.OpPrereqError("Node not reachable by ping")
1777
1778     if not newbie_singlehomed:
1779       # check reachability from my secondary ip to newbie's secondary ip
1780       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1781                            source=myself.secondary_ip):
1782         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1783                                    " based ping to noded port")
1784
1785     self.new_node = objects.Node(name=node,
1786                                  primary_ip=primary_ip,
1787                                  secondary_ip=secondary_ip)
1788
1789   def Exec(self, feedback_fn):
1790     """Adds the new node to the cluster.
1791
1792     """
1793     new_node = self.new_node
1794     node = new_node.name
1795
1796     # check connectivity
1797     result = self.rpc.call_version([node])[node]
1798     if result:
1799       if constants.PROTOCOL_VERSION == result:
1800         logger.Info("communication to node %s fine, sw version %s match" %
1801                     (node, result))
1802       else:
1803         raise errors.OpExecError("Version mismatch master version %s,"
1804                                  " node version %s" %
1805                                  (constants.PROTOCOL_VERSION, result))
1806     else:
1807       raise errors.OpExecError("Cannot get version from the new node")
1808
1809     # setup ssh on node
1810     logger.Info("copy ssh key to node %s" % node)
1811     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1812     keyarray = []
1813     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1814                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1815                 priv_key, pub_key]
1816
1817     for i in keyfiles:
1818       f = open(i, 'r')
1819       try:
1820         keyarray.append(f.read())
1821       finally:
1822         f.close()
1823
1824     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
1825                                     keyarray[2],
1826                                     keyarray[3], keyarray[4], keyarray[5])
1827
1828     if not result:
1829       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1830
1831     # Add node to our /etc/hosts, and add key to known_hosts
1832     utils.AddHostToEtcHosts(new_node.name)
1833
1834     if new_node.secondary_ip != new_node.primary_ip:
1835       if not self.rpc.call_node_has_ip_address(new_node.name,
1836                                                new_node.secondary_ip):
1837         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1838                                  " you gave (%s). Please fix and re-run this"
1839                                  " command." % new_node.secondary_ip)
1840
1841     node_verify_list = [self.cfg.GetMasterNode()]
1842     node_verify_param = {
1843       'nodelist': [node],
1844       # TODO: do a node-net-test as well?
1845     }
1846
1847     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
1848                                        self.cfg.GetClusterName())
1849     for verifier in node_verify_list:
1850       if not result[verifier]:
1851         raise errors.OpExecError("Cannot communicate with %s's node daemon"
1852                                  " for remote verification" % verifier)
1853       if result[verifier]['nodelist']:
1854         for failed in result[verifier]['nodelist']:
1855           feedback_fn("ssh/hostname verification failed %s -> %s" %
1856                       (verifier, result[verifier]['nodelist'][failed]))
1857         raise errors.OpExecError("ssh/hostname verification failed.")
1858
1859     # Distribute updated /etc/hosts and known_hosts to all nodes,
1860     # including the node just added
1861     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
1862     dist_nodes = self.cfg.GetNodeList()
1863     if not self.op.readd:
1864       dist_nodes.append(node)
1865     if myself.name in dist_nodes:
1866       dist_nodes.remove(myself.name)
1867
1868     logger.Debug("Copying hosts and known_hosts to all nodes")
1869     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1870       result = self.rpc.call_upload_file(dist_nodes, fname)
1871       for to_node in dist_nodes:
1872         if not result[to_node]:
1873           logger.Error("copy of file %s to node %s failed" %
1874                        (fname, to_node))
1875
1876     to_copy = []
1877     if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
1878       to_copy.append(constants.VNC_PASSWORD_FILE)
1879     for fname in to_copy:
1880       result = self.rpc.call_upload_file([node], fname)
1881       if not result[node]:
1882         logger.Error("could not copy file %s to node %s" % (fname, node))
1883
1884     if self.op.readd:
1885       self.context.ReaddNode(new_node)
1886     else:
1887       self.context.AddNode(new_node)
1888
1889
1890 class LUQueryClusterInfo(NoHooksLU):
1891   """Query cluster configuration.
1892
1893   """
1894   _OP_REQP = []
1895   REQ_MASTER = False
1896   REQ_BGL = False
1897
1898   def ExpandNames(self):
1899     self.needed_locks = {}
1900
1901   def CheckPrereq(self):
1902     """No prerequsites needed for this LU.
1903
1904     """
1905     pass
1906
1907   def Exec(self, feedback_fn):
1908     """Return cluster config.
1909
1910     """
1911     cluster = self.cfg.GetClusterInfo()
1912     result = {
1913       "software_version": constants.RELEASE_VERSION,
1914       "protocol_version": constants.PROTOCOL_VERSION,
1915       "config_version": constants.CONFIG_VERSION,
1916       "os_api_version": constants.OS_API_VERSION,
1917       "export_version": constants.EXPORT_VERSION,
1918       "architecture": (platform.architecture()[0], platform.machine()),
1919       "name": cluster.cluster_name,
1920       "master": cluster.master_node,
1921       "hypervisor_type": cluster.hypervisor,
1922       "enabled_hypervisors": cluster.enabled_hypervisors,
1923       "hvparams": cluster.hvparams,
1924       "beparams": cluster.beparams,
1925       }
1926
1927     return result
1928
1929
1930 class LUQueryConfigValues(NoHooksLU):
1931   """Return configuration values.
1932
1933   """
1934   _OP_REQP = []
1935   REQ_BGL = False
1936
1937   def ExpandNames(self):
1938     self.needed_locks = {}
1939
1940     static_fields = ["cluster_name", "master_node", "drain_flag"]
1941     _CheckOutputFields(static=static_fields,
1942                        dynamic=[],
1943                        selected=self.op.output_fields)
1944
1945   def CheckPrereq(self):
1946     """No prerequisites.
1947
1948     """
1949     pass
1950
1951   def Exec(self, feedback_fn):
1952     """Dump a representation of the cluster config to the standard output.
1953
1954     """
1955     values = []
1956     for field in self.op.output_fields:
1957       if field == "cluster_name":
1958         entry = self.cfg.GetClusterName()
1959       elif field == "master_node":
1960         entry = self.cfg.GetMasterNode()
1961       elif field == "drain_flag":
1962         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1963       else:
1964         raise errors.ParameterError(field)
1965       values.append(entry)
1966     return values
1967
1968
1969 class LUActivateInstanceDisks(NoHooksLU):
1970   """Bring up an instance's disks.
1971
1972   """
1973   _OP_REQP = ["instance_name"]
1974   REQ_BGL = False
1975
1976   def ExpandNames(self):
1977     self._ExpandAndLockInstance()
1978     self.needed_locks[locking.LEVEL_NODE] = []
1979     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1980
1981   def DeclareLocks(self, level):
1982     if level == locking.LEVEL_NODE:
1983       self._LockInstancesNodes()
1984
1985   def CheckPrereq(self):
1986     """Check prerequisites.
1987
1988     This checks that the instance is in the cluster.
1989
1990     """
1991     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1992     assert self.instance is not None, \
1993       "Cannot retrieve locked instance %s" % self.op.instance_name
1994
1995   def Exec(self, feedback_fn):
1996     """Activate the disks.
1997
1998     """
1999     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2000     if not disks_ok:
2001       raise errors.OpExecError("Cannot activate block devices")
2002
2003     return disks_info
2004
2005
2006 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2007   """Prepare the block devices for an instance.
2008
2009   This sets up the block devices on all nodes.
2010
2011   Args:
2012     instance: a ganeti.objects.Instance object
2013     ignore_secondaries: if true, errors on secondary nodes won't result
2014                         in an error return from the function
2015
2016   Returns:
2017     false if the operation failed
2018     list of (host, instance_visible_name, node_visible_name) if the operation
2019          suceeded with the mapping from node devices to instance devices
2020   """
2021   device_info = []
2022   disks_ok = True
2023   iname = instance.name
2024   # With the two passes mechanism we try to reduce the window of
2025   # opportunity for the race condition of switching DRBD to primary
2026   # before handshaking occured, but we do not eliminate it
2027
2028   # The proper fix would be to wait (with some limits) until the
2029   # connection has been made and drbd transitions from WFConnection
2030   # into any other network-connected state (Connected, SyncTarget,
2031   # SyncSource, etc.)
2032
2033   # 1st pass, assemble on all nodes in secondary mode
2034   for inst_disk in instance.disks:
2035     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2036       lu.cfg.SetDiskID(node_disk, node)
2037       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2038       if not result:
2039         logger.Error("could not prepare block device %s on node %s"
2040                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
2041         if not ignore_secondaries:
2042           disks_ok = False
2043
2044   # FIXME: race condition on drbd migration to primary
2045
2046   # 2nd pass, do only the primary node
2047   for inst_disk in instance.disks:
2048     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2049       if node != instance.primary_node:
2050         continue
2051       lu.cfg.SetDiskID(node_disk, node)
2052       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2053       if not result:
2054         logger.Error("could not prepare block device %s on node %s"
2055                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
2056         disks_ok = False
2057     device_info.append((instance.primary_node, inst_disk.iv_name, result))
2058
2059   # leave the disks configured for the primary node
2060   # this is a workaround that would be fixed better by
2061   # improving the logical/physical id handling
2062   for disk in instance.disks:
2063     lu.cfg.SetDiskID(disk, instance.primary_node)
2064
2065   return disks_ok, device_info
2066
2067
2068 def _StartInstanceDisks(lu, instance, force):
2069   """Start the disks of an instance.
2070
2071   """
2072   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2073                                            ignore_secondaries=force)
2074   if not disks_ok:
2075     _ShutdownInstanceDisks(lu, instance)
2076     if force is not None and not force:
2077       logger.Error("If the message above refers to a secondary node,"
2078                    " you can retry the operation using '--force'.")
2079     raise errors.OpExecError("Disk consistency error")
2080
2081
2082 class LUDeactivateInstanceDisks(NoHooksLU):
2083   """Shutdown an instance's disks.
2084
2085   """
2086   _OP_REQP = ["instance_name"]
2087   REQ_BGL = False
2088
2089   def ExpandNames(self):
2090     self._ExpandAndLockInstance()
2091     self.needed_locks[locking.LEVEL_NODE] = []
2092     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2093
2094   def DeclareLocks(self, level):
2095     if level == locking.LEVEL_NODE:
2096       self._LockInstancesNodes()
2097
2098   def CheckPrereq(self):
2099     """Check prerequisites.
2100
2101     This checks that the instance is in the cluster.
2102
2103     """
2104     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2105     assert self.instance is not None, \
2106       "Cannot retrieve locked instance %s" % self.op.instance_name
2107
2108   def Exec(self, feedback_fn):
2109     """Deactivate the disks
2110
2111     """
2112     instance = self.instance
2113     _SafeShutdownInstanceDisks(self, instance)
2114
2115
2116 def _SafeShutdownInstanceDisks(lu, instance):
2117   """Shutdown block devices of an instance.
2118
2119   This function checks if an instance is running, before calling
2120   _ShutdownInstanceDisks.
2121
2122   """
2123   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2124                                       [instance.hypervisor])
2125   ins_l = ins_l[instance.primary_node]
2126   if not type(ins_l) is list:
2127     raise errors.OpExecError("Can't contact node '%s'" %
2128                              instance.primary_node)
2129
2130   if instance.name in ins_l:
2131     raise errors.OpExecError("Instance is running, can't shutdown"
2132                              " block devices.")
2133
2134   _ShutdownInstanceDisks(lu, instance)
2135
2136
2137 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2138   """Shutdown block devices of an instance.
2139
2140   This does the shutdown on all nodes of the instance.
2141
2142   If the ignore_primary is false, errors on the primary node are
2143   ignored.
2144
2145   """
2146   result = True
2147   for disk in instance.disks:
2148     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2149       lu.cfg.SetDiskID(top_disk, node)
2150       if not lu.rpc.call_blockdev_shutdown(node, top_disk):
2151         logger.Error("could not shutdown block device %s on node %s" %
2152                      (disk.iv_name, node))
2153         if not ignore_primary or node != instance.primary_node:
2154           result = False
2155   return result
2156
2157
2158 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2159   """Checks if a node has enough free memory.
2160
2161   This function check if a given node has the needed amount of free
2162   memory. In case the node has less memory or we cannot get the
2163   information from the node, this function raise an OpPrereqError
2164   exception.
2165
2166   @type lu: C{LogicalUnit}
2167   @param lu: a logical unit from which we get configuration data
2168   @type node: C{str}
2169   @param node: the node to check
2170   @type reason: C{str}
2171   @param reason: string to use in the error message
2172   @type requested: C{int}
2173   @param requested: the amount of memory in MiB to check for
2174   @type hypervisor: C{str}
2175   @param hypervisor: the hypervisor to ask for memory stats
2176   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2177       we cannot check the node
2178
2179   """
2180   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2181   if not nodeinfo or not isinstance(nodeinfo, dict):
2182     raise errors.OpPrereqError("Could not contact node %s for resource"
2183                              " information" % (node,))
2184
2185   free_mem = nodeinfo[node].get('memory_free')
2186   if not isinstance(free_mem, int):
2187     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2188                              " was '%s'" % (node, free_mem))
2189   if requested > free_mem:
2190     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2191                              " needed %s MiB, available %s MiB" %
2192                              (node, reason, requested, free_mem))
2193
2194
2195 class LUStartupInstance(LogicalUnit):
2196   """Starts an instance.
2197
2198   """
2199   HPATH = "instance-start"
2200   HTYPE = constants.HTYPE_INSTANCE
2201   _OP_REQP = ["instance_name", "force"]
2202   REQ_BGL = False
2203
2204   def ExpandNames(self):
2205     self._ExpandAndLockInstance()
2206     self.needed_locks[locking.LEVEL_NODE] = []
2207     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2208
2209   def DeclareLocks(self, level):
2210     if level == locking.LEVEL_NODE:
2211       self._LockInstancesNodes()
2212
2213   def BuildHooksEnv(self):
2214     """Build hooks env.
2215
2216     This runs on master, primary and secondary nodes of the instance.
2217
2218     """
2219     env = {
2220       "FORCE": self.op.force,
2221       }
2222     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2223     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2224           list(self.instance.secondary_nodes))
2225     return env, nl, nl
2226
2227   def CheckPrereq(self):
2228     """Check prerequisites.
2229
2230     This checks that the instance is in the cluster.
2231
2232     """
2233     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2234     assert self.instance is not None, \
2235       "Cannot retrieve locked instance %s" % self.op.instance_name
2236
2237     bep = self.cfg.GetClusterInfo().FillBE(instance)
2238     # check bridges existance
2239     _CheckInstanceBridgesExist(self, instance)
2240
2241     _CheckNodeFreeMemory(self, instance.primary_node,
2242                          "starting instance %s" % instance.name,
2243                          bep[constants.BE_MEMORY], instance.hypervisor)
2244
2245   def Exec(self, feedback_fn):
2246     """Start the instance.
2247
2248     """
2249     instance = self.instance
2250     force = self.op.force
2251     extra_args = getattr(self.op, "extra_args", "")
2252
2253     self.cfg.MarkInstanceUp(instance.name)
2254
2255     node_current = instance.primary_node
2256
2257     _StartInstanceDisks(self, instance, force)
2258
2259     if not self.rpc.call_instance_start(node_current, instance, extra_args):
2260       _ShutdownInstanceDisks(self, instance)
2261       raise errors.OpExecError("Could not start instance")
2262
2263
2264 class LURebootInstance(LogicalUnit):
2265   """Reboot an instance.
2266
2267   """
2268   HPATH = "instance-reboot"
2269   HTYPE = constants.HTYPE_INSTANCE
2270   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2271   REQ_BGL = False
2272
2273   def ExpandNames(self):
2274     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2275                                    constants.INSTANCE_REBOOT_HARD,
2276                                    constants.INSTANCE_REBOOT_FULL]:
2277       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2278                                   (constants.INSTANCE_REBOOT_SOFT,
2279                                    constants.INSTANCE_REBOOT_HARD,
2280                                    constants.INSTANCE_REBOOT_FULL))
2281     self._ExpandAndLockInstance()
2282     self.needed_locks[locking.LEVEL_NODE] = []
2283     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2284
2285   def DeclareLocks(self, level):
2286     if level == locking.LEVEL_NODE:
2287       primary_only = not constants.INSTANCE_REBOOT_FULL
2288       self._LockInstancesNodes(primary_only=primary_only)
2289
2290   def BuildHooksEnv(self):
2291     """Build hooks env.
2292
2293     This runs on master, primary and secondary nodes of the instance.
2294
2295     """
2296     env = {
2297       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2298       }
2299     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2300     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2301           list(self.instance.secondary_nodes))
2302     return env, nl, nl
2303
2304   def CheckPrereq(self):
2305     """Check prerequisites.
2306
2307     This checks that the instance is in the cluster.
2308
2309     """
2310     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2311     assert self.instance is not None, \
2312       "Cannot retrieve locked instance %s" % self.op.instance_name
2313
2314     # check bridges existance
2315     _CheckInstanceBridgesExist(self, instance)
2316
2317   def Exec(self, feedback_fn):
2318     """Reboot the instance.
2319
2320     """
2321     instance = self.instance
2322     ignore_secondaries = self.op.ignore_secondaries
2323     reboot_type = self.op.reboot_type
2324     extra_args = getattr(self.op, "extra_args", "")
2325
2326     node_current = instance.primary_node
2327
2328     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2329                        constants.INSTANCE_REBOOT_HARD]:
2330       if not self.rpc.call_instance_reboot(node_current, instance,
2331                                            reboot_type, extra_args):
2332         raise errors.OpExecError("Could not reboot instance")
2333     else:
2334       if not self.rpc.call_instance_shutdown(node_current, instance):
2335         raise errors.OpExecError("could not shutdown instance for full reboot")
2336       _ShutdownInstanceDisks(self, instance)
2337       _StartInstanceDisks(self, instance, ignore_secondaries)
2338       if not self.rpc.call_instance_start(node_current, instance, extra_args):
2339         _ShutdownInstanceDisks(self, instance)
2340         raise errors.OpExecError("Could not start instance for full reboot")
2341
2342     self.cfg.MarkInstanceUp(instance.name)
2343
2344
2345 class LUShutdownInstance(LogicalUnit):
2346   """Shutdown an instance.
2347
2348   """
2349   HPATH = "instance-stop"
2350   HTYPE = constants.HTYPE_INSTANCE
2351   _OP_REQP = ["instance_name"]
2352   REQ_BGL = False
2353
2354   def ExpandNames(self):
2355     self._ExpandAndLockInstance()
2356     self.needed_locks[locking.LEVEL_NODE] = []
2357     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2358
2359   def DeclareLocks(self, level):
2360     if level == locking.LEVEL_NODE:
2361       self._LockInstancesNodes()
2362
2363   def BuildHooksEnv(self):
2364     """Build hooks env.
2365
2366     This runs on master, primary and secondary nodes of the instance.
2367
2368     """
2369     env = _BuildInstanceHookEnvByObject(self, self.instance)
2370     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2371           list(self.instance.secondary_nodes))
2372     return env, nl, nl
2373
2374   def CheckPrereq(self):
2375     """Check prerequisites.
2376
2377     This checks that the instance is in the cluster.
2378
2379     """
2380     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2381     assert self.instance is not None, \
2382       "Cannot retrieve locked instance %s" % self.op.instance_name
2383
2384   def Exec(self, feedback_fn):
2385     """Shutdown the instance.
2386
2387     """
2388     instance = self.instance
2389     node_current = instance.primary_node
2390     self.cfg.MarkInstanceDown(instance.name)
2391     if not self.rpc.call_instance_shutdown(node_current, instance):
2392       logger.Error("could not shutdown instance")
2393
2394     _ShutdownInstanceDisks(self, instance)
2395
2396
2397 class LUReinstallInstance(LogicalUnit):
2398   """Reinstall an instance.
2399
2400   """
2401   HPATH = "instance-reinstall"
2402   HTYPE = constants.HTYPE_INSTANCE
2403   _OP_REQP = ["instance_name"]
2404   REQ_BGL = False
2405
2406   def ExpandNames(self):
2407     self._ExpandAndLockInstance()
2408     self.needed_locks[locking.LEVEL_NODE] = []
2409     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2410
2411   def DeclareLocks(self, level):
2412     if level == locking.LEVEL_NODE:
2413       self._LockInstancesNodes()
2414
2415   def BuildHooksEnv(self):
2416     """Build hooks env.
2417
2418     This runs on master, primary and secondary nodes of the instance.
2419
2420     """
2421     env = _BuildInstanceHookEnvByObject(self, self.instance)
2422     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2423           list(self.instance.secondary_nodes))
2424     return env, nl, nl
2425
2426   def CheckPrereq(self):
2427     """Check prerequisites.
2428
2429     This checks that the instance is in the cluster and is not running.
2430
2431     """
2432     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2433     assert instance is not None, \
2434       "Cannot retrieve locked instance %s" % self.op.instance_name
2435
2436     if instance.disk_template == constants.DT_DISKLESS:
2437       raise errors.OpPrereqError("Instance '%s' has no disks" %
2438                                  self.op.instance_name)
2439     if instance.status != "down":
2440       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2441                                  self.op.instance_name)
2442     remote_info = self.rpc.call_instance_info(instance.primary_node,
2443                                               instance.name,
2444                                               instance.hypervisor)
2445     if remote_info:
2446       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2447                                  (self.op.instance_name,
2448                                   instance.primary_node))
2449
2450     self.op.os_type = getattr(self.op, "os_type", None)
2451     if self.op.os_type is not None:
2452       # OS verification
2453       pnode = self.cfg.GetNodeInfo(
2454         self.cfg.ExpandNodeName(instance.primary_node))
2455       if pnode is None:
2456         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2457                                    self.op.pnode)
2458       os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
2459       if not os_obj:
2460         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2461                                    " primary node"  % self.op.os_type)
2462
2463     self.instance = instance
2464
2465   def Exec(self, feedback_fn):
2466     """Reinstall the instance.
2467
2468     """
2469     inst = self.instance
2470
2471     if self.op.os_type is not None:
2472       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2473       inst.os = self.op.os_type
2474       self.cfg.Update(inst)
2475
2476     _StartInstanceDisks(self, inst, None)
2477     try:
2478       feedback_fn("Running the instance OS create scripts...")
2479       if not self.rpc.call_instance_os_add(inst.primary_node, inst,
2480                                            "sda", "sdb"):
2481         raise errors.OpExecError("Could not install OS for instance %s"
2482                                  " on node %s" %
2483                                  (inst.name, inst.primary_node))
2484     finally:
2485       _ShutdownInstanceDisks(self, inst)
2486
2487
2488 class LURenameInstance(LogicalUnit):
2489   """Rename an instance.
2490
2491   """
2492   HPATH = "instance-rename"
2493   HTYPE = constants.HTYPE_INSTANCE
2494   _OP_REQP = ["instance_name", "new_name"]
2495
2496   def BuildHooksEnv(self):
2497     """Build hooks env.
2498
2499     This runs on master, primary and secondary nodes of the instance.
2500
2501     """
2502     env = _BuildInstanceHookEnvByObject(self, self.instance)
2503     env["INSTANCE_NEW_NAME"] = self.op.new_name
2504     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2505           list(self.instance.secondary_nodes))
2506     return env, nl, nl
2507
2508   def CheckPrereq(self):
2509     """Check prerequisites.
2510
2511     This checks that the instance is in the cluster and is not running.
2512
2513     """
2514     instance = self.cfg.GetInstanceInfo(
2515       self.cfg.ExpandInstanceName(self.op.instance_name))
2516     if instance is None:
2517       raise errors.OpPrereqError("Instance '%s' not known" %
2518                                  self.op.instance_name)
2519     if instance.status != "down":
2520       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2521                                  self.op.instance_name)
2522     remote_info = self.rpc.call_instance_info(instance.primary_node,
2523                                               instance.name,
2524                                               instance.hypervisor)
2525     if remote_info:
2526       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2527                                  (self.op.instance_name,
2528                                   instance.primary_node))
2529     self.instance = instance
2530
2531     # new name verification
2532     name_info = utils.HostInfo(self.op.new_name)
2533
2534     self.op.new_name = new_name = name_info.name
2535     instance_list = self.cfg.GetInstanceList()
2536     if new_name in instance_list:
2537       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2538                                  new_name)
2539
2540     if not getattr(self.op, "ignore_ip", False):
2541       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2542         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2543                                    (name_info.ip, new_name))
2544
2545
2546   def Exec(self, feedback_fn):
2547     """Reinstall the instance.
2548
2549     """
2550     inst = self.instance
2551     old_name = inst.name
2552
2553     if inst.disk_template == constants.DT_FILE:
2554       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2555
2556     self.cfg.RenameInstance(inst.name, self.op.new_name)
2557     # Change the instance lock. This is definitely safe while we hold the BGL
2558     self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2559     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2560
2561     # re-read the instance from the configuration after rename
2562     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2563
2564     if inst.disk_template == constants.DT_FILE:
2565       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2566       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2567                                                      old_file_storage_dir,
2568                                                      new_file_storage_dir)
2569
2570       if not result:
2571         raise errors.OpExecError("Could not connect to node '%s' to rename"
2572                                  " directory '%s' to '%s' (but the instance"
2573                                  " has been renamed in Ganeti)" % (
2574                                  inst.primary_node, old_file_storage_dir,
2575                                  new_file_storage_dir))
2576
2577       if not result[0]:
2578         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2579                                  " (but the instance has been renamed in"
2580                                  " Ganeti)" % (old_file_storage_dir,
2581                                                new_file_storage_dir))
2582
2583     _StartInstanceDisks(self, inst, None)
2584     try:
2585       if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
2586                                                old_name):
2587         msg = ("Could not run OS rename script for instance %s on node %s"
2588                " (but the instance has been renamed in Ganeti)" %
2589                (inst.name, inst.primary_node))
2590         logger.Error(msg)
2591     finally:
2592       _ShutdownInstanceDisks(self, inst)
2593
2594
2595 class LURemoveInstance(LogicalUnit):
2596   """Remove an instance.
2597
2598   """
2599   HPATH = "instance-remove"
2600   HTYPE = constants.HTYPE_INSTANCE
2601   _OP_REQP = ["instance_name", "ignore_failures"]
2602   REQ_BGL = False
2603
2604   def ExpandNames(self):
2605     self._ExpandAndLockInstance()
2606     self.needed_locks[locking.LEVEL_NODE] = []
2607     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2608
2609   def DeclareLocks(self, level):
2610     if level == locking.LEVEL_NODE:
2611       self._LockInstancesNodes()
2612
2613   def BuildHooksEnv(self):
2614     """Build hooks env.
2615
2616     This runs on master, primary and secondary nodes of the instance.
2617
2618     """
2619     env = _BuildInstanceHookEnvByObject(self, self.instance)
2620     nl = [self.cfg.GetMasterNode()]
2621     return env, nl, nl
2622
2623   def CheckPrereq(self):
2624     """Check prerequisites.
2625
2626     This checks that the instance is in the cluster.
2627
2628     """
2629     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2630     assert self.instance is not None, \
2631       "Cannot retrieve locked instance %s" % self.op.instance_name
2632
2633   def Exec(self, feedback_fn):
2634     """Remove the instance.
2635
2636     """
2637     instance = self.instance
2638     logger.Info("shutting down instance %s on node %s" %
2639                 (instance.name, instance.primary_node))
2640
2641     if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
2642       if self.op.ignore_failures:
2643         feedback_fn("Warning: can't shutdown instance")
2644       else:
2645         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2646                                  (instance.name, instance.primary_node))
2647
2648     logger.Info("removing block devices for instance %s" % instance.name)
2649
2650     if not _RemoveDisks(self, instance):
2651       if self.op.ignore_failures:
2652         feedback_fn("Warning: can't remove instance's disks")
2653       else:
2654         raise errors.OpExecError("Can't remove instance's disks")
2655
2656     logger.Info("removing instance %s out of cluster config" % instance.name)
2657
2658     self.cfg.RemoveInstance(instance.name)
2659     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2660
2661
2662 class LUQueryInstances(NoHooksLU):
2663   """Logical unit for querying instances.
2664
2665   """
2666   _OP_REQP = ["output_fields", "names"]
2667   REQ_BGL = False
2668
2669   def ExpandNames(self):
2670     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2671     hvp = ["hv/%s" % name for name in constants.HVS_PARAMETERS]
2672     bep = ["be/%s" % name for name in constants.BES_PARAMETERS]
2673     self.static_fields = frozenset([
2674       "name", "os", "pnode", "snodes",
2675       "admin_state", "admin_ram",
2676       "disk_template", "ip", "mac", "bridge",
2677       "sda_size", "sdb_size", "vcpus", "tags",
2678       "network_port",
2679       "serial_no", "hypervisor", "hvparams",
2680       ] + hvp + bep)
2681
2682     _CheckOutputFields(static=self.static_fields,
2683                        dynamic=self.dynamic_fields,
2684                        selected=self.op.output_fields)
2685
2686     self.needed_locks = {}
2687     self.share_locks[locking.LEVEL_INSTANCE] = 1
2688     self.share_locks[locking.LEVEL_NODE] = 1
2689
2690     if self.op.names:
2691       self.wanted = _GetWantedInstances(self, self.op.names)
2692     else:
2693       self.wanted = locking.ALL_SET
2694
2695     self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
2696     if self.do_locking:
2697       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2698       self.needed_locks[locking.LEVEL_NODE] = []
2699       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2700
2701   def DeclareLocks(self, level):
2702     if level == locking.LEVEL_NODE and self.do_locking:
2703       self._LockInstancesNodes()
2704
2705   def CheckPrereq(self):
2706     """Check prerequisites.
2707
2708     """
2709     pass
2710
2711   def Exec(self, feedback_fn):
2712     """Computes the list of nodes and their attributes.
2713
2714     """
2715     all_info = self.cfg.GetAllInstancesInfo()
2716     if self.do_locking:
2717       instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2718     elif self.wanted != locking.ALL_SET:
2719       instance_names = self.wanted
2720       missing = set(instance_names).difference(all_info.keys())
2721       if missing:
2722         raise errors.OpExecError(
2723           "Some instances were removed before retrieving their data: %s"
2724           % missing)
2725     else:
2726       instance_names = all_info.keys()
2727
2728     instance_names = utils.NiceSort(instance_names)
2729     instance_list = [all_info[iname] for iname in instance_names]
2730
2731     # begin data gathering
2732
2733     nodes = frozenset([inst.primary_node for inst in instance_list])
2734     hv_list = list(set([inst.hypervisor for inst in instance_list]))
2735
2736     bad_nodes = []
2737     if self.dynamic_fields.intersection(self.op.output_fields):
2738       live_data = {}
2739       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2740       for name in nodes:
2741         result = node_data[name]
2742         if result:
2743           live_data.update(result)
2744         elif result == False:
2745           bad_nodes.append(name)
2746         # else no instance is alive
2747     else:
2748       live_data = dict([(name, {}) for name in instance_names])
2749
2750     # end data gathering
2751
2752     HVPREFIX = "hv/"
2753     BEPREFIX = "be/"
2754     output = []
2755     for instance in instance_list:
2756       iout = []
2757       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
2758       i_be = self.cfg.GetClusterInfo().FillBE(instance)
2759       for field in self.op.output_fields:
2760         if field == "name":
2761           val = instance.name
2762         elif field == "os":
2763           val = instance.os
2764         elif field == "pnode":
2765           val = instance.primary_node
2766         elif field == "snodes":
2767           val = list(instance.secondary_nodes)
2768         elif field == "admin_state":
2769           val = (instance.status != "down")
2770         elif field == "oper_state":
2771           if instance.primary_node in bad_nodes:
2772             val = None
2773           else:
2774             val = bool(live_data.get(instance.name))
2775         elif field == "status":
2776           if instance.primary_node in bad_nodes:
2777             val = "ERROR_nodedown"
2778           else:
2779             running = bool(live_data.get(instance.name))
2780             if running:
2781               if instance.status != "down":
2782                 val = "running"
2783               else:
2784                 val = "ERROR_up"
2785             else:
2786               if instance.status != "down":
2787                 val = "ERROR_down"
2788               else:
2789                 val = "ADMIN_down"
2790         elif field == "oper_ram":
2791           if instance.primary_node in bad_nodes:
2792             val = None
2793           elif instance.name in live_data:
2794             val = live_data[instance.name].get("memory", "?")
2795           else:
2796             val = "-"
2797         elif field == "disk_template":
2798           val = instance.disk_template
2799         elif field == "ip":
2800           val = instance.nics[0].ip
2801         elif field == "bridge":
2802           val = instance.nics[0].bridge
2803         elif field == "mac":
2804           val = instance.nics[0].mac
2805         elif field == "sda_size" or field == "sdb_size":
2806           disk = instance.FindDisk(field[:3])
2807           if disk is None:
2808             val = None
2809           else:
2810             val = disk.size
2811         elif field == "tags":
2812           val = list(instance.GetTags())
2813         elif field == "serial_no":
2814           val = instance.serial_no
2815         elif field == "network_port":
2816           val = instance.network_port
2817         elif field == "hypervisor":
2818           val = instance.hypervisor
2819         elif field == "hvparams":
2820           val = i_hv
2821         elif (field.startswith(HVPREFIX) and
2822               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
2823           val = i_hv.get(field[len(HVPREFIX):], None)
2824         elif field == "beparams":
2825           val = i_be
2826         elif (field.startswith(BEPREFIX) and
2827               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
2828           val = i_be.get(field[len(BEPREFIX):], None)
2829         else:
2830           raise errors.ParameterError(field)
2831         iout.append(val)
2832       output.append(iout)
2833
2834     return output
2835
2836
2837 class LUFailoverInstance(LogicalUnit):
2838   """Failover an instance.
2839
2840   """
2841   HPATH = "instance-failover"
2842   HTYPE = constants.HTYPE_INSTANCE
2843   _OP_REQP = ["instance_name", "ignore_consistency"]
2844   REQ_BGL = False
2845
2846   def ExpandNames(self):
2847     self._ExpandAndLockInstance()
2848     self.needed_locks[locking.LEVEL_NODE] = []
2849     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2850
2851   def DeclareLocks(self, level):
2852     if level == locking.LEVEL_NODE:
2853       self._LockInstancesNodes()
2854
2855   def BuildHooksEnv(self):
2856     """Build hooks env.
2857
2858     This runs on master, primary and secondary nodes of the instance.
2859
2860     """
2861     env = {
2862       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2863       }
2864     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2865     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
2866     return env, nl, nl
2867
2868   def CheckPrereq(self):
2869     """Check prerequisites.
2870
2871     This checks that the instance is in the cluster.
2872
2873     """
2874     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2875     assert self.instance is not None, \
2876       "Cannot retrieve locked instance %s" % self.op.instance_name
2877
2878     bep = self.cfg.GetClusterInfo().FillBE(instance)
2879     if instance.disk_template not in constants.DTS_NET_MIRROR:
2880       raise errors.OpPrereqError("Instance's disk layout is not"
2881                                  " network mirrored, cannot failover.")
2882
2883     secondary_nodes = instance.secondary_nodes
2884     if not secondary_nodes:
2885       raise errors.ProgrammerError("no secondary node but using "
2886                                    "a mirrored disk template")
2887
2888     target_node = secondary_nodes[0]
2889     # check memory requirements on the secondary node
2890     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
2891                          instance.name, bep[constants.BE_MEMORY],
2892                          instance.hypervisor)
2893
2894     # check bridge existance
2895     brlist = [nic.bridge for nic in instance.nics]
2896     if not self.rpc.call_bridges_exist(target_node, brlist):
2897       raise errors.OpPrereqError("One or more target bridges %s does not"
2898                                  " exist on destination node '%s'" %
2899                                  (brlist, target_node))
2900
2901   def Exec(self, feedback_fn):
2902     """Failover an instance.
2903
2904     The failover is done by shutting it down on its present node and
2905     starting it on the secondary.
2906
2907     """
2908     instance = self.instance
2909
2910     source_node = instance.primary_node
2911     target_node = instance.secondary_nodes[0]
2912
2913     feedback_fn("* checking disk consistency between source and target")
2914     for dev in instance.disks:
2915       # for drbd, these are drbd over lvm
2916       if not _CheckDiskConsistency(self, dev, target_node, False):
2917         if instance.status == "up" and not self.op.ignore_consistency:
2918           raise errors.OpExecError("Disk %s is degraded on target node,"
2919                                    " aborting failover." % dev.iv_name)
2920
2921     feedback_fn("* shutting down instance on source node")
2922     logger.Info("Shutting down instance %s on node %s" %
2923                 (instance.name, source_node))
2924
2925     if not self.rpc.call_instance_shutdown(source_node, instance):
2926       if self.op.ignore_consistency:
2927         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2928                      " anyway. Please make sure node %s is down"  %
2929                      (instance.name, source_node, source_node))
2930       else:
2931         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2932                                  (instance.name, source_node))
2933
2934     feedback_fn("* deactivating the instance's disks on source node")
2935     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
2936       raise errors.OpExecError("Can't shut down the instance's disks.")
2937
2938     instance.primary_node = target_node
2939     # distribute new instance config to the other nodes
2940     self.cfg.Update(instance)
2941
2942     # Only start the instance if it's marked as up
2943     if instance.status == "up":
2944       feedback_fn("* activating the instance's disks on target node")
2945       logger.Info("Starting instance %s on node %s" %
2946                   (instance.name, target_node))
2947
2948       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
2949                                                ignore_secondaries=True)
2950       if not disks_ok:
2951         _ShutdownInstanceDisks(self, instance)
2952         raise errors.OpExecError("Can't activate the instance's disks")
2953
2954       feedback_fn("* starting the instance on the target node")
2955       if not self.rpc.call_instance_start(target_node, instance, None):
2956         _ShutdownInstanceDisks(self, instance)
2957         raise errors.OpExecError("Could not start instance %s on node %s." %
2958                                  (instance.name, target_node))
2959
2960
2961 def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
2962   """Create a tree of block devices on the primary node.
2963
2964   This always creates all devices.
2965
2966   """
2967   if device.children:
2968     for child in device.children:
2969       if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
2970         return False
2971
2972   lu.cfg.SetDiskID(device, node)
2973   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
2974                                        instance.name, True, info)
2975   if not new_id:
2976     return False
2977   if device.physical_id is None:
2978     device.physical_id = new_id
2979   return True
2980
2981
2982 def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
2983   """Create a tree of block devices on a secondary node.
2984
2985   If this device type has to be created on secondaries, create it and
2986   all its children.
2987
2988   If not, just recurse to children keeping the same 'force' value.
2989
2990   """
2991   if device.CreateOnSecondary():
2992     force = True
2993   if device.children:
2994     for child in device.children:
2995       if not _CreateBlockDevOnSecondary(lu, node, instance,
2996                                         child, force, info):
2997         return False
2998
2999   if not force:
3000     return True
3001   lu.cfg.SetDiskID(device, node)
3002   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
3003                                        instance.name, False, info)
3004   if not new_id:
3005     return False
3006   if device.physical_id is None:
3007     device.physical_id = new_id
3008   return True
3009
3010
3011 def _GenerateUniqueNames(lu, exts):
3012   """Generate a suitable LV name.
3013
3014   This will generate a logical volume name for the given instance.
3015
3016   """
3017   results = []
3018   for val in exts:
3019     new_id = lu.cfg.GenerateUniqueID()
3020     results.append("%s%s" % (new_id, val))
3021   return results
3022
3023
3024 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
3025                          p_minor, s_minor):
3026   """Generate a drbd8 device complete with its children.
3027
3028   """
3029   port = lu.cfg.AllocatePort()
3030   vgname = lu.cfg.GetVGName()
3031   shared_secret = lu.cfg.GenerateDRBDSecret()
3032   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3033                           logical_id=(vgname, names[0]))
3034   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3035                           logical_id=(vgname, names[1]))
3036   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
3037                           logical_id=(primary, secondary, port,
3038                                       p_minor, s_minor,
3039                                       shared_secret),
3040                           children=[dev_data, dev_meta],
3041                           iv_name=iv_name)
3042   return drbd_dev
3043
3044
3045 def _GenerateDiskTemplate(lu, template_name,
3046                           instance_name, primary_node,
3047                           secondary_nodes, disk_sz, swap_sz,
3048                           file_storage_dir, file_driver):
3049   """Generate the entire disk layout for a given template type.
3050
3051   """
3052   #TODO: compute space requirements
3053
3054   vgname = lu.cfg.GetVGName()
3055   if template_name == constants.DT_DISKLESS:
3056     disks = []
3057   elif template_name == constants.DT_PLAIN:
3058     if len(secondary_nodes) != 0:
3059       raise errors.ProgrammerError("Wrong template configuration")
3060
3061     names = _GenerateUniqueNames(lu, [".sda", ".sdb"])
3062     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3063                            logical_id=(vgname, names[0]),
3064                            iv_name = "sda")
3065     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3066                            logical_id=(vgname, names[1]),
3067                            iv_name = "sdb")
3068     disks = [sda_dev, sdb_dev]
3069   elif template_name == constants.DT_DRBD8:
3070     if len(secondary_nodes) != 1:
3071       raise errors.ProgrammerError("Wrong template configuration")
3072     remote_node = secondary_nodes[0]
3073     (minor_pa, minor_pb,
3074      minor_sa, minor_sb) = lu.cfg.AllocateDRBDMinor(
3075       [primary_node, primary_node, remote_node, remote_node], instance_name)
3076
3077     names = _GenerateUniqueNames(lu, [".sda_data", ".sda_meta",
3078                                       ".sdb_data", ".sdb_meta"])
3079     drbd_sda_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3080                                         disk_sz, names[0:2], "sda",
3081                                         minor_pa, minor_sa)
3082     drbd_sdb_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3083                                         swap_sz, names[2:4], "sdb",
3084                                         minor_pb, minor_sb)
3085     disks = [drbd_sda_dev, drbd_sdb_dev]
3086   elif template_name == constants.DT_FILE:
3087     if len(secondary_nodes) != 0:
3088       raise errors.ProgrammerError("Wrong template configuration")
3089
3090     file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
3091                                 iv_name="sda", logical_id=(file_driver,
3092                                 "%s/sda" % file_storage_dir))
3093     file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
3094                                 iv_name="sdb", logical_id=(file_driver,
3095                                 "%s/sdb" % file_storage_dir))
3096     disks = [file_sda_dev, file_sdb_dev]
3097   else:
3098     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3099   return disks
3100
3101
3102 def _GetInstanceInfoText(instance):
3103   """Compute that text that should be added to the disk's metadata.
3104
3105   """
3106   return "originstname+%s" % instance.name
3107
3108
3109 def _CreateDisks(lu, instance):
3110   """Create all disks for an instance.
3111
3112   This abstracts away some work from AddInstance.
3113
3114   Args:
3115     instance: the instance object
3116
3117   Returns:
3118     True or False showing the success of the creation process
3119
3120   """
3121   info = _GetInstanceInfoText(instance)
3122
3123   if instance.disk_template == constants.DT_FILE:
3124     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3125     result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3126                                                  file_storage_dir)
3127
3128     if not result:
3129       logger.Error("Could not connect to node '%s'" % instance.primary_node)
3130       return False
3131
3132     if not result[0]:
3133       logger.Error("failed to create directory '%s'" % file_storage_dir)
3134       return False
3135
3136   for device in instance.disks:
3137     logger.Info("creating volume %s for instance %s" %
3138                 (device.iv_name, instance.name))
3139     #HARDCODE
3140     for secondary_node in instance.secondary_nodes:
3141       if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3142                                         device, False, info):
3143         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3144                      (device.iv_name, device, secondary_node))
3145         return False
3146     #HARDCODE
3147     if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3148                                     instance, device, info):
3149       logger.Error("failed to create volume %s on primary!" %
3150                    device.iv_name)
3151       return False
3152
3153   return True
3154
3155
3156 def _RemoveDisks(lu, instance):
3157   """Remove all disks for an instance.
3158
3159   This abstracts away some work from `AddInstance()` and
3160   `RemoveInstance()`. Note that in case some of the devices couldn't
3161   be removed, the removal will continue with the other ones (compare
3162   with `_CreateDisks()`).
3163
3164   Args:
3165     instance: the instance object
3166
3167   Returns:
3168     True or False showing the success of the removal proces
3169
3170   """
3171   logger.Info("removing block devices for instance %s" % instance.name)
3172
3173   result = True
3174   for device in instance.disks:
3175     for node, disk in device.ComputeNodeTree(instance.primary_node):
3176       lu.cfg.SetDiskID(disk, node)
3177       if not lu.rpc.call_blockdev_remove(node, disk):
3178         logger.Error("could not remove block device %s on node %s,"
3179                      " continuing anyway" %
3180                      (device.iv_name, node))
3181         result = False
3182
3183   if instance.disk_template == constants.DT_FILE:
3184     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3185     if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3186                                                file_storage_dir):
3187       logger.Error("could not remove directory '%s'" % file_storage_dir)
3188       result = False
3189
3190   return result
3191
3192
3193 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3194   """Compute disk size requirements in the volume group
3195
3196   This is currently hard-coded for the two-drive layout.
3197
3198   """
3199   # Required free disk space as a function of disk and swap space
3200   req_size_dict = {
3201     constants.DT_DISKLESS: None,
3202     constants.DT_PLAIN: disk_size + swap_size,
3203     # 256 MB are added for drbd metadata, 128MB for each drbd device
3204     constants.DT_DRBD8: disk_size + swap_size + 256,
3205     constants.DT_FILE: None,
3206   }
3207
3208   if disk_template not in req_size_dict:
3209     raise errors.ProgrammerError("Disk template '%s' size requirement"
3210                                  " is unknown" %  disk_template)
3211
3212   return req_size_dict[disk_template]
3213
3214
3215 def _CheckHVParams(lu, nodenames, hvname, hvparams):
3216   """Hypervisor parameter validation.
3217
3218   This function abstract the hypervisor parameter validation to be
3219   used in both instance create and instance modify.
3220
3221   @type lu: L{LogicalUnit}
3222   @param lu: the logical unit for which we check
3223   @type nodenames: list
3224   @param nodenames: the list of nodes on which we should check
3225   @type hvname: string
3226   @param hvname: the name of the hypervisor we should use
3227   @type hvparams: dict
3228   @param hvparams: the parameters which we need to check
3229   @raise errors.OpPrereqError: if the parameters are not valid
3230
3231   """
3232   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3233                                                   hvname,
3234                                                   hvparams)
3235   for node in nodenames:
3236     info = hvinfo.get(node, None)
3237     if not info or not isinstance(info, (tuple, list)):
3238       raise errors.OpPrereqError("Cannot get current information"
3239                                  " from node '%s' (%s)" % (node, info))
3240     if not info[0]:
3241       raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3242                                  " %s" % info[1])
3243
3244
3245 class LUCreateInstance(LogicalUnit):
3246   """Create an instance.
3247
3248   """
3249   HPATH = "instance-add"
3250   HTYPE = constants.HTYPE_INSTANCE
3251   _OP_REQP = ["instance_name", "disk_size",
3252               "disk_template", "swap_size", "mode", "start",
3253               "wait_for_sync", "ip_check", "mac",
3254               "hvparams", "beparams"]
3255   REQ_BGL = False
3256
3257   def _ExpandNode(self, node):
3258     """Expands and checks one node name.
3259
3260     """
3261     node_full = self.cfg.ExpandNodeName(node)
3262     if node_full is None:
3263       raise errors.OpPrereqError("Unknown node %s" % node)
3264     return node_full
3265
3266   def ExpandNames(self):
3267     """ExpandNames for CreateInstance.
3268
3269     Figure out the right locks for instance creation.
3270
3271     """
3272     self.needed_locks = {}
3273
3274     # set optional parameters to none if they don't exist
3275     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3276       if not hasattr(self.op, attr):
3277         setattr(self.op, attr, None)
3278
3279     # cheap checks, mostly valid constants given
3280
3281     # verify creation mode
3282     if self.op.mode not in (constants.INSTANCE_CREATE,
3283                             constants.INSTANCE_IMPORT):
3284       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3285                                  self.op.mode)
3286
3287     # disk template and mirror node verification
3288     if self.op.disk_template not in constants.DISK_TEMPLATES:
3289       raise errors.OpPrereqError("Invalid disk template name")
3290
3291     if self.op.hypervisor is None:
3292       self.op.hypervisor = self.cfg.GetHypervisorType()
3293
3294     cluster = self.cfg.GetClusterInfo()
3295     enabled_hvs = cluster.enabled_hypervisors
3296     if self.op.hypervisor not in enabled_hvs:
3297       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3298                                  " cluster (%s)" % (self.op.hypervisor,
3299                                   ",".join(enabled_hvs)))
3300
3301     # check hypervisor parameter syntax (locally)
3302
3303     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3304                                   self.op.hvparams)
3305     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3306     hv_type.CheckParameterSyntax(filled_hvp)
3307
3308     # fill and remember the beparams dict
3309     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3310                                     self.op.beparams)
3311
3312     #### instance parameters check
3313
3314     # instance name verification
3315     hostname1 = utils.HostInfo(self.op.instance_name)
3316     self.op.instance_name = instance_name = hostname1.name
3317
3318     # this is just a preventive check, but someone might still add this
3319     # instance in the meantime, and creation will fail at lock-add time
3320     if instance_name in self.cfg.GetInstanceList():
3321       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3322                                  instance_name)
3323
3324     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3325
3326     # ip validity checks
3327     ip = getattr(self.op, "ip", None)
3328     if ip is None or ip.lower() == "none":
3329       inst_ip = None
3330     elif ip.lower() == "auto":
3331       inst_ip = hostname1.ip
3332     else:
3333       if not utils.IsValidIP(ip):
3334         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3335                                    " like a valid IP" % ip)
3336       inst_ip = ip
3337     self.inst_ip = self.op.ip = inst_ip
3338     # used in CheckPrereq for ip ping check
3339     self.check_ip = hostname1.ip
3340
3341     # MAC address verification
3342     if self.op.mac != "auto":
3343       if not utils.IsValidMac(self.op.mac.lower()):
3344         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3345                                    self.op.mac)
3346
3347     # file storage checks
3348     if (self.op.file_driver and
3349         not self.op.file_driver in constants.FILE_DRIVER):
3350       raise errors.OpPrereqError("Invalid file driver name '%s'" %
3351                                  self.op.file_driver)
3352
3353     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3354       raise errors.OpPrereqError("File storage directory path not absolute")
3355
3356     ### Node/iallocator related checks
3357     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3358       raise errors.OpPrereqError("One and only one of iallocator and primary"
3359                                  " node must be given")
3360
3361     if self.op.iallocator:
3362       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3363     else:
3364       self.op.pnode = self._ExpandNode(self.op.pnode)
3365       nodelist = [self.op.pnode]
3366       if self.op.snode is not None:
3367         self.op.snode = self._ExpandNode(self.op.snode)
3368         nodelist.append(self.op.snode)
3369       self.needed_locks[locking.LEVEL_NODE] = nodelist
3370
3371     # in case of import lock the source node too
3372     if self.op.mode == constants.INSTANCE_IMPORT:
3373       src_node = getattr(self.op, "src_node", None)
3374       src_path = getattr(self.op, "src_path", None)
3375
3376       if src_node is None or src_path is None:
3377         raise errors.OpPrereqError("Importing an instance requires source"
3378                                    " node and path options")
3379
3380       if not os.path.isabs(src_path):
3381         raise errors.OpPrereqError("The source path must be absolute")
3382
3383       self.op.src_node = src_node = self._ExpandNode(src_node)
3384       if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3385         self.needed_locks[locking.LEVEL_NODE].append(src_node)
3386
3387     else: # INSTANCE_CREATE
3388       if getattr(self.op, "os_type", None) is None:
3389         raise errors.OpPrereqError("No guest OS specified")
3390
3391   def _RunAllocator(self):
3392     """Run the allocator based on input opcode.
3393
3394     """
3395     disks = [{"size": self.op.disk_size, "mode": "w"},
3396              {"size": self.op.swap_size, "mode": "w"}]
3397     nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3398              "bridge": self.op.bridge}]
3399     ial = IAllocator(self,
3400                      mode=constants.IALLOCATOR_MODE_ALLOC,
3401                      name=self.op.instance_name,
3402                      disk_template=self.op.disk_template,
3403                      tags=[],
3404                      os=self.op.os_type,
3405                      vcpus=self.be_full[constants.BE_VCPUS],
3406                      mem_size=self.be_full[constants.BE_MEMORY],
3407                      disks=disks,
3408                      nics=nics,
3409                      )
3410
3411     ial.Run(self.op.iallocator)
3412
3413     if not ial.success:
3414       raise errors.OpPrereqError("Can't compute nodes using"
3415                                  " iallocator '%s': %s" % (self.op.iallocator,
3416                                                            ial.info))
3417     if len(ial.nodes) != ial.required_nodes:
3418       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3419                                  " of nodes (%s), required %s" %
3420                                  (self.op.iallocator, len(ial.nodes),
3421                                   ial.required_nodes))
3422     self.op.pnode = ial.nodes[0]
3423     logger.ToStdout("Selected nodes for the instance: %s" %
3424                     (", ".join(ial.nodes),))
3425     logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3426                 (self.op.instance_name, self.op.iallocator, ial.nodes))
3427     if ial.required_nodes == 2:
3428       self.op.snode = ial.nodes[1]
3429
3430   def BuildHooksEnv(self):
3431     """Build hooks env.
3432
3433     This runs on master, primary and secondary nodes of the instance.
3434
3435     """
3436     env = {
3437       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3438       "INSTANCE_DISK_SIZE": self.op.disk_size,
3439       "INSTANCE_SWAP_SIZE": self.op.swap_size,
3440       "INSTANCE_ADD_MODE": self.op.mode,
3441       }
3442     if self.op.mode == constants.INSTANCE_IMPORT:
3443       env["INSTANCE_SRC_NODE"] = self.op.src_node
3444       env["INSTANCE_SRC_PATH"] = self.op.src_path
3445       env["INSTANCE_SRC_IMAGE"] = self.src_image
3446
3447     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3448       primary_node=self.op.pnode,
3449       secondary_nodes=self.secondaries,
3450       status=self.instance_status,
3451       os_type=self.op.os_type,
3452       memory=self.be_full[constants.BE_MEMORY],
3453       vcpus=self.be_full[constants.BE_VCPUS],
3454       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3455     ))
3456
3457     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3458           self.secondaries)
3459     return env, nl, nl
3460
3461
3462   def CheckPrereq(self):
3463     """Check prerequisites.
3464
3465     """
3466     if (not self.cfg.GetVGName() and
3467         self.op.disk_template not in constants.DTS_NOT_LVM):
3468       raise errors.OpPrereqError("Cluster does not support lvm-based"
3469                                  " instances")
3470
3471
3472     if self.op.mode == constants.INSTANCE_IMPORT:
3473       src_node = self.op.src_node
3474       src_path = self.op.src_path
3475
3476       export_info = self.rpc.call_export_info(src_node, src_path)
3477
3478       if not export_info:
3479         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3480
3481       if not export_info.has_section(constants.INISECT_EXP):
3482         raise errors.ProgrammerError("Corrupted export config")
3483
3484       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3485       if (int(ei_version) != constants.EXPORT_VERSION):
3486         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3487                                    (ei_version, constants.EXPORT_VERSION))
3488
3489       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3490         raise errors.OpPrereqError("Can't import instance with more than"
3491                                    " one data disk")
3492
3493       # FIXME: are the old os-es, disk sizes, etc. useful?
3494       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3495       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3496                                                          'disk0_dump'))
3497       self.src_image = diskimage
3498
3499     # ip ping checks (we use the same ip that was resolved in ExpandNames)
3500
3501     if self.op.start and not self.op.ip_check:
3502       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3503                                  " adding an instance in start mode")
3504
3505     if self.op.ip_check:
3506       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3507         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3508                                    (self.check_ip, self.op.instance_name))
3509
3510     # bridge verification
3511     bridge = getattr(self.op, "bridge", None)
3512     if bridge is None:
3513       self.op.bridge = self.cfg.GetDefBridge()
3514     else:
3515       self.op.bridge = bridge
3516
3517     #### allocator run
3518
3519     if self.op.iallocator is not None:
3520       self._RunAllocator()
3521
3522     #### node related checks
3523
3524     # check primary node
3525     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3526     assert self.pnode is not None, \
3527       "Cannot retrieve locked node %s" % self.op.pnode
3528     self.secondaries = []
3529
3530     # mirror node verification
3531     if self.op.disk_template in constants.DTS_NET_MIRROR:
3532       if self.op.snode is None:
3533         raise errors.OpPrereqError("The networked disk templates need"
3534                                    " a mirror node")
3535       if self.op.snode == pnode.name:
3536         raise errors.OpPrereqError("The secondary node cannot be"
3537                                    " the primary node.")
3538       self.secondaries.append(self.op.snode)
3539
3540     nodenames = [pnode.name] + self.secondaries
3541
3542     req_size = _ComputeDiskSize(self.op.disk_template,
3543                                 self.op.disk_size, self.op.swap_size)
3544
3545     # Check lv size requirements
3546     if req_size is not None:
3547       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3548                                          self.op.hypervisor)
3549       for node in nodenames:
3550         info = nodeinfo.get(node, None)
3551         if not info:
3552           raise errors.OpPrereqError("Cannot get current information"
3553                                      " from node '%s'" % node)
3554         vg_free = info.get('vg_free', None)
3555         if not isinstance(vg_free, int):
3556           raise errors.OpPrereqError("Can't compute free disk space on"
3557                                      " node %s" % node)
3558         if req_size > info['vg_free']:
3559           raise errors.OpPrereqError("Not enough disk space on target node %s."
3560                                      " %d MB available, %d MB required" %
3561                                      (node, info['vg_free'], req_size))
3562
3563     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
3564
3565     # os verification
3566     os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
3567     if not os_obj:
3568       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3569                                  " primary node"  % self.op.os_type)
3570
3571     # bridge check on primary node
3572     if not self.rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3573       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3574                                  " destination node '%s'" %
3575                                  (self.op.bridge, pnode.name))
3576
3577     # memory check on primary node
3578     if self.op.start:
3579       _CheckNodeFreeMemory(self, self.pnode.name,
3580                            "creating instance %s" % self.op.instance_name,
3581                            self.be_full[constants.BE_MEMORY],
3582                            self.op.hypervisor)
3583
3584     if self.op.start:
3585       self.instance_status = 'up'
3586     else:
3587       self.instance_status = 'down'
3588
3589   def Exec(self, feedback_fn):
3590     """Create and add the instance to the cluster.
3591
3592     """
3593     instance = self.op.instance_name
3594     pnode_name = self.pnode.name
3595
3596     if self.op.mac == "auto":
3597       mac_address = self.cfg.GenerateMAC()
3598     else:
3599       mac_address = self.op.mac
3600
3601     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3602     if self.inst_ip is not None:
3603       nic.ip = self.inst_ip
3604
3605     ht_kind = self.op.hypervisor
3606     if ht_kind in constants.HTS_REQ_PORT:
3607       network_port = self.cfg.AllocatePort()
3608     else:
3609       network_port = None
3610
3611     ##if self.op.vnc_bind_address is None:
3612     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3613
3614     # this is needed because os.path.join does not accept None arguments
3615     if self.op.file_storage_dir is None:
3616       string_file_storage_dir = ""
3617     else:
3618       string_file_storage_dir = self.op.file_storage_dir
3619
3620     # build the full file storage dir path
3621     file_storage_dir = os.path.normpath(os.path.join(
3622                                         self.cfg.GetFileStorageDir(),
3623                                         string_file_storage_dir, instance))
3624
3625
3626     disks = _GenerateDiskTemplate(self,
3627                                   self.op.disk_template,
3628                                   instance, pnode_name,
3629                                   self.secondaries, self.op.disk_size,
3630                                   self.op.swap_size,
3631                                   file_storage_dir,
3632                                   self.op.file_driver)
3633
3634     iobj = objects.Instance(name=instance, os=self.op.os_type,
3635                             primary_node=pnode_name,
3636                             nics=[nic], disks=disks,
3637                             disk_template=self.op.disk_template,
3638                             status=self.instance_status,
3639                             network_port=network_port,
3640                             beparams=self.op.beparams,
3641                             hvparams=self.op.hvparams,
3642                             hypervisor=self.op.hypervisor,
3643                             )
3644
3645     feedback_fn("* creating instance disks...")
3646     if not _CreateDisks(self, iobj):
3647       _RemoveDisks(self, iobj)
3648       self.cfg.ReleaseDRBDMinors(instance)
3649       raise errors.OpExecError("Device creation failed, reverting...")
3650
3651     feedback_fn("adding instance %s to cluster config" % instance)
3652
3653     self.cfg.AddInstance(iobj)
3654     # Declare that we don't want to remove the instance lock anymore, as we've
3655     # added the instance to the config
3656     del self.remove_locks[locking.LEVEL_INSTANCE]
3657     # Remove the temp. assignements for the instance's drbds
3658     self.cfg.ReleaseDRBDMinors(instance)
3659
3660     if self.op.wait_for_sync:
3661       disk_abort = not _WaitForSync(self, iobj)
3662     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3663       # make sure the disks are not degraded (still sync-ing is ok)
3664       time.sleep(15)
3665       feedback_fn("* checking mirrors status")
3666       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
3667     else:
3668       disk_abort = False
3669
3670     if disk_abort:
3671       _RemoveDisks(self, iobj)
3672       self.cfg.RemoveInstance(iobj.name)
3673       # Make sure the instance lock gets removed
3674       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
3675       raise errors.OpExecError("There are some degraded disks for"
3676                                " this instance")
3677
3678     feedback_fn("creating os for instance %s on node %s" %
3679                 (instance, pnode_name))
3680
3681     if iobj.disk_template != constants.DT_DISKLESS:
3682       if self.op.mode == constants.INSTANCE_CREATE:
3683         feedback_fn("* running the instance OS create scripts...")
3684         if not self.rpc.call_instance_os_add(pnode_name, iobj):
3685           raise errors.OpExecError("could not add os for instance %s"
3686                                    " on node %s" %
3687                                    (instance, pnode_name))
3688
3689       elif self.op.mode == constants.INSTANCE_IMPORT:
3690         feedback_fn("* running the instance OS import scripts...")
3691         src_node = self.op.src_node
3692         src_image = self.src_image
3693         cluster_name = self.cfg.GetClusterName()
3694         if not self.rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3695                                                 src_node, src_image,
3696                                                 cluster_name):
3697           raise errors.OpExecError("Could not import os for instance"
3698                                    " %s on node %s" %
3699                                    (instance, pnode_name))
3700       else:
3701         # also checked in the prereq part
3702         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3703                                      % self.op.mode)
3704
3705     if self.op.start:
3706       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3707       feedback_fn("* starting instance...")
3708       if not self.rpc.call_instance_start(pnode_name, iobj, None):
3709         raise errors.OpExecError("Could not start instance")
3710
3711
3712 class LUConnectConsole(NoHooksLU):
3713   """Connect to an instance's console.
3714
3715   This is somewhat special in that it returns the command line that
3716   you need to run on the master node in order to connect to the
3717   console.
3718
3719   """
3720   _OP_REQP = ["instance_name"]
3721   REQ_BGL = False
3722
3723   def ExpandNames(self):
3724     self._ExpandAndLockInstance()
3725
3726   def CheckPrereq(self):
3727     """Check prerequisites.
3728
3729     This checks that the instance is in the cluster.
3730
3731     """
3732     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3733     assert self.instance is not None, \
3734       "Cannot retrieve locked instance %s" % self.op.instance_name
3735
3736   def Exec(self, feedback_fn):
3737     """Connect to the console of an instance
3738
3739     """
3740     instance = self.instance
3741     node = instance.primary_node
3742
3743     node_insts = self.rpc.call_instance_list([node],
3744                                              [instance.hypervisor])[node]
3745     if node_insts is False:
3746       raise errors.OpExecError("Can't connect to node %s." % node)
3747
3748     if instance.name not in node_insts:
3749       raise errors.OpExecError("Instance %s is not running." % instance.name)
3750
3751     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3752
3753     hyper = hypervisor.GetHypervisor(instance.hypervisor)
3754     console_cmd = hyper.GetShellCommandForConsole(instance)
3755
3756     # build ssh cmdline
3757     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3758
3759
3760 class LUReplaceDisks(LogicalUnit):
3761   """Replace the disks of an instance.
3762
3763   """
3764   HPATH = "mirrors-replace"
3765   HTYPE = constants.HTYPE_INSTANCE
3766   _OP_REQP = ["instance_name", "mode", "disks"]
3767   REQ_BGL = False
3768
3769   def ExpandNames(self):
3770     self._ExpandAndLockInstance()
3771
3772     if not hasattr(self.op, "remote_node"):
3773       self.op.remote_node = None
3774
3775     ia_name = getattr(self.op, "iallocator", None)
3776     if ia_name is not None:
3777       if self.op.remote_node is not None:
3778         raise errors.OpPrereqError("Give either the iallocator or the new"
3779                                    " secondary, not both")
3780       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3781     elif self.op.remote_node is not None:
3782       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3783       if remote_node is None:
3784         raise errors.OpPrereqError("Node '%s' not known" %
3785                                    self.op.remote_node)
3786       self.op.remote_node = remote_node
3787       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3788       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3789     else:
3790       self.needed_locks[locking.LEVEL_NODE] = []
3791       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3792
3793   def DeclareLocks(self, level):
3794     # If we're not already locking all nodes in the set we have to declare the
3795     # instance's primary/secondary nodes.
3796     if (level == locking.LEVEL_NODE and
3797         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
3798       self._LockInstancesNodes()
3799
3800   def _RunAllocator(self):
3801     """Compute a new secondary node using an IAllocator.
3802
3803     """
3804     ial = IAllocator(self,
3805                      mode=constants.IALLOCATOR_MODE_RELOC,
3806                      name=self.op.instance_name,
3807                      relocate_from=[self.sec_node])
3808
3809     ial.Run(self.op.iallocator)
3810
3811     if not ial.success:
3812       raise errors.OpPrereqError("Can't compute nodes using"
3813                                  " iallocator '%s': %s" % (self.op.iallocator,
3814                                                            ial.info))
3815     if len(ial.nodes) != ial.required_nodes:
3816       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3817                                  " of nodes (%s), required %s" %
3818                                  (len(ial.nodes), ial.required_nodes))
3819     self.op.remote_node = ial.nodes[0]
3820     logger.ToStdout("Selected new secondary for the instance: %s" %
3821                     self.op.remote_node)
3822
3823   def BuildHooksEnv(self):
3824     """Build hooks env.
3825
3826     This runs on the master, the primary and all the secondaries.
3827
3828     """
3829     env = {
3830       "MODE": self.op.mode,
3831       "NEW_SECONDARY": self.op.remote_node,
3832       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3833       }
3834     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3835     nl = [
3836       self.cfg.GetMasterNode(),
3837       self.instance.primary_node,
3838       ]
3839     if self.op.remote_node is not None:
3840       nl.append(self.op.remote_node)
3841     return env, nl, nl
3842
3843   def CheckPrereq(self):
3844     """Check prerequisites.
3845
3846     This checks that the instance is in the cluster.
3847
3848     """
3849     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3850     assert instance is not None, \
3851       "Cannot retrieve locked instance %s" % self.op.instance_name
3852     self.instance = instance
3853
3854     if instance.disk_template not in constants.DTS_NET_MIRROR:
3855       raise errors.OpPrereqError("Instance's disk layout is not"
3856                                  " network mirrored.")
3857
3858     if len(instance.secondary_nodes) != 1:
3859       raise errors.OpPrereqError("The instance has a strange layout,"
3860                                  " expected one secondary but found %d" %
3861                                  len(instance.secondary_nodes))
3862
3863     self.sec_node = instance.secondary_nodes[0]
3864
3865     ia_name = getattr(self.op, "iallocator", None)
3866     if ia_name is not None:
3867       self._RunAllocator()
3868
3869     remote_node = self.op.remote_node
3870     if remote_node is not None:
3871       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3872       assert self.remote_node_info is not None, \
3873         "Cannot retrieve locked node %s" % remote_node
3874     else:
3875       self.remote_node_info = None
3876     if remote_node == instance.primary_node:
3877       raise errors.OpPrereqError("The specified node is the primary node of"
3878                                  " the instance.")
3879     elif remote_node == self.sec_node:
3880       if self.op.mode == constants.REPLACE_DISK_SEC:
3881         # this is for DRBD8, where we can't execute the same mode of
3882         # replacement as for drbd7 (no different port allocated)
3883         raise errors.OpPrereqError("Same secondary given, cannot execute"
3884                                    " replacement")
3885     if instance.disk_template == constants.DT_DRBD8:
3886       if (self.op.mode == constants.REPLACE_DISK_ALL and
3887           remote_node is not None):
3888         # switch to replace secondary mode
3889         self.op.mode = constants.REPLACE_DISK_SEC
3890
3891       if self.op.mode == constants.REPLACE_DISK_ALL:
3892         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3893                                    " secondary disk replacement, not"
3894                                    " both at once")
3895       elif self.op.mode == constants.REPLACE_DISK_PRI:
3896         if remote_node is not None:
3897           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3898                                      " the secondary while doing a primary"
3899                                      " node disk replacement")
3900         self.tgt_node = instance.primary_node
3901         self.oth_node = instance.secondary_nodes[0]
3902       elif self.op.mode == constants.REPLACE_DISK_SEC:
3903         self.new_node = remote_node # this can be None, in which case
3904                                     # we don't change the secondary
3905         self.tgt_node = instance.secondary_nodes[0]
3906         self.oth_node = instance.primary_node
3907       else:
3908         raise errors.ProgrammerError("Unhandled disk replace mode")
3909
3910     for name in self.op.disks:
3911       if instance.FindDisk(name) is None:
3912         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3913                                    (name, instance.name))
3914
3915   def _ExecD8DiskOnly(self, feedback_fn):
3916     """Replace a disk on the primary or secondary for dbrd8.
3917
3918     The algorithm for replace is quite complicated:
3919       - for each disk to be replaced:
3920         - create new LVs on the target node with unique names
3921         - detach old LVs from the drbd device
3922         - rename old LVs to name_replaced.<time_t>
3923         - rename new LVs to old LVs
3924         - attach the new LVs (with the old names now) to the drbd device
3925       - wait for sync across all devices
3926       - for each modified disk:
3927         - remove old LVs (which have the name name_replaces.<time_t>)
3928
3929     Failures are not very well handled.
3930
3931     """
3932     steps_total = 6
3933     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3934     instance = self.instance
3935     iv_names = {}
3936     vgname = self.cfg.GetVGName()
3937     # start of work
3938     cfg = self.cfg
3939     tgt_node = self.tgt_node
3940     oth_node = self.oth_node
3941
3942     # Step: check device activation
3943     self.proc.LogStep(1, steps_total, "check device existence")
3944     info("checking volume groups")
3945     my_vg = cfg.GetVGName()
3946     results = self.rpc.call_vg_list([oth_node, tgt_node])
3947     if not results:
3948       raise errors.OpExecError("Can't list volume groups on the nodes")
3949     for node in oth_node, tgt_node:
3950       res = results.get(node, False)
3951       if not res or my_vg not in res:
3952         raise errors.OpExecError("Volume group '%s' not found on %s" %
3953                                  (my_vg, node))
3954     for dev in instance.disks:
3955       if not dev.iv_name in self.op.disks:
3956         continue
3957       for node in tgt_node, oth_node:
3958         info("checking %s on %s" % (dev.iv_name, node))
3959         cfg.SetDiskID(dev, node)
3960         if not self.rpc.call_blockdev_find(node, dev):
3961           raise errors.OpExecError("Can't find device %s on node %s" %
3962                                    (dev.iv_name, node))
3963
3964     # Step: check other node consistency
3965     self.proc.LogStep(2, steps_total, "check peer consistency")
3966     for dev in instance.disks:
3967       if not dev.iv_name in self.op.disks:
3968         continue
3969       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3970       if not _CheckDiskConsistency(self, dev, oth_node,
3971                                    oth_node==instance.primary_node):
3972         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3973                                  " to replace disks on this node (%s)" %
3974                                  (oth_node, tgt_node))
3975
3976     # Step: create new storage
3977     self.proc.LogStep(3, steps_total, "allocate new storage")
3978     for dev in instance.disks:
3979       if not dev.iv_name in self.op.disks:
3980         continue
3981       size = dev.size
3982       cfg.SetDiskID(dev, tgt_node)
3983       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3984       names = _GenerateUniqueNames(self, lv_names)
3985       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3986                              logical_id=(vgname, names[0]))
3987       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3988                              logical_id=(vgname, names[1]))
3989       new_lvs = [lv_data, lv_meta]
3990       old_lvs = dev.children
3991       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3992       info("creating new local storage on %s for %s" %
3993            (tgt_node, dev.iv_name))
3994       # since we *always* want to create this LV, we use the
3995       # _Create...OnPrimary (which forces the creation), even if we
3996       # are talking about the secondary node
3997       for new_lv in new_lvs:
3998         if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
3999                                         _GetInstanceInfoText(instance)):
4000           raise errors.OpExecError("Failed to create new LV named '%s' on"
4001                                    " node '%s'" %
4002                                    (new_lv.logical_id[1], tgt_node))
4003
4004     # Step: for each lv, detach+rename*2+attach
4005     self.proc.LogStep(4, steps_total, "change drbd configuration")
4006     for dev, old_lvs, new_lvs in iv_names.itervalues():
4007       info("detaching %s drbd from local storage" % dev.iv_name)
4008       if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
4009         raise errors.OpExecError("Can't detach drbd from local storage on node"
4010                                  " %s for device %s" % (tgt_node, dev.iv_name))
4011       #dev.children = []
4012       #cfg.Update(instance)
4013
4014       # ok, we created the new LVs, so now we know we have the needed
4015       # storage; as such, we proceed on the target node to rename
4016       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
4017       # using the assumption that logical_id == physical_id (which in
4018       # turn is the unique_id on that node)
4019
4020       # FIXME(iustin): use a better name for the replaced LVs
4021       temp_suffix = int(time.time())
4022       ren_fn = lambda d, suff: (d.physical_id[0],
4023                                 d.physical_id[1] + "_replaced-%s" % suff)
4024       # build the rename list based on what LVs exist on the node
4025       rlist = []
4026       for to_ren in old_lvs:
4027         find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
4028         if find_res is not None: # device exists
4029           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
4030
4031       info("renaming the old LVs on the target node")
4032       if not self.rpc.call_blockdev_rename(tgt_node, rlist):
4033         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
4034       # now we rename the new LVs to the old LVs
4035       info("renaming the new LVs on the target node")
4036       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
4037       if not self.rpc.call_blockdev_rename(tgt_node, rlist):
4038         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
4039
4040       for old, new in zip(old_lvs, new_lvs):
4041         new.logical_id = old.logical_id
4042         cfg.SetDiskID(new, tgt_node)
4043
4044       for disk in old_lvs:
4045         disk.logical_id = ren_fn(disk, temp_suffix)
4046         cfg.SetDiskID(disk, tgt_node)
4047
4048       # now that the new lvs have the old name, we can add them to the device
4049       info("adding new mirror component on %s" % tgt_node)
4050       if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
4051         for new_lv in new_lvs:
4052           if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
4053             warning("Can't rollback device %s", hint="manually cleanup unused"
4054                     " logical volumes")
4055         raise errors.OpExecError("Can't add local storage to drbd")
4056
4057       dev.children = new_lvs
4058       cfg.Update(instance)
4059
4060     # Step: wait for sync
4061
4062     # this can fail as the old devices are degraded and _WaitForSync
4063     # does a combined result over all disks, so we don't check its
4064     # return value
4065     self.proc.LogStep(5, steps_total, "sync devices")
4066     _WaitForSync(self, instance, unlock=True)
4067
4068     # so check manually all the devices
4069     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4070       cfg.SetDiskID(dev, instance.primary_node)
4071       is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
4072       if is_degr:
4073         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4074
4075     # Step: remove old storage
4076     self.proc.LogStep(6, steps_total, "removing old storage")
4077     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4078       info("remove logical volumes for %s" % name)
4079       for lv in old_lvs:
4080         cfg.SetDiskID(lv, tgt_node)
4081         if not self.rpc.call_blockdev_remove(tgt_node, lv):
4082           warning("Can't remove old LV", hint="manually remove unused LVs")
4083           continue
4084
4085   def _ExecD8Secondary(self, feedback_fn):
4086     """Replace the secondary node for drbd8.
4087
4088     The algorithm for replace is quite complicated:
4089       - for all disks of the instance:
4090         - create new LVs on the new node with same names
4091         - shutdown the drbd device on the old secondary
4092         - disconnect the drbd network on the primary
4093         - create the drbd device on the new secondary
4094         - network attach the drbd on the primary, using an artifice:
4095           the drbd code for Attach() will connect to the network if it
4096           finds a device which is connected to the good local disks but
4097           not network enabled
4098       - wait for sync across all devices
4099       - remove all disks from the old secondary
4100
4101     Failures are not very well handled.
4102
4103     """
4104     steps_total = 6
4105     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4106     instance = self.instance
4107     iv_names = {}
4108     vgname = self.cfg.GetVGName()
4109     # start of work
4110     cfg = self.cfg
4111     old_node = self.tgt_node
4112     new_node = self.new_node
4113     pri_node = instance.primary_node
4114
4115     # Step: check device activation
4116     self.proc.LogStep(1, steps_total, "check device existence")
4117     info("checking volume groups")
4118     my_vg = cfg.GetVGName()
4119     results = self.rpc.call_vg_list([pri_node, new_node])
4120     if not results:
4121       raise errors.OpExecError("Can't list volume groups on the nodes")
4122     for node in pri_node, new_node:
4123       res = results.get(node, False)
4124       if not res or my_vg not in res:
4125         raise errors.OpExecError("Volume group '%s' not found on %s" %
4126                                  (my_vg, node))
4127     for dev in instance.disks:
4128       if not dev.iv_name in self.op.disks:
4129         continue
4130       info("checking %s on %s" % (dev.iv_name, pri_node))
4131       cfg.SetDiskID(dev, pri_node)
4132       if not self.rpc.call_blockdev_find(pri_node, dev):
4133         raise errors.OpExecError("Can't find device %s on node %s" %
4134                                  (dev.iv_name, pri_node))
4135
4136     # Step: check other node consistency
4137     self.proc.LogStep(2, steps_total, "check peer consistency")
4138     for dev in instance.disks:
4139       if not dev.iv_name in self.op.disks:
4140         continue
4141       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4142       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4143         raise errors.OpExecError("Primary node (%s) has degraded storage,"
4144                                  " unsafe to replace the secondary" %
4145                                  pri_node)
4146
4147     # Step: create new storage
4148     self.proc.LogStep(3, steps_total, "allocate new storage")
4149     for dev in instance.disks:
4150       size = dev.size
4151       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4152       # since we *always* want to create this LV, we use the
4153       # _Create...OnPrimary (which forces the creation), even if we
4154       # are talking about the secondary node
4155       for new_lv in dev.children:
4156         if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4157                                         _GetInstanceInfoText(instance)):
4158           raise errors.OpExecError("Failed to create new LV named '%s' on"
4159                                    " node '%s'" %
4160                                    (new_lv.logical_id[1], new_node))
4161
4162
4163     # Step 4: dbrd minors and drbd setups changes
4164     # after this, we must manually remove the drbd minors on both the
4165     # error and the success paths
4166     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4167                                    instance.name)
4168     logging.debug("Allocated minors %s" % (minors,))
4169     self.proc.LogStep(4, steps_total, "changing drbd configuration")
4170     for dev, new_minor in zip(instance.disks, minors):
4171       size = dev.size
4172       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4173       # create new devices on new_node
4174       if pri_node == dev.logical_id[0]:
4175         new_logical_id = (pri_node, new_node,
4176                           dev.logical_id[2], dev.logical_id[3], new_minor,
4177                           dev.logical_id[5])
4178       else:
4179         new_logical_id = (new_node, pri_node,
4180                           dev.logical_id[2], new_minor, dev.logical_id[4],
4181                           dev.logical_id[5])
4182       iv_names[dev.iv_name] = (dev, dev.children, new_logical_id)
4183       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4184                     new_logical_id)
4185       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4186                               logical_id=new_logical_id,
4187                               children=dev.children)
4188       if not _CreateBlockDevOnSecondary(self, new_node, instance,
4189                                         new_drbd, False,
4190                                         _GetInstanceInfoText(instance)):
4191         self.cfg.ReleaseDRBDMinors(instance.name)
4192         raise errors.OpExecError("Failed to create new DRBD on"
4193                                  " node '%s'" % new_node)
4194
4195     for dev in instance.disks:
4196       # we have new devices, shutdown the drbd on the old secondary
4197       info("shutting down drbd for %s on old node" % dev.iv_name)
4198       cfg.SetDiskID(dev, old_node)
4199       if not self.rpc.call_blockdev_shutdown(old_node, dev):
4200         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4201                 hint="Please cleanup this device manually as soon as possible")
4202
4203     info("detaching primary drbds from the network (=> standalone)")
4204     done = 0
4205     for dev in instance.disks:
4206       cfg.SetDiskID(dev, pri_node)
4207       # set the network part of the physical (unique in bdev terms) id
4208       # to None, meaning detach from network
4209       dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4210       # and 'find' the device, which will 'fix' it to match the
4211       # standalone state
4212       if self.rpc.call_blockdev_find(pri_node, dev):
4213         done += 1
4214       else:
4215         warning("Failed to detach drbd %s from network, unusual case" %
4216                 dev.iv_name)
4217
4218     if not done:
4219       # no detaches succeeded (very unlikely)
4220       self.cfg.ReleaseDRBDMinors(instance.name)
4221       raise errors.OpExecError("Can't detach at least one DRBD from old node")
4222
4223     # if we managed to detach at least one, we update all the disks of
4224     # the instance to point to the new secondary
4225     info("updating instance configuration")
4226     for dev, _, new_logical_id in iv_names.itervalues():
4227       dev.logical_id = new_logical_id
4228       cfg.SetDiskID(dev, pri_node)
4229     cfg.Update(instance)
4230     # we can remove now the temp minors as now the new values are
4231     # written to the config file (and therefore stable)
4232     self.cfg.ReleaseDRBDMinors(instance.name)
4233
4234     # and now perform the drbd attach
4235     info("attaching primary drbds to new secondary (standalone => connected)")
4236     failures = []
4237     for dev in instance.disks:
4238       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4239       # since the attach is smart, it's enough to 'find' the device,
4240       # it will automatically activate the network, if the physical_id
4241       # is correct
4242       cfg.SetDiskID(dev, pri_node)
4243       logging.debug("Disk to attach: %s", dev)
4244       if not self.rpc.call_blockdev_find(pri_node, dev):
4245         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4246                 "please do a gnt-instance info to see the status of disks")
4247
4248     # this can fail as the old devices are degraded and _WaitForSync
4249     # does a combined result over all disks, so we don't check its
4250     # return value
4251     self.proc.LogStep(5, steps_total, "sync devices")
4252     _WaitForSync(self, instance, unlock=True)
4253
4254     # so check manually all the devices
4255     for name, (dev, old_lvs, _) in iv_names.iteritems():
4256       cfg.SetDiskID(dev, pri_node)
4257       is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
4258       if is_degr:
4259         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4260
4261     self.proc.LogStep(6, steps_total, "removing old storage")
4262     for name, (dev, old_lvs, _) in iv_names.iteritems():
4263       info("remove logical volumes for %s" % name)
4264       for lv in old_lvs:
4265         cfg.SetDiskID(lv, old_node)
4266         if not self.rpc.call_blockdev_remove(old_node, lv):
4267           warning("Can't remove LV on old secondary",
4268                   hint="Cleanup stale volumes by hand")
4269
4270   def Exec(self, feedback_fn):
4271     """Execute disk replacement.
4272
4273     This dispatches the disk replacement to the appropriate handler.
4274
4275     """
4276     instance = self.instance
4277
4278     # Activate the instance disks if we're replacing them on a down instance
4279     if instance.status == "down":
4280       _StartInstanceDisks(self, instance, True)
4281
4282     if instance.disk_template == constants.DT_DRBD8:
4283       if self.op.remote_node is None:
4284         fn = self._ExecD8DiskOnly
4285       else:
4286         fn = self._ExecD8Secondary
4287     else:
4288       raise errors.ProgrammerError("Unhandled disk replacement case")
4289
4290     ret = fn(feedback_fn)
4291
4292     # Deactivate the instance disks if we're replacing them on a down instance
4293     if instance.status == "down":
4294       _SafeShutdownInstanceDisks(self, instance)
4295
4296     return ret
4297
4298
4299 class LUGrowDisk(LogicalUnit):
4300   """Grow a disk of an instance.
4301
4302   """
4303   HPATH = "disk-grow"
4304   HTYPE = constants.HTYPE_INSTANCE
4305   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
4306   REQ_BGL = False
4307
4308   def ExpandNames(self):
4309     self._ExpandAndLockInstance()
4310     self.needed_locks[locking.LEVEL_NODE] = []
4311     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4312
4313   def DeclareLocks(self, level):
4314     if level == locking.LEVEL_NODE:
4315       self._LockInstancesNodes()
4316
4317   def BuildHooksEnv(self):
4318     """Build hooks env.
4319
4320     This runs on the master, the primary and all the secondaries.
4321
4322     """
4323     env = {
4324       "DISK": self.op.disk,
4325       "AMOUNT": self.op.amount,
4326       }
4327     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4328     nl = [
4329       self.cfg.GetMasterNode(),
4330       self.instance.primary_node,
4331       ]
4332     return env, nl, nl
4333
4334   def CheckPrereq(self):
4335     """Check prerequisites.
4336
4337     This checks that the instance is in the cluster.
4338
4339     """
4340     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4341     assert instance is not None, \
4342       "Cannot retrieve locked instance %s" % self.op.instance_name
4343
4344     self.instance = instance
4345
4346     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4347       raise errors.OpPrereqError("Instance's disk layout does not support"
4348                                  " growing.")
4349
4350     if instance.FindDisk(self.op.disk) is None:
4351       raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4352                                  (self.op.disk, instance.name))
4353
4354     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4355     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4356                                        instance.hypervisor)
4357     for node in nodenames:
4358       info = nodeinfo.get(node, None)
4359       if not info:
4360         raise errors.OpPrereqError("Cannot get current information"
4361                                    " from node '%s'" % node)
4362       vg_free = info.get('vg_free', None)
4363       if not isinstance(vg_free, int):
4364         raise errors.OpPrereqError("Can't compute free disk space on"
4365                                    " node %s" % node)
4366       if self.op.amount > info['vg_free']:
4367         raise errors.OpPrereqError("Not enough disk space on target node %s:"
4368                                    " %d MiB available, %d MiB required" %
4369                                    (node, info['vg_free'], self.op.amount))
4370
4371   def Exec(self, feedback_fn):
4372     """Execute disk grow.
4373
4374     """
4375     instance = self.instance
4376     disk = instance.FindDisk(self.op.disk)
4377     for node in (instance.secondary_nodes + (instance.primary_node,)):
4378       self.cfg.SetDiskID(disk, node)
4379       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
4380       if (not result or not isinstance(result, (list, tuple)) or
4381           len(result) != 2):
4382         raise errors.OpExecError("grow request failed to node %s" % node)
4383       elif not result[0]:
4384         raise errors.OpExecError("grow request failed to node %s: %s" %
4385                                  (node, result[1]))
4386     disk.RecordGrow(self.op.amount)
4387     self.cfg.Update(instance)
4388     if self.op.wait_for_sync:
4389       disk_abort = not _WaitForSync(self.cfg, instance, self.proc)
4390       if disk_abort:
4391         logger.Error("Warning: disk sync-ing has not returned a good status.\n"
4392                      " Please check the instance.")
4393
4394
4395 class LUQueryInstanceData(NoHooksLU):
4396   """Query runtime instance data.
4397
4398   """
4399   _OP_REQP = ["instances", "static"]
4400   REQ_BGL = False
4401
4402   def ExpandNames(self):
4403     self.needed_locks = {}
4404     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4405
4406     if not isinstance(self.op.instances, list):
4407       raise errors.OpPrereqError("Invalid argument type 'instances'")
4408
4409     if self.op.instances:
4410       self.wanted_names = []
4411       for name in self.op.instances:
4412         full_name = self.cfg.ExpandInstanceName(name)
4413         if full_name is None:
4414           raise errors.OpPrereqError("Instance '%s' not known" %
4415                                      self.op.instance_name)
4416         self.wanted_names.append(full_name)
4417       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4418     else:
4419       self.wanted_names = None
4420       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4421
4422     self.needed_locks[locking.LEVEL_NODE] = []
4423     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4424
4425   def DeclareLocks(self, level):
4426     if level == locking.LEVEL_NODE:
4427       self._LockInstancesNodes()
4428
4429   def CheckPrereq(self):
4430     """Check prerequisites.
4431
4432     This only checks the optional instance list against the existing names.
4433
4434     """
4435     if self.wanted_names is None:
4436       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4437
4438     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4439                              in self.wanted_names]
4440     return
4441
4442   def _ComputeDiskStatus(self, instance, snode, dev):
4443     """Compute block device status.
4444
4445     """
4446     static = self.op.static
4447     if not static:
4448       self.cfg.SetDiskID(dev, instance.primary_node)
4449       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4450     else:
4451       dev_pstatus = None
4452
4453     if dev.dev_type in constants.LDS_DRBD:
4454       # we change the snode then (otherwise we use the one passed in)
4455       if dev.logical_id[0] == instance.primary_node:
4456         snode = dev.logical_id[1]
4457       else:
4458         snode = dev.logical_id[0]
4459
4460     if snode and not static:
4461       self.cfg.SetDiskID(dev, snode)
4462       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4463     else:
4464       dev_sstatus = None
4465
4466     if dev.children:
4467       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4468                       for child in dev.children]
4469     else:
4470       dev_children = []
4471
4472     data = {
4473       "iv_name": dev.iv_name,
4474       "dev_type": dev.dev_type,
4475       "logical_id": dev.logical_id,
4476       "physical_id": dev.physical_id,
4477       "pstatus": dev_pstatus,
4478       "sstatus": dev_sstatus,
4479       "children": dev_children,
4480       }
4481
4482     return data
4483
4484   def Exec(self, feedback_fn):
4485     """Gather and return data"""
4486     result = {}
4487
4488     cluster = self.cfg.GetClusterInfo()
4489
4490     for instance in self.wanted_instances:
4491       if not self.op.static:
4492         remote_info = self.rpc.call_instance_info(instance.primary_node,
4493                                                   instance.name,
4494                                                   instance.hypervisor)
4495         if remote_info and "state" in remote_info:
4496           remote_state = "up"
4497         else:
4498           remote_state = "down"
4499       else:
4500         remote_state = None
4501       if instance.status == "down":
4502         config_state = "down"
4503       else:
4504         config_state = "up"
4505
4506       disks = [self._ComputeDiskStatus(instance, None, device)
4507                for device in instance.disks]
4508
4509       idict = {
4510         "name": instance.name,
4511         "config_state": config_state,
4512         "run_state": remote_state,
4513         "pnode": instance.primary_node,
4514         "snodes": instance.secondary_nodes,
4515         "os": instance.os,
4516         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4517         "disks": disks,
4518         "hypervisor": instance.hypervisor,
4519         "network_port": instance.network_port,
4520         "hv_instance": instance.hvparams,
4521         "hv_actual": cluster.FillHV(instance),
4522         "be_instance": instance.beparams,
4523         "be_actual": cluster.FillBE(instance),
4524         }
4525
4526       result[instance.name] = idict
4527
4528     return result
4529
4530
4531 class LUSetInstanceParams(LogicalUnit):
4532   """Modifies an instances's parameters.
4533
4534   """
4535   HPATH = "instance-modify"
4536   HTYPE = constants.HTYPE_INSTANCE
4537   _OP_REQP = ["instance_name", "hvparams"]
4538   REQ_BGL = False
4539
4540   def ExpandNames(self):
4541     self._ExpandAndLockInstance()
4542     self.needed_locks[locking.LEVEL_NODE] = []
4543     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4544
4545
4546   def DeclareLocks(self, level):
4547     if level == locking.LEVEL_NODE:
4548       self._LockInstancesNodes()
4549
4550   def BuildHooksEnv(self):
4551     """Build hooks env.
4552
4553     This runs on the master, primary and secondaries.
4554
4555     """
4556     args = dict()
4557     if constants.BE_MEMORY in self.be_new:
4558       args['memory'] = self.be_new[constants.BE_MEMORY]
4559     if constants.BE_VCPUS in self.be_new:
4560       args['vcpus'] = self.be_bnew[constants.BE_VCPUS]
4561     if self.do_ip or self.do_bridge or self.mac:
4562       if self.do_ip:
4563         ip = self.ip
4564       else:
4565         ip = self.instance.nics[0].ip
4566       if self.bridge:
4567         bridge = self.bridge
4568       else:
4569         bridge = self.instance.nics[0].bridge
4570       if self.mac:
4571         mac = self.mac
4572       else:
4573         mac = self.instance.nics[0].mac
4574       args['nics'] = [(ip, bridge, mac)]
4575     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
4576     nl = [self.cfg.GetMasterNode(),
4577           self.instance.primary_node] + list(self.instance.secondary_nodes)
4578     return env, nl, nl
4579
4580   def CheckPrereq(self):
4581     """Check prerequisites.
4582
4583     This only checks the instance list against the existing names.
4584
4585     """
4586     # FIXME: all the parameters could be checked before, in ExpandNames, or in
4587     # a separate CheckArguments function, if we implement one, so the operation
4588     # can be aborted without waiting for any lock, should it have an error...
4589     self.ip = getattr(self.op, "ip", None)
4590     self.mac = getattr(self.op, "mac", None)
4591     self.bridge = getattr(self.op, "bridge", None)
4592     self.kernel_path = getattr(self.op, "kernel_path", None)
4593     self.initrd_path = getattr(self.op, "initrd_path", None)
4594     self.force = getattr(self.op, "force", None)
4595     all_parms = [self.ip, self.bridge, self.mac]
4596     if (all_parms.count(None) == len(all_parms) and
4597         not self.op.hvparams and
4598         not self.op.beparams):
4599       raise errors.OpPrereqError("No changes submitted")
4600     for item in (constants.BE_MEMORY, constants.BE_VCPUS):
4601       val = self.op.beparams.get(item, None)
4602       if val is not None:
4603         try:
4604           val = int(val)
4605         except ValueError, err:
4606           raise errors.OpPrereqError("Invalid %s size: %s" % (item, str(err)))
4607         self.op.beparams[item] = val
4608     if self.ip is not None:
4609       self.do_ip = True
4610       if self.ip.lower() == "none":
4611         self.ip = None
4612       else:
4613         if not utils.IsValidIP(self.ip):
4614           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4615     else:
4616       self.do_ip = False
4617     self.do_bridge = (self.bridge is not None)
4618     if self.mac is not None:
4619       if self.cfg.IsMacInUse(self.mac):
4620         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4621                                    self.mac)
4622       if not utils.IsValidMac(self.mac):
4623         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4624
4625     # checking the new params on the primary/secondary nodes
4626
4627     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4628     assert self.instance is not None, \
4629       "Cannot retrieve locked instance %s" % self.op.instance_name
4630     pnode = self.instance.primary_node
4631     nodelist = [pnode]
4632     nodelist.extend(instance.secondary_nodes)
4633
4634     # hvparams processing
4635     if self.op.hvparams:
4636       i_hvdict = copy.deepcopy(instance.hvparams)
4637       for key, val in self.op.hvparams.iteritems():
4638         if val is None:
4639           try:
4640             del i_hvdict[key]
4641           except KeyError:
4642             pass
4643         else:
4644           i_hvdict[key] = val
4645       cluster = self.cfg.GetClusterInfo()
4646       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
4647                                 i_hvdict)
4648       # local check
4649       hypervisor.GetHypervisor(
4650         instance.hypervisor).CheckParameterSyntax(hv_new)
4651       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
4652       self.hv_new = hv_new # the new actual values
4653       self.hv_inst = i_hvdict # the new dict (without defaults)
4654     else:
4655       self.hv_new = self.hv_inst = {}
4656
4657     # beparams processing
4658     if self.op.beparams:
4659       i_bedict = copy.deepcopy(instance.beparams)
4660       for key, val in self.op.beparams.iteritems():
4661         if val is None:
4662           try:
4663             del i_bedict[key]
4664           except KeyError:
4665             pass
4666         else:
4667           i_bedict[key] = val
4668       cluster = self.cfg.GetClusterInfo()
4669       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4670                                 i_bedict)
4671       self.be_new = be_new # the new actual values
4672       self.be_inst = i_bedict # the new dict (without defaults)
4673     else:
4674       self.hv_new = self.hv_inst = {}
4675
4676     self.warn = []
4677
4678     if constants.BE_MEMORY in self.op.beparams and not self.force:
4679       mem_check_list = [pnode]
4680       if be_new[constants.BE_AUTO_BALANCE]:
4681         # either we changed auto_balance to yes or it was from before
4682         mem_check_list.extend(instance.secondary_nodes)
4683       instance_info = self.rpc.call_instance_info(pnode, instance.name,
4684                                                   instance.hypervisor)
4685       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
4686                                          instance.hypervisor)
4687
4688       if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
4689         # Assume the primary node is unreachable and go ahead
4690         self.warn.append("Can't get info from primary node %s" % pnode)
4691       else:
4692         if instance_info:
4693           current_mem = instance_info['memory']
4694         else:
4695           # Assume instance not running
4696           # (there is a slight race condition here, but it's not very probable,
4697           # and we have no other way to check)
4698           current_mem = 0
4699         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
4700                     nodeinfo[pnode]['memory_free'])
4701         if miss_mem > 0:
4702           raise errors.OpPrereqError("This change will prevent the instance"
4703                                      " from starting, due to %d MB of memory"
4704                                      " missing on its primary node" % miss_mem)
4705
4706       if be_new[constants.BE_AUTO_BALANCE]:
4707         for node in instance.secondary_nodes:
4708           if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
4709             self.warn.append("Can't get info from secondary node %s" % node)
4710           elif be_new[constants.BE_MEMORY] > nodeinfo[node]['memory_free']:
4711             self.warn.append("Not enough memory to failover instance to"
4712                              " secondary node %s" % node)
4713
4714     return
4715
4716   def Exec(self, feedback_fn):
4717     """Modifies an instance.
4718
4719     All parameters take effect only at the next restart of the instance.
4720     """
4721     # Process here the warnings from CheckPrereq, as we don't have a
4722     # feedback_fn there.
4723     for warn in self.warn:
4724       feedback_fn("WARNING: %s" % warn)
4725
4726     result = []
4727     instance = self.instance
4728     if self.do_ip:
4729       instance.nics[0].ip = self.ip
4730       result.append(("ip", self.ip))
4731     if self.bridge:
4732       instance.nics[0].bridge = self.bridge
4733       result.append(("bridge", self.bridge))
4734     if self.mac:
4735       instance.nics[0].mac = self.mac
4736       result.append(("mac", self.mac))
4737     if self.op.hvparams:
4738       instance.hvparams = self.hv_new
4739       for key, val in self.op.hvparams.iteritems():
4740         result.append(("hv/%s" % key, val))
4741     if self.op.beparams:
4742       instance.beparams = self.be_inst
4743       for key, val in self.op.beparams.iteritems():
4744         result.append(("be/%s" % key, val))
4745
4746     self.cfg.Update(instance)
4747
4748     return result
4749
4750
4751 class LUQueryExports(NoHooksLU):
4752   """Query the exports list
4753
4754   """
4755   _OP_REQP = ['nodes']
4756   REQ_BGL = False
4757
4758   def ExpandNames(self):
4759     self.needed_locks = {}
4760     self.share_locks[locking.LEVEL_NODE] = 1
4761     if not self.op.nodes:
4762       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4763     else:
4764       self.needed_locks[locking.LEVEL_NODE] = \
4765         _GetWantedNodes(self, self.op.nodes)
4766
4767   def CheckPrereq(self):
4768     """Check prerequisites.
4769
4770     """
4771     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
4772
4773   def Exec(self, feedback_fn):
4774     """Compute the list of all the exported system images.
4775
4776     Returns:
4777       a dictionary with the structure node->(export-list)
4778       where export-list is a list of the instances exported on
4779       that node.
4780
4781     """
4782     return self.rpc.call_export_list(self.nodes)
4783
4784
4785 class LUExportInstance(LogicalUnit):
4786   """Export an instance to an image in the cluster.
4787
4788   """
4789   HPATH = "instance-export"
4790   HTYPE = constants.HTYPE_INSTANCE
4791   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4792   REQ_BGL = False
4793
4794   def ExpandNames(self):
4795     self._ExpandAndLockInstance()
4796     # FIXME: lock only instance primary and destination node
4797     #
4798     # Sad but true, for now we have do lock all nodes, as we don't know where
4799     # the previous export might be, and and in this LU we search for it and
4800     # remove it from its current node. In the future we could fix this by:
4801     #  - making a tasklet to search (share-lock all), then create the new one,
4802     #    then one to remove, after
4803     #  - removing the removal operation altoghether
4804     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4805
4806   def DeclareLocks(self, level):
4807     """Last minute lock declaration."""
4808     # All nodes are locked anyway, so nothing to do here.
4809
4810   def BuildHooksEnv(self):
4811     """Build hooks env.
4812
4813     This will run on the master, primary node and target node.
4814
4815     """
4816     env = {
4817       "EXPORT_NODE": self.op.target_node,
4818       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4819       }
4820     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4821     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
4822           self.op.target_node]
4823     return env, nl, nl
4824
4825   def CheckPrereq(self):
4826     """Check prerequisites.
4827
4828     This checks that the instance and node names are valid.
4829
4830     """
4831     instance_name = self.op.instance_name
4832     self.instance = self.cfg.GetInstanceInfo(instance_name)
4833     assert self.instance is not None, \
4834           "Cannot retrieve locked instance %s" % self.op.instance_name
4835
4836     self.dst_node = self.cfg.GetNodeInfo(
4837       self.cfg.ExpandNodeName(self.op.target_node))
4838
4839     assert self.dst_node is not None, \
4840           "Cannot retrieve locked node %s" % self.op.target_node
4841
4842     # instance disk type verification
4843     for disk in self.instance.disks:
4844       if disk.dev_type == constants.LD_FILE:
4845         raise errors.OpPrereqError("Export not supported for instances with"
4846                                    " file-based disks")
4847
4848   def Exec(self, feedback_fn):
4849     """Export an instance to an image in the cluster.
4850
4851     """
4852     instance = self.instance
4853     dst_node = self.dst_node
4854     src_node = instance.primary_node
4855     if self.op.shutdown:
4856       # shutdown the instance, but not the disks
4857       if not self.rpc.call_instance_shutdown(src_node, instance):
4858         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4859                                  (instance.name, src_node))
4860
4861     vgname = self.cfg.GetVGName()
4862
4863     snap_disks = []
4864
4865     try:
4866       for disk in instance.disks:
4867         if disk.iv_name == "sda":
4868           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4869           new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
4870
4871           if not new_dev_name:
4872             logger.Error("could not snapshot block device %s on node %s" %
4873                          (disk.logical_id[1], src_node))
4874           else:
4875             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4876                                       logical_id=(vgname, new_dev_name),
4877                                       physical_id=(vgname, new_dev_name),
4878                                       iv_name=disk.iv_name)
4879             snap_disks.append(new_dev)
4880
4881     finally:
4882       if self.op.shutdown and instance.status == "up":
4883         if not self.rpc.call_instance_start(src_node, instance, None):
4884           _ShutdownInstanceDisks(self, instance)
4885           raise errors.OpExecError("Could not start instance")
4886
4887     # TODO: check for size
4888
4889     cluster_name = self.cfg.GetClusterName()
4890     for dev in snap_disks:
4891       if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
4892                                       instance, cluster_name):
4893         logger.Error("could not export block device %s from node %s to node %s"
4894                      % (dev.logical_id[1], src_node, dst_node.name))
4895       if not self.rpc.call_blockdev_remove(src_node, dev):
4896         logger.Error("could not remove snapshot block device %s from node %s" %
4897                      (dev.logical_id[1], src_node))
4898
4899     if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4900       logger.Error("could not finalize export for instance %s on node %s" %
4901                    (instance.name, dst_node.name))
4902
4903     nodelist = self.cfg.GetNodeList()
4904     nodelist.remove(dst_node.name)
4905
4906     # on one-node clusters nodelist will be empty after the removal
4907     # if we proceed the backup would be removed because OpQueryExports
4908     # substitutes an empty list with the full cluster node list.
4909     if nodelist:
4910       exportlist = self.rpc.call_export_list(nodelist)
4911       for node in exportlist:
4912         if instance.name in exportlist[node]:
4913           if not self.rpc.call_export_remove(node, instance.name):
4914             logger.Error("could not remove older export for instance %s"
4915                          " on node %s" % (instance.name, node))
4916
4917
4918 class LURemoveExport(NoHooksLU):
4919   """Remove exports related to the named instance.
4920
4921   """
4922   _OP_REQP = ["instance_name"]
4923   REQ_BGL = False
4924
4925   def ExpandNames(self):
4926     self.needed_locks = {}
4927     # We need all nodes to be locked in order for RemoveExport to work, but we
4928     # don't need to lock the instance itself, as nothing will happen to it (and
4929     # we can remove exports also for a removed instance)
4930     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4931
4932   def CheckPrereq(self):
4933     """Check prerequisites.
4934     """
4935     pass
4936
4937   def Exec(self, feedback_fn):
4938     """Remove any export.
4939
4940     """
4941     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4942     # If the instance was not found we'll try with the name that was passed in.
4943     # This will only work if it was an FQDN, though.
4944     fqdn_warn = False
4945     if not instance_name:
4946       fqdn_warn = True
4947       instance_name = self.op.instance_name
4948
4949     exportlist = self.rpc.call_export_list(self.acquired_locks[
4950       locking.LEVEL_NODE])
4951     found = False
4952     for node in exportlist:
4953       if instance_name in exportlist[node]:
4954         found = True
4955         if not self.rpc.call_export_remove(node, instance_name):
4956           logger.Error("could not remove export for instance %s"
4957                        " on node %s" % (instance_name, node))
4958
4959     if fqdn_warn and not found:
4960       feedback_fn("Export not found. If trying to remove an export belonging"
4961                   " to a deleted instance please use its Fully Qualified"
4962                   " Domain Name.")
4963
4964
4965 class TagsLU(NoHooksLU):
4966   """Generic tags LU.
4967
4968   This is an abstract class which is the parent of all the other tags LUs.
4969
4970   """
4971
4972   def ExpandNames(self):
4973     self.needed_locks = {}
4974     if self.op.kind == constants.TAG_NODE:
4975       name = self.cfg.ExpandNodeName(self.op.name)
4976       if name is None:
4977         raise errors.OpPrereqError("Invalid node name (%s)" %
4978                                    (self.op.name,))
4979       self.op.name = name
4980       self.needed_locks[locking.LEVEL_NODE] = name
4981     elif self.op.kind == constants.TAG_INSTANCE:
4982       name = self.cfg.ExpandInstanceName(self.op.name)
4983       if name is None:
4984         raise errors.OpPrereqError("Invalid instance name (%s)" %
4985                                    (self.op.name,))
4986       self.op.name = name
4987       self.needed_locks[locking.LEVEL_INSTANCE] = name
4988
4989   def CheckPrereq(self):
4990     """Check prerequisites.
4991
4992     """
4993     if self.op.kind == constants.TAG_CLUSTER:
4994       self.target = self.cfg.GetClusterInfo()
4995     elif self.op.kind == constants.TAG_NODE:
4996       self.target = self.cfg.GetNodeInfo(self.op.name)
4997     elif self.op.kind == constants.TAG_INSTANCE:
4998       self.target = self.cfg.GetInstanceInfo(self.op.name)
4999     else:
5000       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
5001                                  str(self.op.kind))
5002
5003
5004 class LUGetTags(TagsLU):
5005   """Returns the tags of a given object.
5006
5007   """
5008   _OP_REQP = ["kind", "name"]
5009   REQ_BGL = False
5010
5011   def Exec(self, feedback_fn):
5012     """Returns the tag list.
5013
5014     """
5015     return list(self.target.GetTags())
5016
5017
5018 class LUSearchTags(NoHooksLU):
5019   """Searches the tags for a given pattern.
5020
5021   """
5022   _OP_REQP = ["pattern"]
5023   REQ_BGL = False
5024
5025   def ExpandNames(self):
5026     self.needed_locks = {}
5027
5028   def CheckPrereq(self):
5029     """Check prerequisites.
5030
5031     This checks the pattern passed for validity by compiling it.
5032
5033     """
5034     try:
5035       self.re = re.compile(self.op.pattern)
5036     except re.error, err:
5037       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
5038                                  (self.op.pattern, err))
5039
5040   def Exec(self, feedback_fn):
5041     """Returns the tag list.
5042
5043     """
5044     cfg = self.cfg
5045     tgts = [("/cluster", cfg.GetClusterInfo())]
5046     ilist = cfg.GetAllInstancesInfo().values()
5047     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
5048     nlist = cfg.GetAllNodesInfo().values()
5049     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
5050     results = []
5051     for path, target in tgts:
5052       for tag in target.GetTags():
5053         if self.re.search(tag):
5054           results.append((path, tag))
5055     return results
5056
5057
5058 class LUAddTags(TagsLU):
5059   """Sets a tag on a given object.
5060
5061   """
5062   _OP_REQP = ["kind", "name", "tags"]
5063   REQ_BGL = False
5064
5065   def CheckPrereq(self):
5066     """Check prerequisites.
5067
5068     This checks the type and length of the tag name and value.
5069
5070     """
5071     TagsLU.CheckPrereq(self)
5072     for tag in self.op.tags:
5073       objects.TaggableObject.ValidateTag(tag)
5074
5075   def Exec(self, feedback_fn):
5076     """Sets the tag.
5077
5078     """
5079     try:
5080       for tag in self.op.tags:
5081         self.target.AddTag(tag)
5082     except errors.TagError, err:
5083       raise errors.OpExecError("Error while setting tag: %s" % str(err))
5084     try:
5085       self.cfg.Update(self.target)
5086     except errors.ConfigurationError:
5087       raise errors.OpRetryError("There has been a modification to the"
5088                                 " config file and the operation has been"
5089                                 " aborted. Please retry.")
5090
5091
5092 class LUDelTags(TagsLU):
5093   """Delete a list of tags from a given object.
5094
5095   """
5096   _OP_REQP = ["kind", "name", "tags"]
5097   REQ_BGL = False
5098
5099   def CheckPrereq(self):
5100     """Check prerequisites.
5101
5102     This checks that we have the given tag.
5103
5104     """
5105     TagsLU.CheckPrereq(self)
5106     for tag in self.op.tags:
5107       objects.TaggableObject.ValidateTag(tag)
5108     del_tags = frozenset(self.op.tags)
5109     cur_tags = self.target.GetTags()
5110     if not del_tags <= cur_tags:
5111       diff_tags = del_tags - cur_tags
5112       diff_names = ["'%s'" % tag for tag in diff_tags]
5113       diff_names.sort()
5114       raise errors.OpPrereqError("Tag(s) %s not found" %
5115                                  (",".join(diff_names)))
5116
5117   def Exec(self, feedback_fn):
5118     """Remove the tag from the object.
5119
5120     """
5121     for tag in self.op.tags:
5122       self.target.RemoveTag(tag)
5123     try:
5124       self.cfg.Update(self.target)
5125     except errors.ConfigurationError:
5126       raise errors.OpRetryError("There has been a modification to the"
5127                                 " config file and the operation has been"
5128                                 " aborted. Please retry.")
5129
5130
5131 class LUTestDelay(NoHooksLU):
5132   """Sleep for a specified amount of time.
5133
5134   This LU sleeps on the master and/or nodes for a specified amount of
5135   time.
5136
5137   """
5138   _OP_REQP = ["duration", "on_master", "on_nodes"]
5139   REQ_BGL = False
5140
5141   def ExpandNames(self):
5142     """Expand names and set required locks.
5143
5144     This expands the node list, if any.
5145
5146     """
5147     self.needed_locks = {}
5148     if self.op.on_nodes:
5149       # _GetWantedNodes can be used here, but is not always appropriate to use
5150       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
5151       # more information.
5152       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5153       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5154
5155   def CheckPrereq(self):
5156     """Check prerequisites.
5157
5158     """
5159
5160   def Exec(self, feedback_fn):
5161     """Do the actual sleep.
5162
5163     """
5164     if self.op.on_master:
5165       if not utils.TestDelay(self.op.duration):
5166         raise errors.OpExecError("Error during master delay test")
5167     if self.op.on_nodes:
5168       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5169       if not result:
5170         raise errors.OpExecError("Complete failure from rpc call")
5171       for node, node_result in result.items():
5172         if not node_result:
5173           raise errors.OpExecError("Failure during rpc call to node %s,"
5174                                    " result: %s" % (node, node_result))
5175
5176
5177 class IAllocator(object):
5178   """IAllocator framework.
5179
5180   An IAllocator instance has three sets of attributes:
5181     - cfg that is needed to query the cluster
5182     - input data (all members of the _KEYS class attribute are required)
5183     - four buffer attributes (in|out_data|text), that represent the
5184       input (to the external script) in text and data structure format,
5185       and the output from it, again in two formats
5186     - the result variables from the script (success, info, nodes) for
5187       easy usage
5188
5189   """
5190   _ALLO_KEYS = [
5191     "mem_size", "disks", "disk_template",
5192     "os", "tags", "nics", "vcpus",
5193     ]
5194   _RELO_KEYS = [
5195     "relocate_from",
5196     ]
5197
5198   def __init__(self, lu, mode, name, **kwargs):
5199     self.lu = lu
5200     # init buffer variables
5201     self.in_text = self.out_text = self.in_data = self.out_data = None
5202     # init all input fields so that pylint is happy
5203     self.mode = mode
5204     self.name = name
5205     self.mem_size = self.disks = self.disk_template = None
5206     self.os = self.tags = self.nics = self.vcpus = None
5207     self.relocate_from = None
5208     # computed fields
5209     self.required_nodes = None
5210     # init result fields
5211     self.success = self.info = self.nodes = None
5212     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5213       keyset = self._ALLO_KEYS
5214     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5215       keyset = self._RELO_KEYS
5216     else:
5217       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5218                                    " IAllocator" % self.mode)
5219     for key in kwargs:
5220       if key not in keyset:
5221         raise errors.ProgrammerError("Invalid input parameter '%s' to"
5222                                      " IAllocator" % key)
5223       setattr(self, key, kwargs[key])
5224     for key in keyset:
5225       if key not in kwargs:
5226         raise errors.ProgrammerError("Missing input parameter '%s' to"
5227                                      " IAllocator" % key)
5228     self._BuildInputData()
5229
5230   def _ComputeClusterData(self):
5231     """Compute the generic allocator input data.
5232
5233     This is the data that is independent of the actual operation.
5234
5235     """
5236     cfg = self.lu.cfg
5237     cluster_info = cfg.GetClusterInfo()
5238     # cluster data
5239     data = {
5240       "version": 1,
5241       "cluster_name": cfg.GetClusterName(),
5242       "cluster_tags": list(cluster_info.GetTags()),
5243       "enable_hypervisors": list(cluster_info.enabled_hypervisors),
5244       # we don't have job IDs
5245       }
5246
5247     i_list = []
5248     cluster = self.cfg.GetClusterInfo()
5249     for iname in cfg.GetInstanceList():
5250       i_obj = cfg.GetInstanceInfo(iname)
5251       i_list.append((i_obj, cluster.FillBE(i_obj)))
5252
5253     # node data
5254     node_results = {}
5255     node_list = cfg.GetNodeList()
5256     # FIXME: here we have only one hypervisor information, but
5257     # instance can belong to different hypervisors
5258     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
5259                                            cfg.GetHypervisorType())
5260     for nname in node_list:
5261       ninfo = cfg.GetNodeInfo(nname)
5262       if nname not in node_data or not isinstance(node_data[nname], dict):
5263         raise errors.OpExecError("Can't get data for node %s" % nname)
5264       remote_info = node_data[nname]
5265       for attr in ['memory_total', 'memory_free', 'memory_dom0',
5266                    'vg_size', 'vg_free', 'cpu_total']:
5267         if attr not in remote_info:
5268           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5269                                    (nname, attr))
5270         try:
5271           remote_info[attr] = int(remote_info[attr])
5272         except ValueError, err:
5273           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5274                                    " %s" % (nname, attr, str(err)))
5275       # compute memory used by primary instances
5276       i_p_mem = i_p_up_mem = 0
5277       for iinfo, beinfo in i_list:
5278         if iinfo.primary_node == nname:
5279           i_p_mem += beinfo[constants.BE_MEMORY]
5280           if iinfo.status == "up":
5281             i_p_up_mem += beinfo[constants.BE_MEMORY]
5282
5283       # compute memory used by instances
5284       pnr = {
5285         "tags": list(ninfo.GetTags()),
5286         "total_memory": remote_info['memory_total'],
5287         "reserved_memory": remote_info['memory_dom0'],
5288         "free_memory": remote_info['memory_free'],
5289         "i_pri_memory": i_p_mem,
5290         "i_pri_up_memory": i_p_up_mem,
5291         "total_disk": remote_info['vg_size'],
5292         "free_disk": remote_info['vg_free'],
5293         "primary_ip": ninfo.primary_ip,
5294         "secondary_ip": ninfo.secondary_ip,
5295         "total_cpus": remote_info['cpu_total'],
5296         }
5297       node_results[nname] = pnr
5298     data["nodes"] = node_results
5299
5300     # instance data
5301     instance_data = {}
5302     for iinfo, beinfo in i_list:
5303       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5304                   for n in iinfo.nics]
5305       pir = {
5306         "tags": list(iinfo.GetTags()),
5307         "should_run": iinfo.status == "up",
5308         "vcpus": beinfo[constants.BE_VCPUS],
5309         "memory": beinfo[constants.BE_MEMORY],
5310         "os": iinfo.os,
5311         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5312         "nics": nic_data,
5313         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5314         "disk_template": iinfo.disk_template,
5315         "hypervisor": iinfo.hypervisor,
5316         }
5317       instance_data[iinfo.name] = pir
5318
5319     data["instances"] = instance_data
5320
5321     self.in_data = data
5322
5323   def _AddNewInstance(self):
5324     """Add new instance data to allocator structure.
5325
5326     This in combination with _AllocatorGetClusterData will create the
5327     correct structure needed as input for the allocator.
5328
5329     The checks for the completeness of the opcode must have already been
5330     done.
5331
5332     """
5333     data = self.in_data
5334     if len(self.disks) != 2:
5335       raise errors.OpExecError("Only two-disk configurations supported")
5336
5337     disk_space = _ComputeDiskSize(self.disk_template,
5338                                   self.disks[0]["size"], self.disks[1]["size"])
5339
5340     if self.disk_template in constants.DTS_NET_MIRROR:
5341       self.required_nodes = 2
5342     else:
5343       self.required_nodes = 1
5344     request = {
5345       "type": "allocate",
5346       "name": self.name,
5347       "disk_template": self.disk_template,
5348       "tags": self.tags,
5349       "os": self.os,
5350       "vcpus": self.vcpus,
5351       "memory": self.mem_size,
5352       "disks": self.disks,
5353       "disk_space_total": disk_space,
5354       "nics": self.nics,
5355       "required_nodes": self.required_nodes,
5356       }
5357     data["request"] = request
5358
5359   def _AddRelocateInstance(self):
5360     """Add relocate instance data to allocator structure.
5361
5362     This in combination with _IAllocatorGetClusterData will create the
5363     correct structure needed as input for the allocator.
5364
5365     The checks for the completeness of the opcode must have already been
5366     done.
5367
5368     """
5369     instance = self.lu.cfg.GetInstanceInfo(self.name)
5370     if instance is None:
5371       raise errors.ProgrammerError("Unknown instance '%s' passed to"
5372                                    " IAllocator" % self.name)
5373
5374     if instance.disk_template not in constants.DTS_NET_MIRROR:
5375       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5376
5377     if len(instance.secondary_nodes) != 1:
5378       raise errors.OpPrereqError("Instance has not exactly one secondary node")
5379
5380     self.required_nodes = 1
5381
5382     disk_space = _ComputeDiskSize(instance.disk_template,
5383                                   instance.disks[0].size,
5384                                   instance.disks[1].size)
5385
5386     request = {
5387       "type": "relocate",
5388       "name": self.name,
5389       "disk_space_total": disk_space,
5390       "required_nodes": self.required_nodes,
5391       "relocate_from": self.relocate_from,
5392       }
5393     self.in_data["request"] = request
5394
5395   def _BuildInputData(self):
5396     """Build input data structures.
5397
5398     """
5399     self._ComputeClusterData()
5400
5401     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5402       self._AddNewInstance()
5403     else:
5404       self._AddRelocateInstance()
5405
5406     self.in_text = serializer.Dump(self.in_data)
5407
5408   def Run(self, name, validate=True, call_fn=None):
5409     """Run an instance allocator and return the results.
5410
5411     """
5412     if call_fn is None:
5413       call_fn = self.lu.rpc.call_iallocator_runner
5414     data = self.in_text
5415
5416     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
5417
5418     if not isinstance(result, (list, tuple)) or len(result) != 4:
5419       raise errors.OpExecError("Invalid result from master iallocator runner")
5420
5421     rcode, stdout, stderr, fail = result
5422
5423     if rcode == constants.IARUN_NOTFOUND:
5424       raise errors.OpExecError("Can't find allocator '%s'" % name)
5425     elif rcode == constants.IARUN_FAILURE:
5426       raise errors.OpExecError("Instance allocator call failed: %s,"
5427                                " output: %s" % (fail, stdout+stderr))
5428     self.out_text = stdout
5429     if validate:
5430       self._ValidateResult()
5431
5432   def _ValidateResult(self):
5433     """Process the allocator results.
5434
5435     This will process and if successful save the result in
5436     self.out_data and the other parameters.
5437
5438     """
5439     try:
5440       rdict = serializer.Load(self.out_text)
5441     except Exception, err:
5442       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5443
5444     if not isinstance(rdict, dict):
5445       raise errors.OpExecError("Can't parse iallocator results: not a dict")
5446
5447     for key in "success", "info", "nodes":
5448       if key not in rdict:
5449         raise errors.OpExecError("Can't parse iallocator results:"
5450                                  " missing key '%s'" % key)
5451       setattr(self, key, rdict[key])
5452
5453     if not isinstance(rdict["nodes"], list):
5454       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5455                                " is not a list")
5456     self.out_data = rdict
5457
5458
5459 class LUTestAllocator(NoHooksLU):
5460   """Run allocator tests.
5461
5462   This LU runs the allocator tests
5463
5464   """
5465   _OP_REQP = ["direction", "mode", "name"]
5466
5467   def CheckPrereq(self):
5468     """Check prerequisites.
5469
5470     This checks the opcode parameters depending on the director and mode test.
5471
5472     """
5473     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5474       for attr in ["name", "mem_size", "disks", "disk_template",
5475                    "os", "tags", "nics", "vcpus"]:
5476         if not hasattr(self.op, attr):
5477           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5478                                      attr)
5479       iname = self.cfg.ExpandInstanceName(self.op.name)
5480       if iname is not None:
5481         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5482                                    iname)
5483       if not isinstance(self.op.nics, list):
5484         raise errors.OpPrereqError("Invalid parameter 'nics'")
5485       for row in self.op.nics:
5486         if (not isinstance(row, dict) or
5487             "mac" not in row or
5488             "ip" not in row or
5489             "bridge" not in row):
5490           raise errors.OpPrereqError("Invalid contents of the"
5491                                      " 'nics' parameter")
5492       if not isinstance(self.op.disks, list):
5493         raise errors.OpPrereqError("Invalid parameter 'disks'")
5494       if len(self.op.disks) != 2:
5495         raise errors.OpPrereqError("Only two-disk configurations supported")
5496       for row in self.op.disks:
5497         if (not isinstance(row, dict) or
5498             "size" not in row or
5499             not isinstance(row["size"], int) or
5500             "mode" not in row or
5501             row["mode"] not in ['r', 'w']):
5502           raise errors.OpPrereqError("Invalid contents of the"
5503                                      " 'disks' parameter")
5504     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5505       if not hasattr(self.op, "name"):
5506         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5507       fname = self.cfg.ExpandInstanceName(self.op.name)
5508       if fname is None:
5509         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5510                                    self.op.name)
5511       self.op.name = fname
5512       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5513     else:
5514       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5515                                  self.op.mode)
5516
5517     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5518       if not hasattr(self.op, "allocator") or self.op.allocator is None:
5519         raise errors.OpPrereqError("Missing allocator name")
5520     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5521       raise errors.OpPrereqError("Wrong allocator test '%s'" %
5522                                  self.op.direction)
5523
5524   def Exec(self, feedback_fn):
5525     """Run the allocator test.
5526
5527     """
5528     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5529       ial = IAllocator(self,
5530                        mode=self.op.mode,
5531                        name=self.op.name,
5532                        mem_size=self.op.mem_size,
5533                        disks=self.op.disks,
5534                        disk_template=self.op.disk_template,
5535                        os=self.op.os,
5536                        tags=self.op.tags,
5537                        nics=self.op.nics,
5538                        vcpus=self.op.vcpus,
5539                        )
5540     else:
5541       ial = IAllocator(self,
5542                        mode=self.op.mode,
5543                        name=self.op.name,
5544                        relocate_from=list(self.relocate_from),
5545                        )
5546
5547     if self.op.direction == constants.IALLOCATOR_DIR_IN:
5548       result = ial.in_text
5549     else:
5550       ial.Run(self.op.allocator, validate=False)
5551       result = ial.out_text
5552     return result