Some fixes related to auto_balance
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import logging
34 import copy
35
36 from ganeti import ssh
37 from ganeti import logger
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_MASTER: the LU needs to run on the master node
59         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60
61   Note that all commands require root permissions.
62
63   """
64   HPATH = None
65   HTYPE = None
66   _OP_REQP = []
67   REQ_MASTER = True
68   REQ_BGL = True
69
70   def __init__(self, processor, op, context, rpc):
71     """Constructor for LogicalUnit.
72
73     This needs to be overriden in derived classes in order to check op
74     validity.
75
76     """
77     self.proc = processor
78     self.op = op
79     self.cfg = context.cfg
80     self.context = context
81     self.rpc = rpc
82     # Dicts used to declare locking needs to mcpu
83     self.needed_locks = None
84     self.acquired_locks = {}
85     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
86     self.add_locks = {}
87     self.remove_locks = {}
88     # Used to force good behavior when calling helper functions
89     self.recalculate_locks = {}
90     self.__ssh = None
91
92     for attr_name in self._OP_REQP:
93       attr_val = getattr(op, attr_name, None)
94       if attr_val is None:
95         raise errors.OpPrereqError("Required parameter '%s' missing" %
96                                    attr_name)
97
98     if not self.cfg.IsCluster():
99       raise errors.OpPrereqError("Cluster not initialized yet,"
100                                  " use 'gnt-cluster init' first.")
101     if self.REQ_MASTER:
102       master = self.cfg.GetMasterNode()
103       if master != utils.HostInfo().name:
104         raise errors.OpPrereqError("Commands must be run on the master"
105                                    " node %s" % master)
106
107   def __GetSSH(self):
108     """Returns the SshRunner object
109
110     """
111     if not self.__ssh:
112       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
113     return self.__ssh
114
115   ssh = property(fget=__GetSSH)
116
117   def ExpandNames(self):
118     """Expand names for this LU.
119
120     This method is called before starting to execute the opcode, and it should
121     update all the parameters of the opcode to their canonical form (e.g. a
122     short node name must be fully expanded after this method has successfully
123     completed). This way locking, hooks, logging, ecc. can work correctly.
124
125     LUs which implement this method must also populate the self.needed_locks
126     member, as a dict with lock levels as keys, and a list of needed lock names
127     as values. Rules:
128       - Use an empty dict if you don't need any lock
129       - If you don't need any lock at a particular level omit that level
130       - Don't put anything for the BGL level
131       - If you want all locks at a level use locking.ALL_SET as a value
132
133     If you need to share locks (rather than acquire them exclusively) at one
134     level you can modify self.share_locks, setting a true value (usually 1) for
135     that level. By default locks are not shared.
136
137     Examples:
138     # Acquire all nodes and one instance
139     self.needed_locks = {
140       locking.LEVEL_NODE: locking.ALL_SET,
141       locking.LEVEL_INSTANCE: ['instance1.example.tld'],
142     }
143     # Acquire just two nodes
144     self.needed_locks = {
145       locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
146     }
147     # Acquire no locks
148     self.needed_locks = {} # No, you can't leave it to the default value None
149
150     """
151     # The implementation of this method is mandatory only if the new LU is
152     # concurrent, so that old LUs don't need to be changed all at the same
153     # time.
154     if self.REQ_BGL:
155       self.needed_locks = {} # Exclusive LUs don't need locks.
156     else:
157       raise NotImplementedError
158
159   def DeclareLocks(self, level):
160     """Declare LU locking needs for a level
161
162     While most LUs can just declare their locking needs at ExpandNames time,
163     sometimes there's the need to calculate some locks after having acquired
164     the ones before. This function is called just before acquiring locks at a
165     particular level, but after acquiring the ones at lower levels, and permits
166     such calculations. It can be used to modify self.needed_locks, and by
167     default it does nothing.
168
169     This function is only called if you have something already set in
170     self.needed_locks for the level.
171
172     @param level: Locking level which is going to be locked
173     @type level: member of ganeti.locking.LEVELS
174
175     """
176
177   def CheckPrereq(self):
178     """Check prerequisites for this LU.
179
180     This method should check that the prerequisites for the execution
181     of this LU are fulfilled. It can do internode communication, but
182     it should be idempotent - no cluster or system changes are
183     allowed.
184
185     The method should raise errors.OpPrereqError in case something is
186     not fulfilled. Its return value is ignored.
187
188     This method should also update all the parameters of the opcode to
189     their canonical form if it hasn't been done by ExpandNames before.
190
191     """
192     raise NotImplementedError
193
194   def Exec(self, feedback_fn):
195     """Execute the LU.
196
197     This method should implement the actual work. It should raise
198     errors.OpExecError for failures that are somewhat dealt with in
199     code, or expected.
200
201     """
202     raise NotImplementedError
203
204   def BuildHooksEnv(self):
205     """Build hooks environment for this LU.
206
207     This method should return a three-node tuple consisting of: a dict
208     containing the environment that will be used for running the
209     specific hook for this LU, a list of node names on which the hook
210     should run before the execution, and a list of node names on which
211     the hook should run after the execution.
212
213     The keys of the dict must not have 'GANETI_' prefixed as this will
214     be handled in the hooks runner. Also note additional keys will be
215     added by the hooks runner. If the LU doesn't define any
216     environment, an empty dict (and not None) should be returned.
217
218     No nodes should be returned as an empty list (and not None).
219
220     Note that if the HPATH for a LU class is None, this function will
221     not be called.
222
223     """
224     raise NotImplementedError
225
226   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
227     """Notify the LU about the results of its hooks.
228
229     This method is called every time a hooks phase is executed, and notifies
230     the Logical Unit about the hooks' result. The LU can then use it to alter
231     its result based on the hooks.  By default the method does nothing and the
232     previous result is passed back unchanged but any LU can define it if it
233     wants to use the local cluster hook-scripts somehow.
234
235     Args:
236       phase: the hooks phase that has just been run
237       hooks_results: the results of the multi-node hooks rpc call
238       feedback_fn: function to send feedback back to the caller
239       lu_result: the previous result this LU had, or None in the PRE phase.
240
241     """
242     return lu_result
243
244   def _ExpandAndLockInstance(self):
245     """Helper function to expand and lock an instance.
246
247     Many LUs that work on an instance take its name in self.op.instance_name
248     and need to expand it and then declare the expanded name for locking. This
249     function does it, and then updates self.op.instance_name to the expanded
250     name. It also initializes needed_locks as a dict, if this hasn't been done
251     before.
252
253     """
254     if self.needed_locks is None:
255       self.needed_locks = {}
256     else:
257       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
258         "_ExpandAndLockInstance called with instance-level locks set"
259     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
260     if expanded_name is None:
261       raise errors.OpPrereqError("Instance '%s' not known" %
262                                   self.op.instance_name)
263     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
264     self.op.instance_name = expanded_name
265
266   def _LockInstancesNodes(self, primary_only=False):
267     """Helper function to declare instances' nodes for locking.
268
269     This function should be called after locking one or more instances to lock
270     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
271     with all primary or secondary nodes for instances already locked and
272     present in self.needed_locks[locking.LEVEL_INSTANCE].
273
274     It should be called from DeclareLocks, and for safety only works if
275     self.recalculate_locks[locking.LEVEL_NODE] is set.
276
277     In the future it may grow parameters to just lock some instance's nodes, or
278     to just lock primaries or secondary nodes, if needed.
279
280     If should be called in DeclareLocks in a way similar to:
281
282     if level == locking.LEVEL_NODE:
283       self._LockInstancesNodes()
284
285     @type primary_only: boolean
286     @param primary_only: only lock primary nodes of locked instances
287
288     """
289     assert locking.LEVEL_NODE in self.recalculate_locks, \
290       "_LockInstancesNodes helper function called with no nodes to recalculate"
291
292     # TODO: check if we're really been called with the instance locks held
293
294     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
295     # future we might want to have different behaviors depending on the value
296     # of self.recalculate_locks[locking.LEVEL_NODE]
297     wanted_nodes = []
298     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
299       instance = self.context.cfg.GetInstanceInfo(instance_name)
300       wanted_nodes.append(instance.primary_node)
301       if not primary_only:
302         wanted_nodes.extend(instance.secondary_nodes)
303
304     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
305       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
306     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
307       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
308
309     del self.recalculate_locks[locking.LEVEL_NODE]
310
311
312 class NoHooksLU(LogicalUnit):
313   """Simple LU which runs no hooks.
314
315   This LU is intended as a parent for other LogicalUnits which will
316   run no hooks, in order to reduce duplicate code.
317
318   """
319   HPATH = None
320   HTYPE = None
321
322
323 def _GetWantedNodes(lu, nodes):
324   """Returns list of checked and expanded node names.
325
326   Args:
327     nodes: List of nodes (strings) or None for all
328
329   """
330   if not isinstance(nodes, list):
331     raise errors.OpPrereqError("Invalid argument type 'nodes'")
332
333   if not nodes:
334     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
335       " non-empty list of nodes whose name is to be expanded.")
336
337   wanted = []
338   for name in nodes:
339     node = lu.cfg.ExpandNodeName(name)
340     if node is None:
341       raise errors.OpPrereqError("No such node name '%s'" % name)
342     wanted.append(node)
343
344   return utils.NiceSort(wanted)
345
346
347 def _GetWantedInstances(lu, instances):
348   """Returns list of checked and expanded instance names.
349
350   Args:
351     instances: List of instances (strings) or None for all
352
353   """
354   if not isinstance(instances, list):
355     raise errors.OpPrereqError("Invalid argument type 'instances'")
356
357   if instances:
358     wanted = []
359
360     for name in instances:
361       instance = lu.cfg.ExpandInstanceName(name)
362       if instance is None:
363         raise errors.OpPrereqError("No such instance name '%s'" % name)
364       wanted.append(instance)
365
366   else:
367     wanted = lu.cfg.GetInstanceList()
368   return utils.NiceSort(wanted)
369
370
371 def _CheckOutputFields(static, dynamic, selected):
372   """Checks whether all selected fields are valid.
373
374   Args:
375     static: Static fields
376     dynamic: Dynamic fields
377
378   """
379   static_fields = frozenset(static)
380   dynamic_fields = frozenset(dynamic)
381
382   all_fields = static_fields | dynamic_fields
383
384   if not all_fields.issuperset(selected):
385     raise errors.OpPrereqError("Unknown output fields selected: %s"
386                                % ",".join(frozenset(selected).
387                                           difference(all_fields)))
388
389
390 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
391                           memory, vcpus, nics):
392   """Builds instance related env variables for hooks from single variables.
393
394   Args:
395     secondary_nodes: List of secondary nodes as strings
396   """
397   env = {
398     "OP_TARGET": name,
399     "INSTANCE_NAME": name,
400     "INSTANCE_PRIMARY": primary_node,
401     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
402     "INSTANCE_OS_TYPE": os_type,
403     "INSTANCE_STATUS": status,
404     "INSTANCE_MEMORY": memory,
405     "INSTANCE_VCPUS": vcpus,
406   }
407
408   if nics:
409     nic_count = len(nics)
410     for idx, (ip, bridge, mac) in enumerate(nics):
411       if ip is None:
412         ip = ""
413       env["INSTANCE_NIC%d_IP" % idx] = ip
414       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
415       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
416   else:
417     nic_count = 0
418
419   env["INSTANCE_NIC_COUNT"] = nic_count
420
421   return env
422
423
424 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
425   """Builds instance related env variables for hooks from an object.
426
427   Args:
428     instance: objects.Instance object of instance
429     override: dict of values to override
430   """
431   bep = lu.cfg.GetClusterInfo().FillBE(instance)
432   args = {
433     'name': instance.name,
434     'primary_node': instance.primary_node,
435     'secondary_nodes': instance.secondary_nodes,
436     'os_type': instance.os,
437     'status': instance.os,
438     'memory': bep[constants.BE_MEMORY],
439     'vcpus': bep[constants.BE_VCPUS],
440     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
441   }
442   if override:
443     args.update(override)
444   return _BuildInstanceHookEnv(**args)
445
446
447 def _CheckInstanceBridgesExist(lu, instance):
448   """Check that the brigdes needed by an instance exist.
449
450   """
451   # check bridges existance
452   brlist = [nic.bridge for nic in instance.nics]
453   if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
454     raise errors.OpPrereqError("one or more target bridges %s does not"
455                                " exist on destination node '%s'" %
456                                (brlist, instance.primary_node))
457
458
459 class LUDestroyCluster(NoHooksLU):
460   """Logical unit for destroying the cluster.
461
462   """
463   _OP_REQP = []
464
465   def CheckPrereq(self):
466     """Check prerequisites.
467
468     This checks whether the cluster is empty.
469
470     Any errors are signalled by raising errors.OpPrereqError.
471
472     """
473     master = self.cfg.GetMasterNode()
474
475     nodelist = self.cfg.GetNodeList()
476     if len(nodelist) != 1 or nodelist[0] != master:
477       raise errors.OpPrereqError("There are still %d node(s) in"
478                                  " this cluster." % (len(nodelist) - 1))
479     instancelist = self.cfg.GetInstanceList()
480     if instancelist:
481       raise errors.OpPrereqError("There are still %d instance(s) in"
482                                  " this cluster." % len(instancelist))
483
484   def Exec(self, feedback_fn):
485     """Destroys the cluster.
486
487     """
488     master = self.cfg.GetMasterNode()
489     if not self.rpc.call_node_stop_master(master, False):
490       raise errors.OpExecError("Could not disable the master role")
491     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
492     utils.CreateBackup(priv_key)
493     utils.CreateBackup(pub_key)
494     return master
495
496
497 class LUVerifyCluster(LogicalUnit):
498   """Verifies the cluster status.
499
500   """
501   HPATH = "cluster-verify"
502   HTYPE = constants.HTYPE_CLUSTER
503   _OP_REQP = ["skip_checks"]
504   REQ_BGL = False
505
506   def ExpandNames(self):
507     self.needed_locks = {
508       locking.LEVEL_NODE: locking.ALL_SET,
509       locking.LEVEL_INSTANCE: locking.ALL_SET,
510     }
511     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
512
513   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
514                   remote_version, feedback_fn):
515     """Run multiple tests against a node.
516
517     Test list:
518       - compares ganeti version
519       - checks vg existance and size > 20G
520       - checks config file checksum
521       - checks ssh to other nodes
522
523     Args:
524       node: name of the node to check
525       file_list: required list of files
526       local_cksum: dictionary of local files and their checksums
527
528     """
529     # compares ganeti version
530     local_version = constants.PROTOCOL_VERSION
531     if not remote_version:
532       feedback_fn("  - ERROR: connection to %s failed" % (node))
533       return True
534
535     if local_version != remote_version:
536       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
537                       (local_version, node, remote_version))
538       return True
539
540     # checks vg existance and size > 20G
541
542     bad = False
543     if not vglist:
544       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
545                       (node,))
546       bad = True
547     else:
548       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
549                                             constants.MIN_VG_SIZE)
550       if vgstatus:
551         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
552         bad = True
553
554     if not node_result:
555       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
556       return True
557
558     # checks config file checksum
559     # checks ssh to any
560
561     if 'filelist' not in node_result:
562       bad = True
563       feedback_fn("  - ERROR: node hasn't returned file checksum data")
564     else:
565       remote_cksum = node_result['filelist']
566       for file_name in file_list:
567         if file_name not in remote_cksum:
568           bad = True
569           feedback_fn("  - ERROR: file '%s' missing" % file_name)
570         elif remote_cksum[file_name] != local_cksum[file_name]:
571           bad = True
572           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
573
574     if 'nodelist' not in node_result:
575       bad = True
576       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
577     else:
578       if node_result['nodelist']:
579         bad = True
580         for node in node_result['nodelist']:
581           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
582                           (node, node_result['nodelist'][node]))
583     if 'node-net-test' not in node_result:
584       bad = True
585       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
586     else:
587       if node_result['node-net-test']:
588         bad = True
589         nlist = utils.NiceSort(node_result['node-net-test'].keys())
590         for node in nlist:
591           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
592                           (node, node_result['node-net-test'][node]))
593
594     hyp_result = node_result.get('hypervisor', None)
595     if isinstance(hyp_result, dict):
596       for hv_name, hv_result in hyp_result.iteritems():
597         if hv_result is not None:
598           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
599                       (hv_name, hv_result))
600     return bad
601
602   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
603                       node_instance, feedback_fn):
604     """Verify an instance.
605
606     This function checks to see if the required block devices are
607     available on the instance's node.
608
609     """
610     bad = False
611
612     node_current = instanceconfig.primary_node
613
614     node_vol_should = {}
615     instanceconfig.MapLVsByNode(node_vol_should)
616
617     for node in node_vol_should:
618       for volume in node_vol_should[node]:
619         if node not in node_vol_is or volume not in node_vol_is[node]:
620           feedback_fn("  - ERROR: volume %s missing on node %s" %
621                           (volume, node))
622           bad = True
623
624     if not instanceconfig.status == 'down':
625       if (node_current not in node_instance or
626           not instance in node_instance[node_current]):
627         feedback_fn("  - ERROR: instance %s not running on node %s" %
628                         (instance, node_current))
629         bad = True
630
631     for node in node_instance:
632       if (not node == node_current):
633         if instance in node_instance[node]:
634           feedback_fn("  - ERROR: instance %s should not run on node %s" %
635                           (instance, node))
636           bad = True
637
638     return bad
639
640   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
641     """Verify if there are any unknown volumes in the cluster.
642
643     The .os, .swap and backup volumes are ignored. All other volumes are
644     reported as unknown.
645
646     """
647     bad = False
648
649     for node in node_vol_is:
650       for volume in node_vol_is[node]:
651         if node not in node_vol_should or volume not in node_vol_should[node]:
652           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
653                       (volume, node))
654           bad = True
655     return bad
656
657   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
658     """Verify the list of running instances.
659
660     This checks what instances are running but unknown to the cluster.
661
662     """
663     bad = False
664     for node in node_instance:
665       for runninginstance in node_instance[node]:
666         if runninginstance not in instancelist:
667           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
668                           (runninginstance, node))
669           bad = True
670     return bad
671
672   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
673     """Verify N+1 Memory Resilience.
674
675     Check that if one single node dies we can still start all the instances it
676     was primary for.
677
678     """
679     bad = False
680
681     for node, nodeinfo in node_info.iteritems():
682       # This code checks that every node which is now listed as secondary has
683       # enough memory to host all instances it is supposed to should a single
684       # other node in the cluster fail.
685       # FIXME: not ready for failover to an arbitrary node
686       # FIXME: does not support file-backed instances
687       # WARNING: we currently take into account down instances as well as up
688       # ones, considering that even if they're down someone might want to start
689       # them even in the event of a node failure.
690       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
691         needed_mem = 0
692         for instance in instances:
693           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
694           if bep[constants.BE_AUTO_BALANCE]:
695             needed_mem += bep[constants.BE_MEMORY]
696         if nodeinfo['mfree'] < needed_mem:
697           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
698                       " failovers should node %s fail" % (node, prinode))
699           bad = True
700     return bad
701
702   def CheckPrereq(self):
703     """Check prerequisites.
704
705     Transform the list of checks we're going to skip into a set and check that
706     all its members are valid.
707
708     """
709     self.skip_set = frozenset(self.op.skip_checks)
710     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
711       raise errors.OpPrereqError("Invalid checks to be skipped specified")
712
713   def BuildHooksEnv(self):
714     """Build hooks env.
715
716     Cluster-Verify hooks just rone in the post phase and their failure makes
717     the output be logged in the verify output and the verification to fail.
718
719     """
720     all_nodes = self.cfg.GetNodeList()
721     # TODO: populate the environment with useful information for verify hooks
722     env = {}
723     return env, [], all_nodes
724
725   def Exec(self, feedback_fn):
726     """Verify integrity of cluster, performing various test on nodes.
727
728     """
729     bad = False
730     feedback_fn("* Verifying global settings")
731     for msg in self.cfg.VerifyConfig():
732       feedback_fn("  - ERROR: %s" % msg)
733
734     vg_name = self.cfg.GetVGName()
735     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
736     nodelist = utils.NiceSort(self.cfg.GetNodeList())
737     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
738     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
739     i_non_redundant = [] # Non redundant instances
740     i_non_a_balanced = [] # Non auto-balanced instances
741     node_volume = {}
742     node_instance = {}
743     node_info = {}
744     instance_cfg = {}
745
746     # FIXME: verify OS list
747     # do local checksums
748     file_names = []
749     file_names.append(constants.SSL_CERT_FILE)
750     file_names.append(constants.CLUSTER_CONF_FILE)
751     local_checksums = utils.FingerprintFiles(file_names)
752
753     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
754     all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
755     all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
756     all_vglist = self.rpc.call_vg_list(nodelist)
757     node_verify_param = {
758       'filelist': file_names,
759       'nodelist': nodelist,
760       'hypervisor': hypervisors,
761       'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
762                         for node in nodeinfo]
763       }
764     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
765                                            self.cfg.GetClusterName())
766     all_rversion = self.rpc.call_version(nodelist)
767     all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
768                                         self.cfg.GetHypervisorType())
769
770     cluster = self.cfg.GetClusterInfo()
771     for node in nodelist:
772       feedback_fn("* Verifying node %s" % node)
773       result = self._VerifyNode(node, file_names, local_checksums,
774                                 all_vglist[node], all_nvinfo[node],
775                                 all_rversion[node], feedback_fn)
776       bad = bad or result
777
778       # node_volume
779       volumeinfo = all_volumeinfo[node]
780
781       if isinstance(volumeinfo, basestring):
782         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
783                     (node, volumeinfo[-400:].encode('string_escape')))
784         bad = True
785         node_volume[node] = {}
786       elif not isinstance(volumeinfo, dict):
787         feedback_fn("  - ERROR: connection to %s failed" % (node,))
788         bad = True
789         continue
790       else:
791         node_volume[node] = volumeinfo
792
793       # node_instance
794       nodeinstance = all_instanceinfo[node]
795       if type(nodeinstance) != list:
796         feedback_fn("  - ERROR: connection to %s failed" % (node,))
797         bad = True
798         continue
799
800       node_instance[node] = nodeinstance
801
802       # node_info
803       nodeinfo = all_ninfo[node]
804       if not isinstance(nodeinfo, dict):
805         feedback_fn("  - ERROR: connection to %s failed" % (node,))
806         bad = True
807         continue
808
809       try:
810         node_info[node] = {
811           "mfree": int(nodeinfo['memory_free']),
812           "dfree": int(nodeinfo['vg_free']),
813           "pinst": [],
814           "sinst": [],
815           # dictionary holding all instances this node is secondary for,
816           # grouped by their primary node. Each key is a cluster node, and each
817           # value is a list of instances which have the key as primary and the
818           # current node as secondary.  this is handy to calculate N+1 memory
819           # availability if you can only failover from a primary to its
820           # secondary.
821           "sinst-by-pnode": {},
822         }
823       except ValueError:
824         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
825         bad = True
826         continue
827
828     node_vol_should = {}
829
830     for instance in instancelist:
831       feedback_fn("* Verifying instance %s" % instance)
832       inst_config = self.cfg.GetInstanceInfo(instance)
833       result =  self._VerifyInstance(instance, inst_config, node_volume,
834                                      node_instance, feedback_fn)
835       bad = bad or result
836
837       inst_config.MapLVsByNode(node_vol_should)
838
839       instance_cfg[instance] = inst_config
840
841       pnode = inst_config.primary_node
842       if pnode in node_info:
843         node_info[pnode]['pinst'].append(instance)
844       else:
845         feedback_fn("  - ERROR: instance %s, connection to primary node"
846                     " %s failed" % (instance, pnode))
847         bad = True
848
849       # If the instance is non-redundant we cannot survive losing its primary
850       # node, so we are not N+1 compliant. On the other hand we have no disk
851       # templates with more than one secondary so that situation is not well
852       # supported either.
853       # FIXME: does not support file-backed instances
854       if len(inst_config.secondary_nodes) == 0:
855         i_non_redundant.append(instance)
856       elif len(inst_config.secondary_nodes) > 1:
857         feedback_fn("  - WARNING: multiple secondaries for instance %s"
858                     % instance)
859
860       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
861         i_non_a_balanced.append(instance)
862
863       for snode in inst_config.secondary_nodes:
864         if snode in node_info:
865           node_info[snode]['sinst'].append(instance)
866           if pnode not in node_info[snode]['sinst-by-pnode']:
867             node_info[snode]['sinst-by-pnode'][pnode] = []
868           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
869         else:
870           feedback_fn("  - ERROR: instance %s, connection to secondary node"
871                       " %s failed" % (instance, snode))
872
873     feedback_fn("* Verifying orphan volumes")
874     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
875                                        feedback_fn)
876     bad = bad or result
877
878     feedback_fn("* Verifying remaining instances")
879     result = self._VerifyOrphanInstances(instancelist, node_instance,
880                                          feedback_fn)
881     bad = bad or result
882
883     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
884       feedback_fn("* Verifying N+1 Memory redundancy")
885       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
886       bad = bad or result
887
888     feedback_fn("* Other Notes")
889     if i_non_redundant:
890       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
891                   % len(i_non_redundant))
892
893     if i_non_a_balanced:
894       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
895                   % len(i_non_a_balanced))
896
897     return not bad
898
899   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
900     """Analize the post-hooks' result, handle it, and send some
901     nicely-formatted feedback back to the user.
902
903     Args:
904       phase: the hooks phase that has just been run
905       hooks_results: the results of the multi-node hooks rpc call
906       feedback_fn: function to send feedback back to the caller
907       lu_result: previous Exec result
908
909     """
910     # We only really run POST phase hooks, and are only interested in
911     # their results
912     if phase == constants.HOOKS_PHASE_POST:
913       # Used to change hooks' output to proper indentation
914       indent_re = re.compile('^', re.M)
915       feedback_fn("* Hooks Results")
916       if not hooks_results:
917         feedback_fn("  - ERROR: general communication failure")
918         lu_result = 1
919       else:
920         for node_name in hooks_results:
921           show_node_header = True
922           res = hooks_results[node_name]
923           if res is False or not isinstance(res, list):
924             feedback_fn("    Communication failure")
925             lu_result = 1
926             continue
927           for script, hkr, output in res:
928             if hkr == constants.HKR_FAIL:
929               # The node header is only shown once, if there are
930               # failing hooks on that node
931               if show_node_header:
932                 feedback_fn("  Node %s:" % node_name)
933                 show_node_header = False
934               feedback_fn("    ERROR: Script %s failed, output:" % script)
935               output = indent_re.sub('      ', output)
936               feedback_fn("%s" % output)
937               lu_result = 1
938
939       return lu_result
940
941
942 class LUVerifyDisks(NoHooksLU):
943   """Verifies the cluster disks status.
944
945   """
946   _OP_REQP = []
947   REQ_BGL = False
948
949   def ExpandNames(self):
950     self.needed_locks = {
951       locking.LEVEL_NODE: locking.ALL_SET,
952       locking.LEVEL_INSTANCE: locking.ALL_SET,
953     }
954     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
955
956   def CheckPrereq(self):
957     """Check prerequisites.
958
959     This has no prerequisites.
960
961     """
962     pass
963
964   def Exec(self, feedback_fn):
965     """Verify integrity of cluster disks.
966
967     """
968     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
969
970     vg_name = self.cfg.GetVGName()
971     nodes = utils.NiceSort(self.cfg.GetNodeList())
972     instances = [self.cfg.GetInstanceInfo(name)
973                  for name in self.cfg.GetInstanceList()]
974
975     nv_dict = {}
976     for inst in instances:
977       inst_lvs = {}
978       if (inst.status != "up" or
979           inst.disk_template not in constants.DTS_NET_MIRROR):
980         continue
981       inst.MapLVsByNode(inst_lvs)
982       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
983       for node, vol_list in inst_lvs.iteritems():
984         for vol in vol_list:
985           nv_dict[(node, vol)] = inst
986
987     if not nv_dict:
988       return result
989
990     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
991
992     to_act = set()
993     for node in nodes:
994       # node_volume
995       lvs = node_lvs[node]
996
997       if isinstance(lvs, basestring):
998         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
999         res_nlvm[node] = lvs
1000       elif not isinstance(lvs, dict):
1001         logger.Info("connection to node %s failed or invalid data returned" %
1002                     (node,))
1003         res_nodes.append(node)
1004         continue
1005
1006       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1007         inst = nv_dict.pop((node, lv_name), None)
1008         if (not lv_online and inst is not None
1009             and inst.name not in res_instances):
1010           res_instances.append(inst.name)
1011
1012     # any leftover items in nv_dict are missing LVs, let's arrange the
1013     # data better
1014     for key, inst in nv_dict.iteritems():
1015       if inst.name not in res_missing:
1016         res_missing[inst.name] = []
1017       res_missing[inst.name].append(key)
1018
1019     return result
1020
1021
1022 class LURenameCluster(LogicalUnit):
1023   """Rename the cluster.
1024
1025   """
1026   HPATH = "cluster-rename"
1027   HTYPE = constants.HTYPE_CLUSTER
1028   _OP_REQP = ["name"]
1029
1030   def BuildHooksEnv(self):
1031     """Build hooks env.
1032
1033     """
1034     env = {
1035       "OP_TARGET": self.cfg.GetClusterName(),
1036       "NEW_NAME": self.op.name,
1037       }
1038     mn = self.cfg.GetMasterNode()
1039     return env, [mn], [mn]
1040
1041   def CheckPrereq(self):
1042     """Verify that the passed name is a valid one.
1043
1044     """
1045     hostname = utils.HostInfo(self.op.name)
1046
1047     new_name = hostname.name
1048     self.ip = new_ip = hostname.ip
1049     old_name = self.cfg.GetClusterName()
1050     old_ip = self.cfg.GetMasterIP()
1051     if new_name == old_name and new_ip == old_ip:
1052       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1053                                  " cluster has changed")
1054     if new_ip != old_ip:
1055       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1056         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1057                                    " reachable on the network. Aborting." %
1058                                    new_ip)
1059
1060     self.op.name = new_name
1061
1062   def Exec(self, feedback_fn):
1063     """Rename the cluster.
1064
1065     """
1066     clustername = self.op.name
1067     ip = self.ip
1068
1069     # shutdown the master IP
1070     master = self.cfg.GetMasterNode()
1071     if not self.rpc.call_node_stop_master(master, False):
1072       raise errors.OpExecError("Could not disable the master role")
1073
1074     try:
1075       # modify the sstore
1076       # TODO: sstore
1077       ss.SetKey(ss.SS_MASTER_IP, ip)
1078       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1079
1080       # Distribute updated ss config to all nodes
1081       myself = self.cfg.GetNodeInfo(master)
1082       dist_nodes = self.cfg.GetNodeList()
1083       if myself.name in dist_nodes:
1084         dist_nodes.remove(myself.name)
1085
1086       logger.Debug("Copying updated ssconf data to all nodes")
1087       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1088         fname = ss.KeyToFilename(keyname)
1089         result = self.rpc.call_upload_file(dist_nodes, fname)
1090         for to_node in dist_nodes:
1091           if not result[to_node]:
1092             logger.Error("copy of file %s to node %s failed" %
1093                          (fname, to_node))
1094     finally:
1095       if not self.rpc.call_node_start_master(master, False):
1096         logger.Error("Could not re-enable the master role on the master,"
1097                      " please restart manually.")
1098
1099
1100 def _RecursiveCheckIfLVMBased(disk):
1101   """Check if the given disk or its children are lvm-based.
1102
1103   Args:
1104     disk: ganeti.objects.Disk object
1105
1106   Returns:
1107     boolean indicating whether a LD_LV dev_type was found or not
1108
1109   """
1110   if disk.children:
1111     for chdisk in disk.children:
1112       if _RecursiveCheckIfLVMBased(chdisk):
1113         return True
1114   return disk.dev_type == constants.LD_LV
1115
1116
1117 class LUSetClusterParams(LogicalUnit):
1118   """Change the parameters of the cluster.
1119
1120   """
1121   HPATH = "cluster-modify"
1122   HTYPE = constants.HTYPE_CLUSTER
1123   _OP_REQP = []
1124   REQ_BGL = False
1125
1126   def ExpandNames(self):
1127     # FIXME: in the future maybe other cluster params won't require checking on
1128     # all nodes to be modified.
1129     self.needed_locks = {
1130       locking.LEVEL_NODE: locking.ALL_SET,
1131     }
1132     self.share_locks[locking.LEVEL_NODE] = 1
1133
1134   def BuildHooksEnv(self):
1135     """Build hooks env.
1136
1137     """
1138     env = {
1139       "OP_TARGET": self.cfg.GetClusterName(),
1140       "NEW_VG_NAME": self.op.vg_name,
1141       }
1142     mn = self.cfg.GetMasterNode()
1143     return env, [mn], [mn]
1144
1145   def CheckPrereq(self):
1146     """Check prerequisites.
1147
1148     This checks whether the given params don't conflict and
1149     if the given volume group is valid.
1150
1151     """
1152     # FIXME: This only works because there is only one parameter that can be
1153     # changed or removed.
1154     if not self.op.vg_name:
1155       instances = self.cfg.GetAllInstancesInfo().values()
1156       for inst in instances:
1157         for disk in inst.disks:
1158           if _RecursiveCheckIfLVMBased(disk):
1159             raise errors.OpPrereqError("Cannot disable lvm storage while"
1160                                        " lvm-based instances exist")
1161
1162     # if vg_name not None, checks given volume group on all nodes
1163     if self.op.vg_name:
1164       node_list = self.acquired_locks[locking.LEVEL_NODE]
1165       vglist = self.rpc.call_vg_list(node_list)
1166       for node in node_list:
1167         vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1168                                               constants.MIN_VG_SIZE)
1169         if vgstatus:
1170           raise errors.OpPrereqError("Error on node '%s': %s" %
1171                                      (node, vgstatus))
1172
1173   def Exec(self, feedback_fn):
1174     """Change the parameters of the cluster.
1175
1176     """
1177     if self.op.vg_name != self.cfg.GetVGName():
1178       self.cfg.SetVGName(self.op.vg_name)
1179     else:
1180       feedback_fn("Cluster LVM configuration already in desired"
1181                   " state, not changing")
1182
1183
1184 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1185   """Sleep and poll for an instance's disk to sync.
1186
1187   """
1188   if not instance.disks:
1189     return True
1190
1191   if not oneshot:
1192     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1193
1194   node = instance.primary_node
1195
1196   for dev in instance.disks:
1197     lu.cfg.SetDiskID(dev, node)
1198
1199   retries = 0
1200   while True:
1201     max_time = 0
1202     done = True
1203     cumul_degraded = False
1204     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1205     if not rstats:
1206       lu.proc.LogWarning("Can't get any data from node %s" % node)
1207       retries += 1
1208       if retries >= 10:
1209         raise errors.RemoteError("Can't contact node %s for mirror data,"
1210                                  " aborting." % node)
1211       time.sleep(6)
1212       continue
1213     retries = 0
1214     for i in range(len(rstats)):
1215       mstat = rstats[i]
1216       if mstat is None:
1217         lu.proc.LogWarning("Can't compute data for node %s/%s" %
1218                            (node, instance.disks[i].iv_name))
1219         continue
1220       # we ignore the ldisk parameter
1221       perc_done, est_time, is_degraded, _ = mstat
1222       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1223       if perc_done is not None:
1224         done = False
1225         if est_time is not None:
1226           rem_time = "%d estimated seconds remaining" % est_time
1227           max_time = est_time
1228         else:
1229           rem_time = "no time estimate"
1230         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1231                         (instance.disks[i].iv_name, perc_done, rem_time))
1232     if done or oneshot:
1233       break
1234
1235     time.sleep(min(60, max_time))
1236
1237   if done:
1238     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1239   return not cumul_degraded
1240
1241
1242 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1243   """Check that mirrors are not degraded.
1244
1245   The ldisk parameter, if True, will change the test from the
1246   is_degraded attribute (which represents overall non-ok status for
1247   the device(s)) to the ldisk (representing the local storage status).
1248
1249   """
1250   lu.cfg.SetDiskID(dev, node)
1251   if ldisk:
1252     idx = 6
1253   else:
1254     idx = 5
1255
1256   result = True
1257   if on_primary or dev.AssembleOnSecondary():
1258     rstats = lu.rpc.call_blockdev_find(node, dev)
1259     if not rstats:
1260       logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1261       result = False
1262     else:
1263       result = result and (not rstats[idx])
1264   if dev.children:
1265     for child in dev.children:
1266       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1267
1268   return result
1269
1270
1271 class LUDiagnoseOS(NoHooksLU):
1272   """Logical unit for OS diagnose/query.
1273
1274   """
1275   _OP_REQP = ["output_fields", "names"]
1276   REQ_BGL = False
1277
1278   def ExpandNames(self):
1279     if self.op.names:
1280       raise errors.OpPrereqError("Selective OS query not supported")
1281
1282     self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1283     _CheckOutputFields(static=[],
1284                        dynamic=self.dynamic_fields,
1285                        selected=self.op.output_fields)
1286
1287     # Lock all nodes, in shared mode
1288     self.needed_locks = {}
1289     self.share_locks[locking.LEVEL_NODE] = 1
1290     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1291
1292   def CheckPrereq(self):
1293     """Check prerequisites.
1294
1295     """
1296
1297   @staticmethod
1298   def _DiagnoseByOS(node_list, rlist):
1299     """Remaps a per-node return list into an a per-os per-node dictionary
1300
1301       Args:
1302         node_list: a list with the names of all nodes
1303         rlist: a map with node names as keys and OS objects as values
1304
1305       Returns:
1306         map: a map with osnames as keys and as value another map, with
1307              nodes as
1308              keys and list of OS objects as values
1309              e.g. {"debian-etch": {"node1": [<object>,...],
1310                                    "node2": [<object>,]}
1311                   }
1312
1313     """
1314     all_os = {}
1315     for node_name, nr in rlist.iteritems():
1316       if not nr:
1317         continue
1318       for os_obj in nr:
1319         if os_obj.name not in all_os:
1320           # build a list of nodes for this os containing empty lists
1321           # for each node in node_list
1322           all_os[os_obj.name] = {}
1323           for nname in node_list:
1324             all_os[os_obj.name][nname] = []
1325         all_os[os_obj.name][node_name].append(os_obj)
1326     return all_os
1327
1328   def Exec(self, feedback_fn):
1329     """Compute the list of OSes.
1330
1331     """
1332     node_list = self.acquired_locks[locking.LEVEL_NODE]
1333     node_data = self.rpc.call_os_diagnose(node_list)
1334     if node_data == False:
1335       raise errors.OpExecError("Can't gather the list of OSes")
1336     pol = self._DiagnoseByOS(node_list, node_data)
1337     output = []
1338     for os_name, os_data in pol.iteritems():
1339       row = []
1340       for field in self.op.output_fields:
1341         if field == "name":
1342           val = os_name
1343         elif field == "valid":
1344           val = utils.all([osl and osl[0] for osl in os_data.values()])
1345         elif field == "node_status":
1346           val = {}
1347           for node_name, nos_list in os_data.iteritems():
1348             val[node_name] = [(v.status, v.path) for v in nos_list]
1349         else:
1350           raise errors.ParameterError(field)
1351         row.append(val)
1352       output.append(row)
1353
1354     return output
1355
1356
1357 class LURemoveNode(LogicalUnit):
1358   """Logical unit for removing a node.
1359
1360   """
1361   HPATH = "node-remove"
1362   HTYPE = constants.HTYPE_NODE
1363   _OP_REQP = ["node_name"]
1364
1365   def BuildHooksEnv(self):
1366     """Build hooks env.
1367
1368     This doesn't run on the target node in the pre phase as a failed
1369     node would then be impossible to remove.
1370
1371     """
1372     env = {
1373       "OP_TARGET": self.op.node_name,
1374       "NODE_NAME": self.op.node_name,
1375       }
1376     all_nodes = self.cfg.GetNodeList()
1377     all_nodes.remove(self.op.node_name)
1378     return env, all_nodes, all_nodes
1379
1380   def CheckPrereq(self):
1381     """Check prerequisites.
1382
1383     This checks:
1384      - the node exists in the configuration
1385      - it does not have primary or secondary instances
1386      - it's not the master
1387
1388     Any errors are signalled by raising errors.OpPrereqError.
1389
1390     """
1391     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1392     if node is None:
1393       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1394
1395     instance_list = self.cfg.GetInstanceList()
1396
1397     masternode = self.cfg.GetMasterNode()
1398     if node.name == masternode:
1399       raise errors.OpPrereqError("Node is the master node,"
1400                                  " you need to failover first.")
1401
1402     for instance_name in instance_list:
1403       instance = self.cfg.GetInstanceInfo(instance_name)
1404       if node.name == instance.primary_node:
1405         raise errors.OpPrereqError("Instance %s still running on the node,"
1406                                    " please remove first." % instance_name)
1407       if node.name in instance.secondary_nodes:
1408         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1409                                    " please remove first." % instance_name)
1410     self.op.node_name = node.name
1411     self.node = node
1412
1413   def Exec(self, feedback_fn):
1414     """Removes the node from the cluster.
1415
1416     """
1417     node = self.node
1418     logger.Info("stopping the node daemon and removing configs from node %s" %
1419                 node.name)
1420
1421     self.context.RemoveNode(node.name)
1422
1423     self.rpc.call_node_leave_cluster(node.name)
1424
1425
1426 class LUQueryNodes(NoHooksLU):
1427   """Logical unit for querying nodes.
1428
1429   """
1430   _OP_REQP = ["output_fields", "names"]
1431   REQ_BGL = False
1432
1433   def ExpandNames(self):
1434     self.dynamic_fields = frozenset([
1435       "dtotal", "dfree",
1436       "mtotal", "mnode", "mfree",
1437       "bootid",
1438       "ctotal",
1439       ])
1440
1441     self.static_fields = frozenset([
1442       "name", "pinst_cnt", "sinst_cnt",
1443       "pinst_list", "sinst_list",
1444       "pip", "sip", "tags",
1445       "serial_no",
1446       ])
1447
1448     _CheckOutputFields(static=self.static_fields,
1449                        dynamic=self.dynamic_fields,
1450                        selected=self.op.output_fields)
1451
1452     self.needed_locks = {}
1453     self.share_locks[locking.LEVEL_NODE] = 1
1454
1455     if self.op.names:
1456       self.wanted = _GetWantedNodes(self, self.op.names)
1457     else:
1458       self.wanted = locking.ALL_SET
1459
1460     self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
1461     if self.do_locking:
1462       # if we don't request only static fields, we need to lock the nodes
1463       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1464
1465
1466   def CheckPrereq(self):
1467     """Check prerequisites.
1468
1469     """
1470     # The validation of the node list is done in the _GetWantedNodes,
1471     # if non empty, and if empty, there's no validation to do
1472     pass
1473
1474   def Exec(self, feedback_fn):
1475     """Computes the list of nodes and their attributes.
1476
1477     """
1478     all_info = self.cfg.GetAllNodesInfo()
1479     if self.do_locking:
1480       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1481     elif self.wanted != locking.ALL_SET:
1482       nodenames = self.wanted
1483       missing = set(nodenames).difference(all_info.keys())
1484       if missing:
1485         raise errors.OpExecError(
1486           "Some nodes were removed before retrieving their data: %s" % missing)
1487     else:
1488       nodenames = all_info.keys()
1489
1490     nodenames = utils.NiceSort(nodenames)
1491     nodelist = [all_info[name] for name in nodenames]
1492
1493     # begin data gathering
1494
1495     if self.dynamic_fields.intersection(self.op.output_fields):
1496       live_data = {}
1497       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1498                                           self.cfg.GetHypervisorType())
1499       for name in nodenames:
1500         nodeinfo = node_data.get(name, None)
1501         if nodeinfo:
1502           live_data[name] = {
1503             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1504             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1505             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1506             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1507             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1508             "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1509             "bootid": nodeinfo['bootid'],
1510             }
1511         else:
1512           live_data[name] = {}
1513     else:
1514       live_data = dict.fromkeys(nodenames, {})
1515
1516     node_to_primary = dict([(name, set()) for name in nodenames])
1517     node_to_secondary = dict([(name, set()) for name in nodenames])
1518
1519     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1520                              "sinst_cnt", "sinst_list"))
1521     if inst_fields & frozenset(self.op.output_fields):
1522       instancelist = self.cfg.GetInstanceList()
1523
1524       for instance_name in instancelist:
1525         inst = self.cfg.GetInstanceInfo(instance_name)
1526         if inst.primary_node in node_to_primary:
1527           node_to_primary[inst.primary_node].add(inst.name)
1528         for secnode in inst.secondary_nodes:
1529           if secnode in node_to_secondary:
1530             node_to_secondary[secnode].add(inst.name)
1531
1532     # end data gathering
1533
1534     output = []
1535     for node in nodelist:
1536       node_output = []
1537       for field in self.op.output_fields:
1538         if field == "name":
1539           val = node.name
1540         elif field == "pinst_list":
1541           val = list(node_to_primary[node.name])
1542         elif field == "sinst_list":
1543           val = list(node_to_secondary[node.name])
1544         elif field == "pinst_cnt":
1545           val = len(node_to_primary[node.name])
1546         elif field == "sinst_cnt":
1547           val = len(node_to_secondary[node.name])
1548         elif field == "pip":
1549           val = node.primary_ip
1550         elif field == "sip":
1551           val = node.secondary_ip
1552         elif field == "tags":
1553           val = list(node.GetTags())
1554         elif field == "serial_no":
1555           val = node.serial_no
1556         elif field in self.dynamic_fields:
1557           val = live_data[node.name].get(field, None)
1558         else:
1559           raise errors.ParameterError(field)
1560         node_output.append(val)
1561       output.append(node_output)
1562
1563     return output
1564
1565
1566 class LUQueryNodeVolumes(NoHooksLU):
1567   """Logical unit for getting volumes on node(s).
1568
1569   """
1570   _OP_REQP = ["nodes", "output_fields"]
1571   REQ_BGL = False
1572
1573   def ExpandNames(self):
1574     _CheckOutputFields(static=["node"],
1575                        dynamic=["phys", "vg", "name", "size", "instance"],
1576                        selected=self.op.output_fields)
1577
1578     self.needed_locks = {}
1579     self.share_locks[locking.LEVEL_NODE] = 1
1580     if not self.op.nodes:
1581       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1582     else:
1583       self.needed_locks[locking.LEVEL_NODE] = \
1584         _GetWantedNodes(self, self.op.nodes)
1585
1586   def CheckPrereq(self):
1587     """Check prerequisites.
1588
1589     This checks that the fields required are valid output fields.
1590
1591     """
1592     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1593
1594   def Exec(self, feedback_fn):
1595     """Computes the list of nodes and their attributes.
1596
1597     """
1598     nodenames = self.nodes
1599     volumes = self.rpc.call_node_volumes(nodenames)
1600
1601     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1602              in self.cfg.GetInstanceList()]
1603
1604     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1605
1606     output = []
1607     for node in nodenames:
1608       if node not in volumes or not volumes[node]:
1609         continue
1610
1611       node_vols = volumes[node][:]
1612       node_vols.sort(key=lambda vol: vol['dev'])
1613
1614       for vol in node_vols:
1615         node_output = []
1616         for field in self.op.output_fields:
1617           if field == "node":
1618             val = node
1619           elif field == "phys":
1620             val = vol['dev']
1621           elif field == "vg":
1622             val = vol['vg']
1623           elif field == "name":
1624             val = vol['name']
1625           elif field == "size":
1626             val = int(float(vol['size']))
1627           elif field == "instance":
1628             for inst in ilist:
1629               if node not in lv_by_node[inst]:
1630                 continue
1631               if vol['name'] in lv_by_node[inst][node]:
1632                 val = inst.name
1633                 break
1634             else:
1635               val = '-'
1636           else:
1637             raise errors.ParameterError(field)
1638           node_output.append(str(val))
1639
1640         output.append(node_output)
1641
1642     return output
1643
1644
1645 class LUAddNode(LogicalUnit):
1646   """Logical unit for adding node to the cluster.
1647
1648   """
1649   HPATH = "node-add"
1650   HTYPE = constants.HTYPE_NODE
1651   _OP_REQP = ["node_name"]
1652
1653   def BuildHooksEnv(self):
1654     """Build hooks env.
1655
1656     This will run on all nodes before, and on all nodes + the new node after.
1657
1658     """
1659     env = {
1660       "OP_TARGET": self.op.node_name,
1661       "NODE_NAME": self.op.node_name,
1662       "NODE_PIP": self.op.primary_ip,
1663       "NODE_SIP": self.op.secondary_ip,
1664       }
1665     nodes_0 = self.cfg.GetNodeList()
1666     nodes_1 = nodes_0 + [self.op.node_name, ]
1667     return env, nodes_0, nodes_1
1668
1669   def CheckPrereq(self):
1670     """Check prerequisites.
1671
1672     This checks:
1673      - the new node is not already in the config
1674      - it is resolvable
1675      - its parameters (single/dual homed) matches the cluster
1676
1677     Any errors are signalled by raising errors.OpPrereqError.
1678
1679     """
1680     node_name = self.op.node_name
1681     cfg = self.cfg
1682
1683     dns_data = utils.HostInfo(node_name)
1684
1685     node = dns_data.name
1686     primary_ip = self.op.primary_ip = dns_data.ip
1687     secondary_ip = getattr(self.op, "secondary_ip", None)
1688     if secondary_ip is None:
1689       secondary_ip = primary_ip
1690     if not utils.IsValidIP(secondary_ip):
1691       raise errors.OpPrereqError("Invalid secondary IP given")
1692     self.op.secondary_ip = secondary_ip
1693
1694     node_list = cfg.GetNodeList()
1695     if not self.op.readd and node in node_list:
1696       raise errors.OpPrereqError("Node %s is already in the configuration" %
1697                                  node)
1698     elif self.op.readd and node not in node_list:
1699       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1700
1701     for existing_node_name in node_list:
1702       existing_node = cfg.GetNodeInfo(existing_node_name)
1703
1704       if self.op.readd and node == existing_node_name:
1705         if (existing_node.primary_ip != primary_ip or
1706             existing_node.secondary_ip != secondary_ip):
1707           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1708                                      " address configuration as before")
1709         continue
1710
1711       if (existing_node.primary_ip == primary_ip or
1712           existing_node.secondary_ip == primary_ip or
1713           existing_node.primary_ip == secondary_ip or
1714           existing_node.secondary_ip == secondary_ip):
1715         raise errors.OpPrereqError("New node ip address(es) conflict with"
1716                                    " existing node %s" % existing_node.name)
1717
1718     # check that the type of the node (single versus dual homed) is the
1719     # same as for the master
1720     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1721     master_singlehomed = myself.secondary_ip == myself.primary_ip
1722     newbie_singlehomed = secondary_ip == primary_ip
1723     if master_singlehomed != newbie_singlehomed:
1724       if master_singlehomed:
1725         raise errors.OpPrereqError("The master has no private ip but the"
1726                                    " new node has one")
1727       else:
1728         raise errors.OpPrereqError("The master has a private ip but the"
1729                                    " new node doesn't have one")
1730
1731     # checks reachablity
1732     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1733       raise errors.OpPrereqError("Node not reachable by ping")
1734
1735     if not newbie_singlehomed:
1736       # check reachability from my secondary ip to newbie's secondary ip
1737       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1738                            source=myself.secondary_ip):
1739         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1740                                    " based ping to noded port")
1741
1742     self.new_node = objects.Node(name=node,
1743                                  primary_ip=primary_ip,
1744                                  secondary_ip=secondary_ip)
1745
1746   def Exec(self, feedback_fn):
1747     """Adds the new node to the cluster.
1748
1749     """
1750     new_node = self.new_node
1751     node = new_node.name
1752
1753     # check connectivity
1754     result = self.rpc.call_version([node])[node]
1755     if result:
1756       if constants.PROTOCOL_VERSION == result:
1757         logger.Info("communication to node %s fine, sw version %s match" %
1758                     (node, result))
1759       else:
1760         raise errors.OpExecError("Version mismatch master version %s,"
1761                                  " node version %s" %
1762                                  (constants.PROTOCOL_VERSION, result))
1763     else:
1764       raise errors.OpExecError("Cannot get version from the new node")
1765
1766     # setup ssh on node
1767     logger.Info("copy ssh key to node %s" % node)
1768     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1769     keyarray = []
1770     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1771                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1772                 priv_key, pub_key]
1773
1774     for i in keyfiles:
1775       f = open(i, 'r')
1776       try:
1777         keyarray.append(f.read())
1778       finally:
1779         f.close()
1780
1781     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
1782                                     keyarray[2],
1783                                     keyarray[3], keyarray[4], keyarray[5])
1784
1785     if not result:
1786       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1787
1788     # Add node to our /etc/hosts, and add key to known_hosts
1789     utils.AddHostToEtcHosts(new_node.name)
1790
1791     if new_node.secondary_ip != new_node.primary_ip:
1792       if not self.rpc.call_node_has_ip_address(new_node.name,
1793                                                new_node.secondary_ip):
1794         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1795                                  " you gave (%s). Please fix and re-run this"
1796                                  " command." % new_node.secondary_ip)
1797
1798     node_verify_list = [self.cfg.GetMasterNode()]
1799     node_verify_param = {
1800       'nodelist': [node],
1801       # TODO: do a node-net-test as well?
1802     }
1803
1804     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
1805                                        self.cfg.GetClusterName())
1806     for verifier in node_verify_list:
1807       if not result[verifier]:
1808         raise errors.OpExecError("Cannot communicate with %s's node daemon"
1809                                  " for remote verification" % verifier)
1810       if result[verifier]['nodelist']:
1811         for failed in result[verifier]['nodelist']:
1812           feedback_fn("ssh/hostname verification failed %s -> %s" %
1813                       (verifier, result[verifier]['nodelist'][failed]))
1814         raise errors.OpExecError("ssh/hostname verification failed.")
1815
1816     # Distribute updated /etc/hosts and known_hosts to all nodes,
1817     # including the node just added
1818     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
1819     dist_nodes = self.cfg.GetNodeList()
1820     if not self.op.readd:
1821       dist_nodes.append(node)
1822     if myself.name in dist_nodes:
1823       dist_nodes.remove(myself.name)
1824
1825     logger.Debug("Copying hosts and known_hosts to all nodes")
1826     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1827       result = self.rpc.call_upload_file(dist_nodes, fname)
1828       for to_node in dist_nodes:
1829         if not result[to_node]:
1830           logger.Error("copy of file %s to node %s failed" %
1831                        (fname, to_node))
1832
1833     to_copy = []
1834     if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
1835       to_copy.append(constants.VNC_PASSWORD_FILE)
1836     for fname in to_copy:
1837       result = self.rpc.call_upload_file([node], fname)
1838       if not result[node]:
1839         logger.Error("could not copy file %s to node %s" % (fname, node))
1840
1841     if self.op.readd:
1842       self.context.ReaddNode(new_node)
1843     else:
1844       self.context.AddNode(new_node)
1845
1846
1847 class LUQueryClusterInfo(NoHooksLU):
1848   """Query cluster configuration.
1849
1850   """
1851   _OP_REQP = []
1852   REQ_MASTER = False
1853   REQ_BGL = False
1854
1855   def ExpandNames(self):
1856     self.needed_locks = {}
1857
1858   def CheckPrereq(self):
1859     """No prerequsites needed for this LU.
1860
1861     """
1862     pass
1863
1864   def Exec(self, feedback_fn):
1865     """Return cluster config.
1866
1867     """
1868     result = {
1869       "name": self.cfg.GetClusterName(),
1870       "software_version": constants.RELEASE_VERSION,
1871       "protocol_version": constants.PROTOCOL_VERSION,
1872       "config_version": constants.CONFIG_VERSION,
1873       "os_api_version": constants.OS_API_VERSION,
1874       "export_version": constants.EXPORT_VERSION,
1875       "master": self.cfg.GetMasterNode(),
1876       "architecture": (platform.architecture()[0], platform.machine()),
1877       "hypervisor_type": self.cfg.GetHypervisorType(),
1878       "enabled_hypervisors": self.cfg.GetClusterInfo().enabled_hypervisors,
1879       }
1880
1881     return result
1882
1883
1884 class LUQueryConfigValues(NoHooksLU):
1885   """Return configuration values.
1886
1887   """
1888   _OP_REQP = []
1889   REQ_BGL = False
1890
1891   def ExpandNames(self):
1892     self.needed_locks = {}
1893
1894     static_fields = ["cluster_name", "master_node"]
1895     _CheckOutputFields(static=static_fields,
1896                        dynamic=[],
1897                        selected=self.op.output_fields)
1898
1899   def CheckPrereq(self):
1900     """No prerequisites.
1901
1902     """
1903     pass
1904
1905   def Exec(self, feedback_fn):
1906     """Dump a representation of the cluster config to the standard output.
1907
1908     """
1909     values = []
1910     for field in self.op.output_fields:
1911       if field == "cluster_name":
1912         values.append(self.cfg.GetClusterName())
1913       elif field == "master_node":
1914         values.append(self.cfg.GetMasterNode())
1915       else:
1916         raise errors.ParameterError(field)
1917     return values
1918
1919
1920 class LUActivateInstanceDisks(NoHooksLU):
1921   """Bring up an instance's disks.
1922
1923   """
1924   _OP_REQP = ["instance_name"]
1925   REQ_BGL = False
1926
1927   def ExpandNames(self):
1928     self._ExpandAndLockInstance()
1929     self.needed_locks[locking.LEVEL_NODE] = []
1930     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1931
1932   def DeclareLocks(self, level):
1933     if level == locking.LEVEL_NODE:
1934       self._LockInstancesNodes()
1935
1936   def CheckPrereq(self):
1937     """Check prerequisites.
1938
1939     This checks that the instance is in the cluster.
1940
1941     """
1942     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1943     assert self.instance is not None, \
1944       "Cannot retrieve locked instance %s" % self.op.instance_name
1945
1946   def Exec(self, feedback_fn):
1947     """Activate the disks.
1948
1949     """
1950     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
1951     if not disks_ok:
1952       raise errors.OpExecError("Cannot activate block devices")
1953
1954     return disks_info
1955
1956
1957 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
1958   """Prepare the block devices for an instance.
1959
1960   This sets up the block devices on all nodes.
1961
1962   Args:
1963     instance: a ganeti.objects.Instance object
1964     ignore_secondaries: if true, errors on secondary nodes won't result
1965                         in an error return from the function
1966
1967   Returns:
1968     false if the operation failed
1969     list of (host, instance_visible_name, node_visible_name) if the operation
1970          suceeded with the mapping from node devices to instance devices
1971   """
1972   device_info = []
1973   disks_ok = True
1974   iname = instance.name
1975   # With the two passes mechanism we try to reduce the window of
1976   # opportunity for the race condition of switching DRBD to primary
1977   # before handshaking occured, but we do not eliminate it
1978
1979   # The proper fix would be to wait (with some limits) until the
1980   # connection has been made and drbd transitions from WFConnection
1981   # into any other network-connected state (Connected, SyncTarget,
1982   # SyncSource, etc.)
1983
1984   # 1st pass, assemble on all nodes in secondary mode
1985   for inst_disk in instance.disks:
1986     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1987       lu.cfg.SetDiskID(node_disk, node)
1988       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
1989       if not result:
1990         logger.Error("could not prepare block device %s on node %s"
1991                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1992         if not ignore_secondaries:
1993           disks_ok = False
1994
1995   # FIXME: race condition on drbd migration to primary
1996
1997   # 2nd pass, do only the primary node
1998   for inst_disk in instance.disks:
1999     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2000       if node != instance.primary_node:
2001         continue
2002       lu.cfg.SetDiskID(node_disk, node)
2003       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2004       if not result:
2005         logger.Error("could not prepare block device %s on node %s"
2006                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
2007         disks_ok = False
2008     device_info.append((instance.primary_node, inst_disk.iv_name, result))
2009
2010   # leave the disks configured for the primary node
2011   # this is a workaround that would be fixed better by
2012   # improving the logical/physical id handling
2013   for disk in instance.disks:
2014     lu.cfg.SetDiskID(disk, instance.primary_node)
2015
2016   return disks_ok, device_info
2017
2018
2019 def _StartInstanceDisks(lu, instance, force):
2020   """Start the disks of an instance.
2021
2022   """
2023   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2024                                            ignore_secondaries=force)
2025   if not disks_ok:
2026     _ShutdownInstanceDisks(lu, instance)
2027     if force is not None and not force:
2028       logger.Error("If the message above refers to a secondary node,"
2029                    " you can retry the operation using '--force'.")
2030     raise errors.OpExecError("Disk consistency error")
2031
2032
2033 class LUDeactivateInstanceDisks(NoHooksLU):
2034   """Shutdown an instance's disks.
2035
2036   """
2037   _OP_REQP = ["instance_name"]
2038   REQ_BGL = False
2039
2040   def ExpandNames(self):
2041     self._ExpandAndLockInstance()
2042     self.needed_locks[locking.LEVEL_NODE] = []
2043     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2044
2045   def DeclareLocks(self, level):
2046     if level == locking.LEVEL_NODE:
2047       self._LockInstancesNodes()
2048
2049   def CheckPrereq(self):
2050     """Check prerequisites.
2051
2052     This checks that the instance is in the cluster.
2053
2054     """
2055     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2056     assert self.instance is not None, \
2057       "Cannot retrieve locked instance %s" % self.op.instance_name
2058
2059   def Exec(self, feedback_fn):
2060     """Deactivate the disks
2061
2062     """
2063     instance = self.instance
2064     _SafeShutdownInstanceDisks(self, instance)
2065
2066
2067 def _SafeShutdownInstanceDisks(lu, instance):
2068   """Shutdown block devices of an instance.
2069
2070   This function checks if an instance is running, before calling
2071   _ShutdownInstanceDisks.
2072
2073   """
2074   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2075                                       [instance.hypervisor])
2076   ins_l = ins_l[instance.primary_node]
2077   if not type(ins_l) is list:
2078     raise errors.OpExecError("Can't contact node '%s'" %
2079                              instance.primary_node)
2080
2081   if instance.name in ins_l:
2082     raise errors.OpExecError("Instance is running, can't shutdown"
2083                              " block devices.")
2084
2085   _ShutdownInstanceDisks(lu, instance)
2086
2087
2088 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2089   """Shutdown block devices of an instance.
2090
2091   This does the shutdown on all nodes of the instance.
2092
2093   If the ignore_primary is false, errors on the primary node are
2094   ignored.
2095
2096   """
2097   result = True
2098   for disk in instance.disks:
2099     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2100       lu.cfg.SetDiskID(top_disk, node)
2101       if not lu.rpc.call_blockdev_shutdown(node, top_disk):
2102         logger.Error("could not shutdown block device %s on node %s" %
2103                      (disk.iv_name, node))
2104         if not ignore_primary or node != instance.primary_node:
2105           result = False
2106   return result
2107
2108
2109 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2110   """Checks if a node has enough free memory.
2111
2112   This function check if a given node has the needed amount of free
2113   memory. In case the node has less memory or we cannot get the
2114   information from the node, this function raise an OpPrereqError
2115   exception.
2116
2117   @type lu: C{LogicalUnit}
2118   @param lu: a logical unit from which we get configuration data
2119   @type node: C{str}
2120   @param node: the node to check
2121   @type reason: C{str}
2122   @param reason: string to use in the error message
2123   @type requested: C{int}
2124   @param requested: the amount of memory in MiB to check for
2125   @type hypervisor: C{str}
2126   @param hypervisor: the hypervisor to ask for memory stats
2127   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2128       we cannot check the node
2129
2130   """
2131   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2132   if not nodeinfo or not isinstance(nodeinfo, dict):
2133     raise errors.OpPrereqError("Could not contact node %s for resource"
2134                              " information" % (node,))
2135
2136   free_mem = nodeinfo[node].get('memory_free')
2137   if not isinstance(free_mem, int):
2138     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2139                              " was '%s'" % (node, free_mem))
2140   if requested > free_mem:
2141     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2142                              " needed %s MiB, available %s MiB" %
2143                              (node, reason, requested, free_mem))
2144
2145
2146 class LUStartupInstance(LogicalUnit):
2147   """Starts an instance.
2148
2149   """
2150   HPATH = "instance-start"
2151   HTYPE = constants.HTYPE_INSTANCE
2152   _OP_REQP = ["instance_name", "force"]
2153   REQ_BGL = False
2154
2155   def ExpandNames(self):
2156     self._ExpandAndLockInstance()
2157     self.needed_locks[locking.LEVEL_NODE] = []
2158     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2159
2160   def DeclareLocks(self, level):
2161     if level == locking.LEVEL_NODE:
2162       self._LockInstancesNodes()
2163
2164   def BuildHooksEnv(self):
2165     """Build hooks env.
2166
2167     This runs on master, primary and secondary nodes of the instance.
2168
2169     """
2170     env = {
2171       "FORCE": self.op.force,
2172       }
2173     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2174     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2175           list(self.instance.secondary_nodes))
2176     return env, nl, nl
2177
2178   def CheckPrereq(self):
2179     """Check prerequisites.
2180
2181     This checks that the instance is in the cluster.
2182
2183     """
2184     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2185     assert self.instance is not None, \
2186       "Cannot retrieve locked instance %s" % self.op.instance_name
2187
2188     bep = self.cfg.GetClusterInfo().FillBE(instance)
2189     # check bridges existance
2190     _CheckInstanceBridgesExist(self, instance)
2191
2192     _CheckNodeFreeMemory(self, instance.primary_node,
2193                          "starting instance %s" % instance.name,
2194                          bep[constants.BE_MEMORY], instance.hypervisor)
2195
2196   def Exec(self, feedback_fn):
2197     """Start the instance.
2198
2199     """
2200     instance = self.instance
2201     force = self.op.force
2202     extra_args = getattr(self.op, "extra_args", "")
2203
2204     self.cfg.MarkInstanceUp(instance.name)
2205
2206     node_current = instance.primary_node
2207
2208     _StartInstanceDisks(self, instance, force)
2209
2210     if not self.rpc.call_instance_start(node_current, instance, extra_args):
2211       _ShutdownInstanceDisks(self, instance)
2212       raise errors.OpExecError("Could not start instance")
2213
2214
2215 class LURebootInstance(LogicalUnit):
2216   """Reboot an instance.
2217
2218   """
2219   HPATH = "instance-reboot"
2220   HTYPE = constants.HTYPE_INSTANCE
2221   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2222   REQ_BGL = False
2223
2224   def ExpandNames(self):
2225     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2226                                    constants.INSTANCE_REBOOT_HARD,
2227                                    constants.INSTANCE_REBOOT_FULL]:
2228       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2229                                   (constants.INSTANCE_REBOOT_SOFT,
2230                                    constants.INSTANCE_REBOOT_HARD,
2231                                    constants.INSTANCE_REBOOT_FULL))
2232     self._ExpandAndLockInstance()
2233     self.needed_locks[locking.LEVEL_NODE] = []
2234     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2235
2236   def DeclareLocks(self, level):
2237     if level == locking.LEVEL_NODE:
2238       primary_only = not constants.INSTANCE_REBOOT_FULL
2239       self._LockInstancesNodes(primary_only=primary_only)
2240
2241   def BuildHooksEnv(self):
2242     """Build hooks env.
2243
2244     This runs on master, primary and secondary nodes of the instance.
2245
2246     """
2247     env = {
2248       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2249       }
2250     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2251     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2252           list(self.instance.secondary_nodes))
2253     return env, nl, nl
2254
2255   def CheckPrereq(self):
2256     """Check prerequisites.
2257
2258     This checks that the instance is in the cluster.
2259
2260     """
2261     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2262     assert self.instance is not None, \
2263       "Cannot retrieve locked instance %s" % self.op.instance_name
2264
2265     # check bridges existance
2266     _CheckInstanceBridgesExist(self, instance)
2267
2268   def Exec(self, feedback_fn):
2269     """Reboot the instance.
2270
2271     """
2272     instance = self.instance
2273     ignore_secondaries = self.op.ignore_secondaries
2274     reboot_type = self.op.reboot_type
2275     extra_args = getattr(self.op, "extra_args", "")
2276
2277     node_current = instance.primary_node
2278
2279     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2280                        constants.INSTANCE_REBOOT_HARD]:
2281       if not self.rpc.call_instance_reboot(node_current, instance,
2282                                            reboot_type, extra_args):
2283         raise errors.OpExecError("Could not reboot instance")
2284     else:
2285       if not self.rpc.call_instance_shutdown(node_current, instance):
2286         raise errors.OpExecError("could not shutdown instance for full reboot")
2287       _ShutdownInstanceDisks(self, instance)
2288       _StartInstanceDisks(self, instance, ignore_secondaries)
2289       if not self.rpc.call_instance_start(node_current, instance, extra_args):
2290         _ShutdownInstanceDisks(self, instance)
2291         raise errors.OpExecError("Could not start instance for full reboot")
2292
2293     self.cfg.MarkInstanceUp(instance.name)
2294
2295
2296 class LUShutdownInstance(LogicalUnit):
2297   """Shutdown an instance.
2298
2299   """
2300   HPATH = "instance-stop"
2301   HTYPE = constants.HTYPE_INSTANCE
2302   _OP_REQP = ["instance_name"]
2303   REQ_BGL = False
2304
2305   def ExpandNames(self):
2306     self._ExpandAndLockInstance()
2307     self.needed_locks[locking.LEVEL_NODE] = []
2308     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2309
2310   def DeclareLocks(self, level):
2311     if level == locking.LEVEL_NODE:
2312       self._LockInstancesNodes()
2313
2314   def BuildHooksEnv(self):
2315     """Build hooks env.
2316
2317     This runs on master, primary and secondary nodes of the instance.
2318
2319     """
2320     env = _BuildInstanceHookEnvByObject(self, self.instance)
2321     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2322           list(self.instance.secondary_nodes))
2323     return env, nl, nl
2324
2325   def CheckPrereq(self):
2326     """Check prerequisites.
2327
2328     This checks that the instance is in the cluster.
2329
2330     """
2331     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2332     assert self.instance is not None, \
2333       "Cannot retrieve locked instance %s" % self.op.instance_name
2334
2335   def Exec(self, feedback_fn):
2336     """Shutdown the instance.
2337
2338     """
2339     instance = self.instance
2340     node_current = instance.primary_node
2341     self.cfg.MarkInstanceDown(instance.name)
2342     if not self.rpc.call_instance_shutdown(node_current, instance):
2343       logger.Error("could not shutdown instance")
2344
2345     _ShutdownInstanceDisks(self, instance)
2346
2347
2348 class LUReinstallInstance(LogicalUnit):
2349   """Reinstall an instance.
2350
2351   """
2352   HPATH = "instance-reinstall"
2353   HTYPE = constants.HTYPE_INSTANCE
2354   _OP_REQP = ["instance_name"]
2355   REQ_BGL = False
2356
2357   def ExpandNames(self):
2358     self._ExpandAndLockInstance()
2359     self.needed_locks[locking.LEVEL_NODE] = []
2360     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2361
2362   def DeclareLocks(self, level):
2363     if level == locking.LEVEL_NODE:
2364       self._LockInstancesNodes()
2365
2366   def BuildHooksEnv(self):
2367     """Build hooks env.
2368
2369     This runs on master, primary and secondary nodes of the instance.
2370
2371     """
2372     env = _BuildInstanceHookEnvByObject(self, self.instance)
2373     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2374           list(self.instance.secondary_nodes))
2375     return env, nl, nl
2376
2377   def CheckPrereq(self):
2378     """Check prerequisites.
2379
2380     This checks that the instance is in the cluster and is not running.
2381
2382     """
2383     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2384     assert instance is not None, \
2385       "Cannot retrieve locked instance %s" % self.op.instance_name
2386
2387     if instance.disk_template == constants.DT_DISKLESS:
2388       raise errors.OpPrereqError("Instance '%s' has no disks" %
2389                                  self.op.instance_name)
2390     if instance.status != "down":
2391       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2392                                  self.op.instance_name)
2393     remote_info = self.rpc.call_instance_info(instance.primary_node,
2394                                               instance.name,
2395                                               instance.hypervisor)
2396     if remote_info:
2397       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2398                                  (self.op.instance_name,
2399                                   instance.primary_node))
2400
2401     self.op.os_type = getattr(self.op, "os_type", None)
2402     if self.op.os_type is not None:
2403       # OS verification
2404       pnode = self.cfg.GetNodeInfo(
2405         self.cfg.ExpandNodeName(instance.primary_node))
2406       if pnode is None:
2407         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2408                                    self.op.pnode)
2409       os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
2410       if not os_obj:
2411         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2412                                    " primary node"  % self.op.os_type)
2413
2414     self.instance = instance
2415
2416   def Exec(self, feedback_fn):
2417     """Reinstall the instance.
2418
2419     """
2420     inst = self.instance
2421
2422     if self.op.os_type is not None:
2423       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2424       inst.os = self.op.os_type
2425       self.cfg.Update(inst)
2426
2427     _StartInstanceDisks(self, inst, None)
2428     try:
2429       feedback_fn("Running the instance OS create scripts...")
2430       if not self.rpc.call_instance_os_add(inst.primary_node, inst,
2431                                            "sda", "sdb"):
2432         raise errors.OpExecError("Could not install OS for instance %s"
2433                                  " on node %s" %
2434                                  (inst.name, inst.primary_node))
2435     finally:
2436       _ShutdownInstanceDisks(self, inst)
2437
2438
2439 class LURenameInstance(LogicalUnit):
2440   """Rename an instance.
2441
2442   """
2443   HPATH = "instance-rename"
2444   HTYPE = constants.HTYPE_INSTANCE
2445   _OP_REQP = ["instance_name", "new_name"]
2446
2447   def BuildHooksEnv(self):
2448     """Build hooks env.
2449
2450     This runs on master, primary and secondary nodes of the instance.
2451
2452     """
2453     env = _BuildInstanceHookEnvByObject(self, self.instance)
2454     env["INSTANCE_NEW_NAME"] = self.op.new_name
2455     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2456           list(self.instance.secondary_nodes))
2457     return env, nl, nl
2458
2459   def CheckPrereq(self):
2460     """Check prerequisites.
2461
2462     This checks that the instance is in the cluster and is not running.
2463
2464     """
2465     instance = self.cfg.GetInstanceInfo(
2466       self.cfg.ExpandInstanceName(self.op.instance_name))
2467     if instance is None:
2468       raise errors.OpPrereqError("Instance '%s' not known" %
2469                                  self.op.instance_name)
2470     if instance.status != "down":
2471       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2472                                  self.op.instance_name)
2473     remote_info = self.rpc.call_instance_info(instance.primary_node,
2474                                               instance.name,
2475                                               instance.hypervisor)
2476     if remote_info:
2477       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2478                                  (self.op.instance_name,
2479                                   instance.primary_node))
2480     self.instance = instance
2481
2482     # new name verification
2483     name_info = utils.HostInfo(self.op.new_name)
2484
2485     self.op.new_name = new_name = name_info.name
2486     instance_list = self.cfg.GetInstanceList()
2487     if new_name in instance_list:
2488       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2489                                  new_name)
2490
2491     if not getattr(self.op, "ignore_ip", False):
2492       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2493         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2494                                    (name_info.ip, new_name))
2495
2496
2497   def Exec(self, feedback_fn):
2498     """Reinstall the instance.
2499
2500     """
2501     inst = self.instance
2502     old_name = inst.name
2503
2504     if inst.disk_template == constants.DT_FILE:
2505       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2506
2507     self.cfg.RenameInstance(inst.name, self.op.new_name)
2508     # Change the instance lock. This is definitely safe while we hold the BGL
2509     self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2510     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2511
2512     # re-read the instance from the configuration after rename
2513     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2514
2515     if inst.disk_template == constants.DT_FILE:
2516       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2517       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2518                                                      old_file_storage_dir,
2519                                                      new_file_storage_dir)
2520
2521       if not result:
2522         raise errors.OpExecError("Could not connect to node '%s' to rename"
2523                                  " directory '%s' to '%s' (but the instance"
2524                                  " has been renamed in Ganeti)" % (
2525                                  inst.primary_node, old_file_storage_dir,
2526                                  new_file_storage_dir))
2527
2528       if not result[0]:
2529         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2530                                  " (but the instance has been renamed in"
2531                                  " Ganeti)" % (old_file_storage_dir,
2532                                                new_file_storage_dir))
2533
2534     _StartInstanceDisks(self, inst, None)
2535     try:
2536       if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
2537                                                old_name,
2538                                                "sda", "sdb"):
2539         msg = ("Could not run OS rename script for instance %s on node %s"
2540                " (but the instance has been renamed in Ganeti)" %
2541                (inst.name, inst.primary_node))
2542         logger.Error(msg)
2543     finally:
2544       _ShutdownInstanceDisks(self, inst)
2545
2546
2547 class LURemoveInstance(LogicalUnit):
2548   """Remove an instance.
2549
2550   """
2551   HPATH = "instance-remove"
2552   HTYPE = constants.HTYPE_INSTANCE
2553   _OP_REQP = ["instance_name", "ignore_failures"]
2554   REQ_BGL = False
2555
2556   def ExpandNames(self):
2557     self._ExpandAndLockInstance()
2558     self.needed_locks[locking.LEVEL_NODE] = []
2559     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2560
2561   def DeclareLocks(self, level):
2562     if level == locking.LEVEL_NODE:
2563       self._LockInstancesNodes()
2564
2565   def BuildHooksEnv(self):
2566     """Build hooks env.
2567
2568     This runs on master, primary and secondary nodes of the instance.
2569
2570     """
2571     env = _BuildInstanceHookEnvByObject(self, self.instance)
2572     nl = [self.cfg.GetMasterNode()]
2573     return env, nl, nl
2574
2575   def CheckPrereq(self):
2576     """Check prerequisites.
2577
2578     This checks that the instance is in the cluster.
2579
2580     """
2581     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2582     assert self.instance is not None, \
2583       "Cannot retrieve locked instance %s" % self.op.instance_name
2584
2585   def Exec(self, feedback_fn):
2586     """Remove the instance.
2587
2588     """
2589     instance = self.instance
2590     logger.Info("shutting down instance %s on node %s" %
2591                 (instance.name, instance.primary_node))
2592
2593     if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
2594       if self.op.ignore_failures:
2595         feedback_fn("Warning: can't shutdown instance")
2596       else:
2597         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2598                                  (instance.name, instance.primary_node))
2599
2600     logger.Info("removing block devices for instance %s" % instance.name)
2601
2602     if not _RemoveDisks(self, instance):
2603       if self.op.ignore_failures:
2604         feedback_fn("Warning: can't remove instance's disks")
2605       else:
2606         raise errors.OpExecError("Can't remove instance's disks")
2607
2608     logger.Info("removing instance %s out of cluster config" % instance.name)
2609
2610     self.cfg.RemoveInstance(instance.name)
2611     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2612
2613
2614 class LUQueryInstances(NoHooksLU):
2615   """Logical unit for querying instances.
2616
2617   """
2618   _OP_REQP = ["output_fields", "names"]
2619   REQ_BGL = False
2620
2621   def ExpandNames(self):
2622     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2623     hvp = ["hv/%s" % name for name in constants.HVS_PARAMETERS]
2624     bep = ["be/%s" % name for name in constants.BES_PARAMETERS]
2625     self.static_fields = frozenset([
2626       "name", "os", "pnode", "snodes",
2627       "admin_state", "admin_ram",
2628       "disk_template", "ip", "mac", "bridge",
2629       "sda_size", "sdb_size", "vcpus", "tags",
2630       "network_port",
2631       "serial_no", "hypervisor", "hvparams",
2632       ] + hvp + bep)
2633
2634     _CheckOutputFields(static=self.static_fields,
2635                        dynamic=self.dynamic_fields,
2636                        selected=self.op.output_fields)
2637
2638     self.needed_locks = {}
2639     self.share_locks[locking.LEVEL_INSTANCE] = 1
2640     self.share_locks[locking.LEVEL_NODE] = 1
2641
2642     if self.op.names:
2643       self.wanted = _GetWantedInstances(self, self.op.names)
2644     else:
2645       self.wanted = locking.ALL_SET
2646
2647     self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
2648     if self.do_locking:
2649       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2650       self.needed_locks[locking.LEVEL_NODE] = []
2651       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2652
2653   def DeclareLocks(self, level):
2654     if level == locking.LEVEL_NODE and self.do_locking:
2655       self._LockInstancesNodes()
2656
2657   def CheckPrereq(self):
2658     """Check prerequisites.
2659
2660     """
2661     pass
2662
2663   def Exec(self, feedback_fn):
2664     """Computes the list of nodes and their attributes.
2665
2666     """
2667     all_info = self.cfg.GetAllInstancesInfo()
2668     if self.do_locking:
2669       instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2670     elif self.wanted != locking.ALL_SET:
2671       instance_names = self.wanted
2672       missing = set(instance_names).difference(all_info.keys())
2673       if missing:
2674         raise errors.OpExecError(
2675           "Some instances were removed before retrieving their data: %s"
2676           % missing)
2677     else:
2678       instance_names = all_info.keys()
2679
2680     instance_names = utils.NiceSort(instance_names)
2681     instance_list = [all_info[iname] for iname in instance_names]
2682
2683     # begin data gathering
2684
2685     nodes = frozenset([inst.primary_node for inst in instance_list])
2686     hv_list = list(set([inst.hypervisor for inst in instance_list]))
2687
2688     bad_nodes = []
2689     if self.dynamic_fields.intersection(self.op.output_fields):
2690       live_data = {}
2691       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2692       for name in nodes:
2693         result = node_data[name]
2694         if result:
2695           live_data.update(result)
2696         elif result == False:
2697           bad_nodes.append(name)
2698         # else no instance is alive
2699     else:
2700       live_data = dict([(name, {}) for name in instance_names])
2701
2702     # end data gathering
2703
2704     HVPREFIX = "hv/"
2705     BEPREFIX = "be/"
2706     output = []
2707     for instance in instance_list:
2708       iout = []
2709       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
2710       i_be = self.cfg.GetClusterInfo().FillBE(instance)
2711       for field in self.op.output_fields:
2712         if field == "name":
2713           val = instance.name
2714         elif field == "os":
2715           val = instance.os
2716         elif field == "pnode":
2717           val = instance.primary_node
2718         elif field == "snodes":
2719           val = list(instance.secondary_nodes)
2720         elif field == "admin_state":
2721           val = (instance.status != "down")
2722         elif field == "oper_state":
2723           if instance.primary_node in bad_nodes:
2724             val = None
2725           else:
2726             val = bool(live_data.get(instance.name))
2727         elif field == "status":
2728           if instance.primary_node in bad_nodes:
2729             val = "ERROR_nodedown"
2730           else:
2731             running = bool(live_data.get(instance.name))
2732             if running:
2733               if instance.status != "down":
2734                 val = "running"
2735               else:
2736                 val = "ERROR_up"
2737             else:
2738               if instance.status != "down":
2739                 val = "ERROR_down"
2740               else:
2741                 val = "ADMIN_down"
2742         elif field == "oper_ram":
2743           if instance.primary_node in bad_nodes:
2744             val = None
2745           elif instance.name in live_data:
2746             val = live_data[instance.name].get("memory", "?")
2747           else:
2748             val = "-"
2749         elif field == "disk_template":
2750           val = instance.disk_template
2751         elif field == "ip":
2752           val = instance.nics[0].ip
2753         elif field == "bridge":
2754           val = instance.nics[0].bridge
2755         elif field == "mac":
2756           val = instance.nics[0].mac
2757         elif field == "sda_size" or field == "sdb_size":
2758           disk = instance.FindDisk(field[:3])
2759           if disk is None:
2760             val = None
2761           else:
2762             val = disk.size
2763         elif field == "tags":
2764           val = list(instance.GetTags())
2765         elif field == "serial_no":
2766           val = instance.serial_no
2767         elif field == "network_port":
2768           val = instance.network_port
2769         elif field == "hypervisor":
2770           val = instance.hypervisor
2771         elif field == "hvparams":
2772           val = i_hv
2773         elif (field.startswith(HVPREFIX) and
2774               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
2775           val = i_hv.get(field[len(HVPREFIX):], None)
2776         elif field == "beparams":
2777           val = i_be
2778         elif (field.startswith(BEPREFIX) and
2779               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
2780           val = i_be.get(field[len(BEPREFIX):], None)
2781         else:
2782           raise errors.ParameterError(field)
2783         iout.append(val)
2784       output.append(iout)
2785
2786     return output
2787
2788
2789 class LUFailoverInstance(LogicalUnit):
2790   """Failover an instance.
2791
2792   """
2793   HPATH = "instance-failover"
2794   HTYPE = constants.HTYPE_INSTANCE
2795   _OP_REQP = ["instance_name", "ignore_consistency"]
2796   REQ_BGL = False
2797
2798   def ExpandNames(self):
2799     self._ExpandAndLockInstance()
2800     self.needed_locks[locking.LEVEL_NODE] = []
2801     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2802
2803   def DeclareLocks(self, level):
2804     if level == locking.LEVEL_NODE:
2805       self._LockInstancesNodes()
2806
2807   def BuildHooksEnv(self):
2808     """Build hooks env.
2809
2810     This runs on master, primary and secondary nodes of the instance.
2811
2812     """
2813     env = {
2814       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2815       }
2816     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2817     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
2818     return env, nl, nl
2819
2820   def CheckPrereq(self):
2821     """Check prerequisites.
2822
2823     This checks that the instance is in the cluster.
2824
2825     """
2826     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2827     assert self.instance is not None, \
2828       "Cannot retrieve locked instance %s" % self.op.instance_name
2829
2830     bep = self.cfg.GetClusterInfo().FillBE(instance)
2831     if instance.disk_template not in constants.DTS_NET_MIRROR:
2832       raise errors.OpPrereqError("Instance's disk layout is not"
2833                                  " network mirrored, cannot failover.")
2834
2835     secondary_nodes = instance.secondary_nodes
2836     if not secondary_nodes:
2837       raise errors.ProgrammerError("no secondary node but using "
2838                                    "a mirrored disk template")
2839
2840     target_node = secondary_nodes[0]
2841     # check memory requirements on the secondary node
2842     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
2843                          instance.name, bep[constants.BE_MEMORY],
2844                          instance.hypervisor)
2845
2846     # check bridge existance
2847     brlist = [nic.bridge for nic in instance.nics]
2848     if not self.rpc.call_bridges_exist(target_node, brlist):
2849       raise errors.OpPrereqError("One or more target bridges %s does not"
2850                                  " exist on destination node '%s'" %
2851                                  (brlist, target_node))
2852
2853   def Exec(self, feedback_fn):
2854     """Failover an instance.
2855
2856     The failover is done by shutting it down on its present node and
2857     starting it on the secondary.
2858
2859     """
2860     instance = self.instance
2861
2862     source_node = instance.primary_node
2863     target_node = instance.secondary_nodes[0]
2864
2865     feedback_fn("* checking disk consistency between source and target")
2866     for dev in instance.disks:
2867       # for drbd, these are drbd over lvm
2868       if not _CheckDiskConsistency(self, dev, target_node, False):
2869         if instance.status == "up" and not self.op.ignore_consistency:
2870           raise errors.OpExecError("Disk %s is degraded on target node,"
2871                                    " aborting failover." % dev.iv_name)
2872
2873     feedback_fn("* shutting down instance on source node")
2874     logger.Info("Shutting down instance %s on node %s" %
2875                 (instance.name, source_node))
2876
2877     if not self.rpc.call_instance_shutdown(source_node, instance):
2878       if self.op.ignore_consistency:
2879         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2880                      " anyway. Please make sure node %s is down"  %
2881                      (instance.name, source_node, source_node))
2882       else:
2883         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2884                                  (instance.name, source_node))
2885
2886     feedback_fn("* deactivating the instance's disks on source node")
2887     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
2888       raise errors.OpExecError("Can't shut down the instance's disks.")
2889
2890     instance.primary_node = target_node
2891     # distribute new instance config to the other nodes
2892     self.cfg.Update(instance)
2893
2894     # Only start the instance if it's marked as up
2895     if instance.status == "up":
2896       feedback_fn("* activating the instance's disks on target node")
2897       logger.Info("Starting instance %s on node %s" %
2898                   (instance.name, target_node))
2899
2900       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
2901                                                ignore_secondaries=True)
2902       if not disks_ok:
2903         _ShutdownInstanceDisks(self, instance)
2904         raise errors.OpExecError("Can't activate the instance's disks")
2905
2906       feedback_fn("* starting the instance on the target node")
2907       if not self.rpc.call_instance_start(target_node, instance, None):
2908         _ShutdownInstanceDisks(self, instance)
2909         raise errors.OpExecError("Could not start instance %s on node %s." %
2910                                  (instance.name, target_node))
2911
2912
2913 def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
2914   """Create a tree of block devices on the primary node.
2915
2916   This always creates all devices.
2917
2918   """
2919   if device.children:
2920     for child in device.children:
2921       if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
2922         return False
2923
2924   lu.cfg.SetDiskID(device, node)
2925   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
2926                                        instance.name, True, info)
2927   if not new_id:
2928     return False
2929   if device.physical_id is None:
2930     device.physical_id = new_id
2931   return True
2932
2933
2934 def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
2935   """Create a tree of block devices on a secondary node.
2936
2937   If this device type has to be created on secondaries, create it and
2938   all its children.
2939
2940   If not, just recurse to children keeping the same 'force' value.
2941
2942   """
2943   if device.CreateOnSecondary():
2944     force = True
2945   if device.children:
2946     for child in device.children:
2947       if not _CreateBlockDevOnSecondary(lu, node, instance,
2948                                         child, force, info):
2949         return False
2950
2951   if not force:
2952     return True
2953   lu.cfg.SetDiskID(device, node)
2954   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
2955                                        instance.name, False, info)
2956   if not new_id:
2957     return False
2958   if device.physical_id is None:
2959     device.physical_id = new_id
2960   return True
2961
2962
2963 def _GenerateUniqueNames(lu, exts):
2964   """Generate a suitable LV name.
2965
2966   This will generate a logical volume name for the given instance.
2967
2968   """
2969   results = []
2970   for val in exts:
2971     new_id = lu.cfg.GenerateUniqueID()
2972     results.append("%s%s" % (new_id, val))
2973   return results
2974
2975
2976 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
2977                          p_minor, s_minor):
2978   """Generate a drbd8 device complete with its children.
2979
2980   """
2981   port = lu.cfg.AllocatePort()
2982   vgname = lu.cfg.GetVGName()
2983   shared_secret = lu.cfg.GenerateDRBDSecret()
2984   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2985                           logical_id=(vgname, names[0]))
2986   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2987                           logical_id=(vgname, names[1]))
2988   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2989                           logical_id=(primary, secondary, port,
2990                                       p_minor, s_minor,
2991                                       shared_secret),
2992                           children=[dev_data, dev_meta],
2993                           iv_name=iv_name)
2994   return drbd_dev
2995
2996
2997 def _GenerateDiskTemplate(lu, template_name,
2998                           instance_name, primary_node,
2999                           secondary_nodes, disk_sz, swap_sz,
3000                           file_storage_dir, file_driver):
3001   """Generate the entire disk layout for a given template type.
3002
3003   """
3004   #TODO: compute space requirements
3005
3006   vgname = lu.cfg.GetVGName()
3007   if template_name == constants.DT_DISKLESS:
3008     disks = []
3009   elif template_name == constants.DT_PLAIN:
3010     if len(secondary_nodes) != 0:
3011       raise errors.ProgrammerError("Wrong template configuration")
3012
3013     names = _GenerateUniqueNames(lu, [".sda", ".sdb"])
3014     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
3015                            logical_id=(vgname, names[0]),
3016                            iv_name = "sda")
3017     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
3018                            logical_id=(vgname, names[1]),
3019                            iv_name = "sdb")
3020     disks = [sda_dev, sdb_dev]
3021   elif template_name == constants.DT_DRBD8:
3022     if len(secondary_nodes) != 1:
3023       raise errors.ProgrammerError("Wrong template configuration")
3024     remote_node = secondary_nodes[0]
3025     (minor_pa, minor_pb,
3026      minor_sa, minor_sb) = lu.cfg.AllocateDRBDMinor(
3027       [primary_node, primary_node, remote_node, remote_node], instance_name)
3028
3029     names = _GenerateUniqueNames(lu, [".sda_data", ".sda_meta",
3030                                       ".sdb_data", ".sdb_meta"])
3031     drbd_sda_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3032                                         disk_sz, names[0:2], "sda",
3033                                         minor_pa, minor_sa)
3034     drbd_sdb_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3035                                         swap_sz, names[2:4], "sdb",
3036                                         minor_pb, minor_sb)
3037     disks = [drbd_sda_dev, drbd_sdb_dev]
3038   elif template_name == constants.DT_FILE:
3039     if len(secondary_nodes) != 0:
3040       raise errors.ProgrammerError("Wrong template configuration")
3041
3042     file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
3043                                 iv_name="sda", logical_id=(file_driver,
3044                                 "%s/sda" % file_storage_dir))
3045     file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
3046                                 iv_name="sdb", logical_id=(file_driver,
3047                                 "%s/sdb" % file_storage_dir))
3048     disks = [file_sda_dev, file_sdb_dev]
3049   else:
3050     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3051   return disks
3052
3053
3054 def _GetInstanceInfoText(instance):
3055   """Compute that text that should be added to the disk's metadata.
3056
3057   """
3058   return "originstname+%s" % instance.name
3059
3060
3061 def _CreateDisks(lu, instance):
3062   """Create all disks for an instance.
3063
3064   This abstracts away some work from AddInstance.
3065
3066   Args:
3067     instance: the instance object
3068
3069   Returns:
3070     True or False showing the success of the creation process
3071
3072   """
3073   info = _GetInstanceInfoText(instance)
3074
3075   if instance.disk_template == constants.DT_FILE:
3076     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3077     result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3078                                                  file_storage_dir)
3079
3080     if not result:
3081       logger.Error("Could not connect to node '%s'" % instance.primary_node)
3082       return False
3083
3084     if not result[0]:
3085       logger.Error("failed to create directory '%s'" % file_storage_dir)
3086       return False
3087
3088   for device in instance.disks:
3089     logger.Info("creating volume %s for instance %s" %
3090                 (device.iv_name, instance.name))
3091     #HARDCODE
3092     for secondary_node in instance.secondary_nodes:
3093       if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3094                                         device, False, info):
3095         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3096                      (device.iv_name, device, secondary_node))
3097         return False
3098     #HARDCODE
3099     if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3100                                     instance, device, info):
3101       logger.Error("failed to create volume %s on primary!" %
3102                    device.iv_name)
3103       return False
3104
3105   return True
3106
3107
3108 def _RemoveDisks(lu, instance):
3109   """Remove all disks for an instance.
3110
3111   This abstracts away some work from `AddInstance()` and
3112   `RemoveInstance()`. Note that in case some of the devices couldn't
3113   be removed, the removal will continue with the other ones (compare
3114   with `_CreateDisks()`).
3115
3116   Args:
3117     instance: the instance object
3118
3119   Returns:
3120     True or False showing the success of the removal proces
3121
3122   """
3123   logger.Info("removing block devices for instance %s" % instance.name)
3124
3125   result = True
3126   for device in instance.disks:
3127     for node, disk in device.ComputeNodeTree(instance.primary_node):
3128       lu.cfg.SetDiskID(disk, node)
3129       if not lu.rpc.call_blockdev_remove(node, disk):
3130         logger.Error("could not remove block device %s on node %s,"
3131                      " continuing anyway" %
3132                      (device.iv_name, node))
3133         result = False
3134
3135   if instance.disk_template == constants.DT_FILE:
3136     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3137     if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3138                                                file_storage_dir):
3139       logger.Error("could not remove directory '%s'" % file_storage_dir)
3140       result = False
3141
3142   return result
3143
3144
3145 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3146   """Compute disk size requirements in the volume group
3147
3148   This is currently hard-coded for the two-drive layout.
3149
3150   """
3151   # Required free disk space as a function of disk and swap space
3152   req_size_dict = {
3153     constants.DT_DISKLESS: None,
3154     constants.DT_PLAIN: disk_size + swap_size,
3155     # 256 MB are added for drbd metadata, 128MB for each drbd device
3156     constants.DT_DRBD8: disk_size + swap_size + 256,
3157     constants.DT_FILE: None,
3158   }
3159
3160   if disk_template not in req_size_dict:
3161     raise errors.ProgrammerError("Disk template '%s' size requirement"
3162                                  " is unknown" %  disk_template)
3163
3164   return req_size_dict[disk_template]
3165
3166
3167 def _CheckHVParams(lu, nodenames, hvname, hvparams):
3168   """Hypervisor parameter validation.
3169
3170   This function abstract the hypervisor parameter validation to be
3171   used in both instance create and instance modify.
3172
3173   @type lu: L{LogicalUnit}
3174   @param lu: the logical unit for which we check
3175   @type nodenames: list
3176   @param nodenames: the list of nodes on which we should check
3177   @type hvname: string
3178   @param hvname: the name of the hypervisor we should use
3179   @type hvparams: dict
3180   @param hvparams: the parameters which we need to check
3181   @raise errors.OpPrereqError: if the parameters are not valid
3182
3183   """
3184   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3185                                                   hvname,
3186                                                   hvparams)
3187   for node in nodenames:
3188     info = hvinfo.get(node, None)
3189     if not info or not isinstance(info, (tuple, list)):
3190       raise errors.OpPrereqError("Cannot get current information"
3191                                  " from node '%s' (%s)" % (node, info))
3192     if not info[0]:
3193       raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3194                                  " %s" % info[1])
3195
3196
3197 class LUCreateInstance(LogicalUnit):
3198   """Create an instance.
3199
3200   """
3201   HPATH = "instance-add"
3202   HTYPE = constants.HTYPE_INSTANCE
3203   _OP_REQP = ["instance_name", "disk_size",
3204               "disk_template", "swap_size", "mode", "start",
3205               "wait_for_sync", "ip_check", "mac",
3206               "hvparams", "beparams"]
3207   REQ_BGL = False
3208
3209   def _ExpandNode(self, node):
3210     """Expands and checks one node name.
3211
3212     """
3213     node_full = self.cfg.ExpandNodeName(node)
3214     if node_full is None:
3215       raise errors.OpPrereqError("Unknown node %s" % node)
3216     return node_full
3217
3218   def ExpandNames(self):
3219     """ExpandNames for CreateInstance.
3220
3221     Figure out the right locks for instance creation.
3222
3223     """
3224     self.needed_locks = {}
3225
3226     # set optional parameters to none if they don't exist
3227     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3228       if not hasattr(self.op, attr):
3229         setattr(self.op, attr, None)
3230
3231     # cheap checks, mostly valid constants given
3232
3233     # verify creation mode
3234     if self.op.mode not in (constants.INSTANCE_CREATE,
3235                             constants.INSTANCE_IMPORT):
3236       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3237                                  self.op.mode)
3238
3239     # disk template and mirror node verification
3240     if self.op.disk_template not in constants.DISK_TEMPLATES:
3241       raise errors.OpPrereqError("Invalid disk template name")
3242
3243     if self.op.hypervisor is None:
3244       self.op.hypervisor = self.cfg.GetHypervisorType()
3245
3246     cluster = self.cfg.GetClusterInfo()
3247     enabled_hvs = cluster.enabled_hypervisors
3248     if self.op.hypervisor not in enabled_hvs:
3249       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3250                                  " cluster (%s)" % (self.op.hypervisor,
3251                                   ",".join(enabled_hvs)))
3252
3253     # check hypervisor parameter syntax (locally)
3254
3255     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
3256                                   self.op.hvparams)
3257     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3258     hv_type.CheckParameterSyntax(filled_hvp)
3259
3260     # fill and remember the beparams dict
3261     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
3262                                     self.op.beparams)
3263
3264     #### instance parameters check
3265
3266     # instance name verification
3267     hostname1 = utils.HostInfo(self.op.instance_name)
3268     self.op.instance_name = instance_name = hostname1.name
3269
3270     # this is just a preventive check, but someone might still add this
3271     # instance in the meantime, and creation will fail at lock-add time
3272     if instance_name in self.cfg.GetInstanceList():
3273       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3274                                  instance_name)
3275
3276     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3277
3278     # ip validity checks
3279     ip = getattr(self.op, "ip", None)
3280     if ip is None or ip.lower() == "none":
3281       inst_ip = None
3282     elif ip.lower() == "auto":
3283       inst_ip = hostname1.ip
3284     else:
3285       if not utils.IsValidIP(ip):
3286         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3287                                    " like a valid IP" % ip)
3288       inst_ip = ip
3289     self.inst_ip = self.op.ip = inst_ip
3290     # used in CheckPrereq for ip ping check
3291     self.check_ip = hostname1.ip
3292
3293     # MAC address verification
3294     if self.op.mac != "auto":
3295       if not utils.IsValidMac(self.op.mac.lower()):
3296         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3297                                    self.op.mac)
3298
3299     # file storage checks
3300     if (self.op.file_driver and
3301         not self.op.file_driver in constants.FILE_DRIVER):
3302       raise errors.OpPrereqError("Invalid file driver name '%s'" %
3303                                  self.op.file_driver)
3304
3305     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3306       raise errors.OpPrereqError("File storage directory path not absolute")
3307
3308     ### Node/iallocator related checks
3309     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3310       raise errors.OpPrereqError("One and only one of iallocator and primary"
3311                                  " node must be given")
3312
3313     if self.op.iallocator:
3314       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3315     else:
3316       self.op.pnode = self._ExpandNode(self.op.pnode)
3317       nodelist = [self.op.pnode]
3318       if self.op.snode is not None:
3319         self.op.snode = self._ExpandNode(self.op.snode)
3320         nodelist.append(self.op.snode)
3321       self.needed_locks[locking.LEVEL_NODE] = nodelist
3322
3323     # in case of import lock the source node too
3324     if self.op.mode == constants.INSTANCE_IMPORT:
3325       src_node = getattr(self.op, "src_node", None)
3326       src_path = getattr(self.op, "src_path", None)
3327
3328       if src_node is None or src_path is None:
3329         raise errors.OpPrereqError("Importing an instance requires source"
3330                                    " node and path options")
3331
3332       if not os.path.isabs(src_path):
3333         raise errors.OpPrereqError("The source path must be absolute")
3334
3335       self.op.src_node = src_node = self._ExpandNode(src_node)
3336       if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3337         self.needed_locks[locking.LEVEL_NODE].append(src_node)
3338
3339     else: # INSTANCE_CREATE
3340       if getattr(self.op, "os_type", None) is None:
3341         raise errors.OpPrereqError("No guest OS specified")
3342
3343   def _RunAllocator(self):
3344     """Run the allocator based on input opcode.
3345
3346     """
3347     disks = [{"size": self.op.disk_size, "mode": "w"},
3348              {"size": self.op.swap_size, "mode": "w"}]
3349     nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3350              "bridge": self.op.bridge}]
3351     ial = IAllocator(self,
3352                      mode=constants.IALLOCATOR_MODE_ALLOC,
3353                      name=self.op.instance_name,
3354                      disk_template=self.op.disk_template,
3355                      tags=[],
3356                      os=self.op.os_type,
3357                      vcpus=self.be_full[constants.BE_VCPUS],
3358                      mem_size=self.be_full[constants.BE_MEMORY],
3359                      disks=disks,
3360                      nics=nics,
3361                      )
3362
3363     ial.Run(self.op.iallocator)
3364
3365     if not ial.success:
3366       raise errors.OpPrereqError("Can't compute nodes using"
3367                                  " iallocator '%s': %s" % (self.op.iallocator,
3368                                                            ial.info))
3369     if len(ial.nodes) != ial.required_nodes:
3370       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3371                                  " of nodes (%s), required %s" %
3372                                  (self.op.iallocator, len(ial.nodes),
3373                                   ial.required_nodes))
3374     self.op.pnode = ial.nodes[0]
3375     logger.ToStdout("Selected nodes for the instance: %s" %
3376                     (", ".join(ial.nodes),))
3377     logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3378                 (self.op.instance_name, self.op.iallocator, ial.nodes))
3379     if ial.required_nodes == 2:
3380       self.op.snode = ial.nodes[1]
3381
3382   def BuildHooksEnv(self):
3383     """Build hooks env.
3384
3385     This runs on master, primary and secondary nodes of the instance.
3386
3387     """
3388     env = {
3389       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3390       "INSTANCE_DISK_SIZE": self.op.disk_size,
3391       "INSTANCE_SWAP_SIZE": self.op.swap_size,
3392       "INSTANCE_ADD_MODE": self.op.mode,
3393       }
3394     if self.op.mode == constants.INSTANCE_IMPORT:
3395       env["INSTANCE_SRC_NODE"] = self.op.src_node
3396       env["INSTANCE_SRC_PATH"] = self.op.src_path
3397       env["INSTANCE_SRC_IMAGE"] = self.src_image
3398
3399     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3400       primary_node=self.op.pnode,
3401       secondary_nodes=self.secondaries,
3402       status=self.instance_status,
3403       os_type=self.op.os_type,
3404       memory=self.be_full[constants.BE_MEMORY],
3405       vcpus=self.be_full[constants.BE_VCPUS],
3406       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3407     ))
3408
3409     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3410           self.secondaries)
3411     return env, nl, nl
3412
3413
3414   def CheckPrereq(self):
3415     """Check prerequisites.
3416
3417     """
3418     if (not self.cfg.GetVGName() and
3419         self.op.disk_template not in constants.DTS_NOT_LVM):
3420       raise errors.OpPrereqError("Cluster does not support lvm-based"
3421                                  " instances")
3422
3423
3424     if self.op.mode == constants.INSTANCE_IMPORT:
3425       src_node = self.op.src_node
3426       src_path = self.op.src_path
3427
3428       export_info = self.rpc.call_export_info(src_node, src_path)
3429
3430       if not export_info:
3431         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3432
3433       if not export_info.has_section(constants.INISECT_EXP):
3434         raise errors.ProgrammerError("Corrupted export config")
3435
3436       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3437       if (int(ei_version) != constants.EXPORT_VERSION):
3438         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3439                                    (ei_version, constants.EXPORT_VERSION))
3440
3441       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3442         raise errors.OpPrereqError("Can't import instance with more than"
3443                                    " one data disk")
3444
3445       # FIXME: are the old os-es, disk sizes, etc. useful?
3446       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3447       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3448                                                          'disk0_dump'))
3449       self.src_image = diskimage
3450
3451     # ip ping checks (we use the same ip that was resolved in ExpandNames)
3452
3453     if self.op.start and not self.op.ip_check:
3454       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3455                                  " adding an instance in start mode")
3456
3457     if self.op.ip_check:
3458       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3459         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3460                                    (self.check_ip, self.op.instance_name))
3461
3462     # bridge verification
3463     bridge = getattr(self.op, "bridge", None)
3464     if bridge is None:
3465       self.op.bridge = self.cfg.GetDefBridge()
3466     else:
3467       self.op.bridge = bridge
3468
3469     #### allocator run
3470
3471     if self.op.iallocator is not None:
3472       self._RunAllocator()
3473
3474     #### node related checks
3475
3476     # check primary node
3477     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3478     assert self.pnode is not None, \
3479       "Cannot retrieve locked node %s" % self.op.pnode
3480     self.secondaries = []
3481
3482     # mirror node verification
3483     if self.op.disk_template in constants.DTS_NET_MIRROR:
3484       if self.op.snode is None:
3485         raise errors.OpPrereqError("The networked disk templates need"
3486                                    " a mirror node")
3487       if self.op.snode == pnode.name:
3488         raise errors.OpPrereqError("The secondary node cannot be"
3489                                    " the primary node.")
3490       self.secondaries.append(self.op.snode)
3491
3492     nodenames = [pnode.name] + self.secondaries
3493
3494     req_size = _ComputeDiskSize(self.op.disk_template,
3495                                 self.op.disk_size, self.op.swap_size)
3496
3497     # Check lv size requirements
3498     if req_size is not None:
3499       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3500                                          self.op.hypervisor)
3501       for node in nodenames:
3502         info = nodeinfo.get(node, None)
3503         if not info:
3504           raise errors.OpPrereqError("Cannot get current information"
3505                                      " from node '%s'" % node)
3506         vg_free = info.get('vg_free', None)
3507         if not isinstance(vg_free, int):
3508           raise errors.OpPrereqError("Can't compute free disk space on"
3509                                      " node %s" % node)
3510         if req_size > info['vg_free']:
3511           raise errors.OpPrereqError("Not enough disk space on target node %s."
3512                                      " %d MB available, %d MB required" %
3513                                      (node, info['vg_free'], req_size))
3514
3515     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
3516
3517     # os verification
3518     os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
3519     if not os_obj:
3520       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3521                                  " primary node"  % self.op.os_type)
3522
3523     # bridge check on primary node
3524     if not self.rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3525       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3526                                  " destination node '%s'" %
3527                                  (self.op.bridge, pnode.name))
3528
3529     # memory check on primary node
3530     if self.op.start:
3531       _CheckNodeFreeMemory(self, self.pnode.name,
3532                            "creating instance %s" % self.op.instance_name,
3533                            self.be_full[constants.BE_MEMORY],
3534                            self.op.hypervisor)
3535
3536     if self.op.start:
3537       self.instance_status = 'up'
3538     else:
3539       self.instance_status = 'down'
3540
3541   def Exec(self, feedback_fn):
3542     """Create and add the instance to the cluster.
3543
3544     """
3545     instance = self.op.instance_name
3546     pnode_name = self.pnode.name
3547
3548     if self.op.mac == "auto":
3549       mac_address = self.cfg.GenerateMAC()
3550     else:
3551       mac_address = self.op.mac
3552
3553     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3554     if self.inst_ip is not None:
3555       nic.ip = self.inst_ip
3556
3557     ht_kind = self.op.hypervisor
3558     if ht_kind in constants.HTS_REQ_PORT:
3559       network_port = self.cfg.AllocatePort()
3560     else:
3561       network_port = None
3562
3563     ##if self.op.vnc_bind_address is None:
3564     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3565
3566     # this is needed because os.path.join does not accept None arguments
3567     if self.op.file_storage_dir is None:
3568       string_file_storage_dir = ""
3569     else:
3570       string_file_storage_dir = self.op.file_storage_dir
3571
3572     # build the full file storage dir path
3573     file_storage_dir = os.path.normpath(os.path.join(
3574                                         self.cfg.GetFileStorageDir(),
3575                                         string_file_storage_dir, instance))
3576
3577
3578     disks = _GenerateDiskTemplate(self,
3579                                   self.op.disk_template,
3580                                   instance, pnode_name,
3581                                   self.secondaries, self.op.disk_size,
3582                                   self.op.swap_size,
3583                                   file_storage_dir,
3584                                   self.op.file_driver)
3585
3586     iobj = objects.Instance(name=instance, os=self.op.os_type,
3587                             primary_node=pnode_name,
3588                             nics=[nic], disks=disks,
3589                             disk_template=self.op.disk_template,
3590                             status=self.instance_status,
3591                             network_port=network_port,
3592                             beparams=self.op.beparams,
3593                             hvparams=self.op.hvparams,
3594                             hypervisor=self.op.hypervisor,
3595                             )
3596
3597     feedback_fn("* creating instance disks...")
3598     if not _CreateDisks(self, iobj):
3599       _RemoveDisks(self, iobj)
3600       self.cfg.ReleaseDRBDMinors(instance)
3601       raise errors.OpExecError("Device creation failed, reverting...")
3602
3603     feedback_fn("adding instance %s to cluster config" % instance)
3604
3605     self.cfg.AddInstance(iobj)
3606     # Declare that we don't want to remove the instance lock anymore, as we've
3607     # added the instance to the config
3608     del self.remove_locks[locking.LEVEL_INSTANCE]
3609     # Remove the temp. assignements for the instance's drbds
3610     self.cfg.ReleaseDRBDMinors(instance)
3611
3612     if self.op.wait_for_sync:
3613       disk_abort = not _WaitForSync(self, iobj)
3614     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3615       # make sure the disks are not degraded (still sync-ing is ok)
3616       time.sleep(15)
3617       feedback_fn("* checking mirrors status")
3618       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
3619     else:
3620       disk_abort = False
3621
3622     if disk_abort:
3623       _RemoveDisks(self, iobj)
3624       self.cfg.RemoveInstance(iobj.name)
3625       # Make sure the instance lock gets removed
3626       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
3627       raise errors.OpExecError("There are some degraded disks for"
3628                                " this instance")
3629
3630     feedback_fn("creating os for instance %s on node %s" %
3631                 (instance, pnode_name))
3632
3633     if iobj.disk_template != constants.DT_DISKLESS:
3634       if self.op.mode == constants.INSTANCE_CREATE:
3635         feedback_fn("* running the instance OS create scripts...")
3636         if not self.rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3637           raise errors.OpExecError("could not add os for instance %s"
3638                                    " on node %s" %
3639                                    (instance, pnode_name))
3640
3641       elif self.op.mode == constants.INSTANCE_IMPORT:
3642         feedback_fn("* running the instance OS import scripts...")
3643         src_node = self.op.src_node
3644         src_image = self.src_image
3645         cluster_name = self.cfg.GetClusterName()
3646         if not self.rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3647                                                 src_node, src_image,
3648                                                 cluster_name):
3649           raise errors.OpExecError("Could not import os for instance"
3650                                    " %s on node %s" %
3651                                    (instance, pnode_name))
3652       else:
3653         # also checked in the prereq part
3654         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3655                                      % self.op.mode)
3656
3657     if self.op.start:
3658       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3659       feedback_fn("* starting instance...")
3660       if not self.rpc.call_instance_start(pnode_name, iobj, None):
3661         raise errors.OpExecError("Could not start instance")
3662
3663
3664 class LUConnectConsole(NoHooksLU):
3665   """Connect to an instance's console.
3666
3667   This is somewhat special in that it returns the command line that
3668   you need to run on the master node in order to connect to the
3669   console.
3670
3671   """
3672   _OP_REQP = ["instance_name"]
3673   REQ_BGL = False
3674
3675   def ExpandNames(self):
3676     self._ExpandAndLockInstance()
3677
3678   def CheckPrereq(self):
3679     """Check prerequisites.
3680
3681     This checks that the instance is in the cluster.
3682
3683     """
3684     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3685     assert self.instance is not None, \
3686       "Cannot retrieve locked instance %s" % self.op.instance_name
3687
3688   def Exec(self, feedback_fn):
3689     """Connect to the console of an instance
3690
3691     """
3692     instance = self.instance
3693     node = instance.primary_node
3694
3695     node_insts = self.rpc.call_instance_list([node],
3696                                              [instance.hypervisor])[node]
3697     if node_insts is False:
3698       raise errors.OpExecError("Can't connect to node %s." % node)
3699
3700     if instance.name not in node_insts:
3701       raise errors.OpExecError("Instance %s is not running." % instance.name)
3702
3703     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3704
3705     hyper = hypervisor.GetHypervisor(instance.hypervisor)
3706     console_cmd = hyper.GetShellCommandForConsole(instance)
3707
3708     # build ssh cmdline
3709     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3710
3711
3712 class LUReplaceDisks(LogicalUnit):
3713   """Replace the disks of an instance.
3714
3715   """
3716   HPATH = "mirrors-replace"
3717   HTYPE = constants.HTYPE_INSTANCE
3718   _OP_REQP = ["instance_name", "mode", "disks"]
3719   REQ_BGL = False
3720
3721   def ExpandNames(self):
3722     self._ExpandAndLockInstance()
3723
3724     if not hasattr(self.op, "remote_node"):
3725       self.op.remote_node = None
3726
3727     ia_name = getattr(self.op, "iallocator", None)
3728     if ia_name is not None:
3729       if self.op.remote_node is not None:
3730         raise errors.OpPrereqError("Give either the iallocator or the new"
3731                                    " secondary, not both")
3732       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3733     elif self.op.remote_node is not None:
3734       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3735       if remote_node is None:
3736         raise errors.OpPrereqError("Node '%s' not known" %
3737                                    self.op.remote_node)
3738       self.op.remote_node = remote_node
3739       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3740       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3741     else:
3742       self.needed_locks[locking.LEVEL_NODE] = []
3743       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3744
3745   def DeclareLocks(self, level):
3746     # If we're not already locking all nodes in the set we have to declare the
3747     # instance's primary/secondary nodes.
3748     if (level == locking.LEVEL_NODE and
3749         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
3750       self._LockInstancesNodes()
3751
3752   def _RunAllocator(self):
3753     """Compute a new secondary node using an IAllocator.
3754
3755     """
3756     ial = IAllocator(self,
3757                      mode=constants.IALLOCATOR_MODE_RELOC,
3758                      name=self.op.instance_name,
3759                      relocate_from=[self.sec_node])
3760
3761     ial.Run(self.op.iallocator)
3762
3763     if not ial.success:
3764       raise errors.OpPrereqError("Can't compute nodes using"
3765                                  " iallocator '%s': %s" % (self.op.iallocator,
3766                                                            ial.info))
3767     if len(ial.nodes) != ial.required_nodes:
3768       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3769                                  " of nodes (%s), required %s" %
3770                                  (len(ial.nodes), ial.required_nodes))
3771     self.op.remote_node = ial.nodes[0]
3772     logger.ToStdout("Selected new secondary for the instance: %s" %
3773                     self.op.remote_node)
3774
3775   def BuildHooksEnv(self):
3776     """Build hooks env.
3777
3778     This runs on the master, the primary and all the secondaries.
3779
3780     """
3781     env = {
3782       "MODE": self.op.mode,
3783       "NEW_SECONDARY": self.op.remote_node,
3784       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3785       }
3786     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3787     nl = [
3788       self.cfg.GetMasterNode(),
3789       self.instance.primary_node,
3790       ]
3791     if self.op.remote_node is not None:
3792       nl.append(self.op.remote_node)
3793     return env, nl, nl
3794
3795   def CheckPrereq(self):
3796     """Check prerequisites.
3797
3798     This checks that the instance is in the cluster.
3799
3800     """
3801     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3802     assert instance is not None, \
3803       "Cannot retrieve locked instance %s" % self.op.instance_name
3804     self.instance = instance
3805
3806     if instance.disk_template not in constants.DTS_NET_MIRROR:
3807       raise errors.OpPrereqError("Instance's disk layout is not"
3808                                  " network mirrored.")
3809
3810     if len(instance.secondary_nodes) != 1:
3811       raise errors.OpPrereqError("The instance has a strange layout,"
3812                                  " expected one secondary but found %d" %
3813                                  len(instance.secondary_nodes))
3814
3815     self.sec_node = instance.secondary_nodes[0]
3816
3817     ia_name = getattr(self.op, "iallocator", None)
3818     if ia_name is not None:
3819       self._RunAllocator()
3820
3821     remote_node = self.op.remote_node
3822     if remote_node is not None:
3823       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3824       assert self.remote_node_info is not None, \
3825         "Cannot retrieve locked node %s" % remote_node
3826     else:
3827       self.remote_node_info = None
3828     if remote_node == instance.primary_node:
3829       raise errors.OpPrereqError("The specified node is the primary node of"
3830                                  " the instance.")
3831     elif remote_node == self.sec_node:
3832       if self.op.mode == constants.REPLACE_DISK_SEC:
3833         # this is for DRBD8, where we can't execute the same mode of
3834         # replacement as for drbd7 (no different port allocated)
3835         raise errors.OpPrereqError("Same secondary given, cannot execute"
3836                                    " replacement")
3837     if instance.disk_template == constants.DT_DRBD8:
3838       if (self.op.mode == constants.REPLACE_DISK_ALL and
3839           remote_node is not None):
3840         # switch to replace secondary mode
3841         self.op.mode = constants.REPLACE_DISK_SEC
3842
3843       if self.op.mode == constants.REPLACE_DISK_ALL:
3844         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3845                                    " secondary disk replacement, not"
3846                                    " both at once")
3847       elif self.op.mode == constants.REPLACE_DISK_PRI:
3848         if remote_node is not None:
3849           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3850                                      " the secondary while doing a primary"
3851                                      " node disk replacement")
3852         self.tgt_node = instance.primary_node
3853         self.oth_node = instance.secondary_nodes[0]
3854       elif self.op.mode == constants.REPLACE_DISK_SEC:
3855         self.new_node = remote_node # this can be None, in which case
3856                                     # we don't change the secondary
3857         self.tgt_node = instance.secondary_nodes[0]
3858         self.oth_node = instance.primary_node
3859       else:
3860         raise errors.ProgrammerError("Unhandled disk replace mode")
3861
3862     for name in self.op.disks:
3863       if instance.FindDisk(name) is None:
3864         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3865                                    (name, instance.name))
3866
3867   def _ExecD8DiskOnly(self, feedback_fn):
3868     """Replace a disk on the primary or secondary for dbrd8.
3869
3870     The algorithm for replace is quite complicated:
3871       - for each disk to be replaced:
3872         - create new LVs on the target node with unique names
3873         - detach old LVs from the drbd device
3874         - rename old LVs to name_replaced.<time_t>
3875         - rename new LVs to old LVs
3876         - attach the new LVs (with the old names now) to the drbd device
3877       - wait for sync across all devices
3878       - for each modified disk:
3879         - remove old LVs (which have the name name_replaces.<time_t>)
3880
3881     Failures are not very well handled.
3882
3883     """
3884     steps_total = 6
3885     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3886     instance = self.instance
3887     iv_names = {}
3888     vgname = self.cfg.GetVGName()
3889     # start of work
3890     cfg = self.cfg
3891     tgt_node = self.tgt_node
3892     oth_node = self.oth_node
3893
3894     # Step: check device activation
3895     self.proc.LogStep(1, steps_total, "check device existence")
3896     info("checking volume groups")
3897     my_vg = cfg.GetVGName()
3898     results = self.rpc.call_vg_list([oth_node, tgt_node])
3899     if not results:
3900       raise errors.OpExecError("Can't list volume groups on the nodes")
3901     for node in oth_node, tgt_node:
3902       res = results.get(node, False)
3903       if not res or my_vg not in res:
3904         raise errors.OpExecError("Volume group '%s' not found on %s" %
3905                                  (my_vg, node))
3906     for dev in instance.disks:
3907       if not dev.iv_name in self.op.disks:
3908         continue
3909       for node in tgt_node, oth_node:
3910         info("checking %s on %s" % (dev.iv_name, node))
3911         cfg.SetDiskID(dev, node)
3912         if not self.rpc.call_blockdev_find(node, dev):
3913           raise errors.OpExecError("Can't find device %s on node %s" %
3914                                    (dev.iv_name, node))
3915
3916     # Step: check other node consistency
3917     self.proc.LogStep(2, steps_total, "check peer consistency")
3918     for dev in instance.disks:
3919       if not dev.iv_name in self.op.disks:
3920         continue
3921       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3922       if not _CheckDiskConsistency(self, dev, oth_node,
3923                                    oth_node==instance.primary_node):
3924         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3925                                  " to replace disks on this node (%s)" %
3926                                  (oth_node, tgt_node))
3927
3928     # Step: create new storage
3929     self.proc.LogStep(3, steps_total, "allocate new storage")
3930     for dev in instance.disks:
3931       if not dev.iv_name in self.op.disks:
3932         continue
3933       size = dev.size
3934       cfg.SetDiskID(dev, tgt_node)
3935       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3936       names = _GenerateUniqueNames(self, lv_names)
3937       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3938                              logical_id=(vgname, names[0]))
3939       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3940                              logical_id=(vgname, names[1]))
3941       new_lvs = [lv_data, lv_meta]
3942       old_lvs = dev.children
3943       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3944       info("creating new local storage on %s for %s" %
3945            (tgt_node, dev.iv_name))
3946       # since we *always* want to create this LV, we use the
3947       # _Create...OnPrimary (which forces the creation), even if we
3948       # are talking about the secondary node
3949       for new_lv in new_lvs:
3950         if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
3951                                         _GetInstanceInfoText(instance)):
3952           raise errors.OpExecError("Failed to create new LV named '%s' on"
3953                                    " node '%s'" %
3954                                    (new_lv.logical_id[1], tgt_node))
3955
3956     # Step: for each lv, detach+rename*2+attach
3957     self.proc.LogStep(4, steps_total, "change drbd configuration")
3958     for dev, old_lvs, new_lvs in iv_names.itervalues():
3959       info("detaching %s drbd from local storage" % dev.iv_name)
3960       if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3961         raise errors.OpExecError("Can't detach drbd from local storage on node"
3962                                  " %s for device %s" % (tgt_node, dev.iv_name))
3963       #dev.children = []
3964       #cfg.Update(instance)
3965
3966       # ok, we created the new LVs, so now we know we have the needed
3967       # storage; as such, we proceed on the target node to rename
3968       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3969       # using the assumption that logical_id == physical_id (which in
3970       # turn is the unique_id on that node)
3971
3972       # FIXME(iustin): use a better name for the replaced LVs
3973       temp_suffix = int(time.time())
3974       ren_fn = lambda d, suff: (d.physical_id[0],
3975                                 d.physical_id[1] + "_replaced-%s" % suff)
3976       # build the rename list based on what LVs exist on the node
3977       rlist = []
3978       for to_ren in old_lvs:
3979         find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
3980         if find_res is not None: # device exists
3981           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3982
3983       info("renaming the old LVs on the target node")
3984       if not self.rpc.call_blockdev_rename(tgt_node, rlist):
3985         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3986       # now we rename the new LVs to the old LVs
3987       info("renaming the new LVs on the target node")
3988       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3989       if not self.rpc.call_blockdev_rename(tgt_node, rlist):
3990         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3991
3992       for old, new in zip(old_lvs, new_lvs):
3993         new.logical_id = old.logical_id
3994         cfg.SetDiskID(new, tgt_node)
3995
3996       for disk in old_lvs:
3997         disk.logical_id = ren_fn(disk, temp_suffix)
3998         cfg.SetDiskID(disk, tgt_node)
3999
4000       # now that the new lvs have the old name, we can add them to the device
4001       info("adding new mirror component on %s" % tgt_node)
4002       if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
4003         for new_lv in new_lvs:
4004           if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
4005             warning("Can't rollback device %s", hint="manually cleanup unused"
4006                     " logical volumes")
4007         raise errors.OpExecError("Can't add local storage to drbd")
4008
4009       dev.children = new_lvs
4010       cfg.Update(instance)
4011
4012     # Step: wait for sync
4013
4014     # this can fail as the old devices are degraded and _WaitForSync
4015     # does a combined result over all disks, so we don't check its
4016     # return value
4017     self.proc.LogStep(5, steps_total, "sync devices")
4018     _WaitForSync(self, instance, unlock=True)
4019
4020     # so check manually all the devices
4021     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4022       cfg.SetDiskID(dev, instance.primary_node)
4023       is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
4024       if is_degr:
4025         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4026
4027     # Step: remove old storage
4028     self.proc.LogStep(6, steps_total, "removing old storage")
4029     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
4030       info("remove logical volumes for %s" % name)
4031       for lv in old_lvs:
4032         cfg.SetDiskID(lv, tgt_node)
4033         if not self.rpc.call_blockdev_remove(tgt_node, lv):
4034           warning("Can't remove old LV", hint="manually remove unused LVs")
4035           continue
4036
4037   def _ExecD8Secondary(self, feedback_fn):
4038     """Replace the secondary node for drbd8.
4039
4040     The algorithm for replace is quite complicated:
4041       - for all disks of the instance:
4042         - create new LVs on the new node with same names
4043         - shutdown the drbd device on the old secondary
4044         - disconnect the drbd network on the primary
4045         - create the drbd device on the new secondary
4046         - network attach the drbd on the primary, using an artifice:
4047           the drbd code for Attach() will connect to the network if it
4048           finds a device which is connected to the good local disks but
4049           not network enabled
4050       - wait for sync across all devices
4051       - remove all disks from the old secondary
4052
4053     Failures are not very well handled.
4054
4055     """
4056     steps_total = 6
4057     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4058     instance = self.instance
4059     iv_names = {}
4060     vgname = self.cfg.GetVGName()
4061     # start of work
4062     cfg = self.cfg
4063     old_node = self.tgt_node
4064     new_node = self.new_node
4065     pri_node = instance.primary_node
4066
4067     # Step: check device activation
4068     self.proc.LogStep(1, steps_total, "check device existence")
4069     info("checking volume groups")
4070     my_vg = cfg.GetVGName()
4071     results = self.rpc.call_vg_list([pri_node, new_node])
4072     if not results:
4073       raise errors.OpExecError("Can't list volume groups on the nodes")
4074     for node in pri_node, new_node:
4075       res = results.get(node, False)
4076       if not res or my_vg not in res:
4077         raise errors.OpExecError("Volume group '%s' not found on %s" %
4078                                  (my_vg, node))
4079     for dev in instance.disks:
4080       if not dev.iv_name in self.op.disks:
4081         continue
4082       info("checking %s on %s" % (dev.iv_name, pri_node))
4083       cfg.SetDiskID(dev, pri_node)
4084       if not self.rpc.call_blockdev_find(pri_node, dev):
4085         raise errors.OpExecError("Can't find device %s on node %s" %
4086                                  (dev.iv_name, pri_node))
4087
4088     # Step: check other node consistency
4089     self.proc.LogStep(2, steps_total, "check peer consistency")
4090     for dev in instance.disks:
4091       if not dev.iv_name in self.op.disks:
4092         continue
4093       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4094       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4095         raise errors.OpExecError("Primary node (%s) has degraded storage,"
4096                                  " unsafe to replace the secondary" %
4097                                  pri_node)
4098
4099     # Step: create new storage
4100     self.proc.LogStep(3, steps_total, "allocate new storage")
4101     for dev in instance.disks:
4102       size = dev.size
4103       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4104       # since we *always* want to create this LV, we use the
4105       # _Create...OnPrimary (which forces the creation), even if we
4106       # are talking about the secondary node
4107       for new_lv in dev.children:
4108         if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4109                                         _GetInstanceInfoText(instance)):
4110           raise errors.OpExecError("Failed to create new LV named '%s' on"
4111                                    " node '%s'" %
4112                                    (new_lv.logical_id[1], new_node))
4113
4114
4115     # Step 4: dbrd minors and drbd setups changes
4116     # after this, we must manually remove the drbd minors on both the
4117     # error and the success paths
4118     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4119                                    instance.name)
4120     logging.debug("Allocated minors %s" % (minors,))
4121     self.proc.LogStep(4, steps_total, "changing drbd configuration")
4122     for dev, new_minor in zip(instance.disks, minors):
4123       size = dev.size
4124       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4125       # create new devices on new_node
4126       if pri_node == dev.logical_id[0]:
4127         new_logical_id = (pri_node, new_node,
4128                           dev.logical_id[2], dev.logical_id[3], new_minor,
4129                           dev.logical_id[5])
4130       else:
4131         new_logical_id = (new_node, pri_node,
4132                           dev.logical_id[2], new_minor, dev.logical_id[4],
4133                           dev.logical_id[5])
4134       iv_names[dev.iv_name] = (dev, dev.children, new_logical_id)
4135       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4136                     new_logical_id)
4137       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4138                               logical_id=new_logical_id,
4139                               children=dev.children)
4140       if not _CreateBlockDevOnSecondary(self, new_node, instance,
4141                                         new_drbd, False,
4142                                         _GetInstanceInfoText(instance)):
4143         self.cfg.ReleaseDRBDMinors(instance.name)
4144         raise errors.OpExecError("Failed to create new DRBD on"
4145                                  " node '%s'" % new_node)
4146
4147     for dev in instance.disks:
4148       # we have new devices, shutdown the drbd on the old secondary
4149       info("shutting down drbd for %s on old node" % dev.iv_name)
4150       cfg.SetDiskID(dev, old_node)
4151       if not self.rpc.call_blockdev_shutdown(old_node, dev):
4152         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4153                 hint="Please cleanup this device manually as soon as possible")
4154
4155     info("detaching primary drbds from the network (=> standalone)")
4156     done = 0
4157     for dev in instance.disks:
4158       cfg.SetDiskID(dev, pri_node)
4159       # set the network part of the physical (unique in bdev terms) id
4160       # to None, meaning detach from network
4161       dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4162       # and 'find' the device, which will 'fix' it to match the
4163       # standalone state
4164       if self.rpc.call_blockdev_find(pri_node, dev):
4165         done += 1
4166       else:
4167         warning("Failed to detach drbd %s from network, unusual case" %
4168                 dev.iv_name)
4169
4170     if not done:
4171       # no detaches succeeded (very unlikely)
4172       self.cfg.ReleaseDRBDMinors(instance.name)
4173       raise errors.OpExecError("Can't detach at least one DRBD from old node")
4174
4175     # if we managed to detach at least one, we update all the disks of
4176     # the instance to point to the new secondary
4177     info("updating instance configuration")
4178     for dev, _, new_logical_id in iv_names.itervalues():
4179       dev.logical_id = new_logical_id
4180       cfg.SetDiskID(dev, pri_node)
4181     cfg.Update(instance)
4182     # we can remove now the temp minors as now the new values are
4183     # written to the config file (and therefore stable)
4184     self.cfg.ReleaseDRBDMinors(instance.name)
4185
4186     # and now perform the drbd attach
4187     info("attaching primary drbds to new secondary (standalone => connected)")
4188     failures = []
4189     for dev in instance.disks:
4190       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4191       # since the attach is smart, it's enough to 'find' the device,
4192       # it will automatically activate the network, if the physical_id
4193       # is correct
4194       cfg.SetDiskID(dev, pri_node)
4195       logging.debug("Disk to attach: %s", dev)
4196       if not self.rpc.call_blockdev_find(pri_node, dev):
4197         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4198                 "please do a gnt-instance info to see the status of disks")
4199
4200     # this can fail as the old devices are degraded and _WaitForSync
4201     # does a combined result over all disks, so we don't check its
4202     # return value
4203     self.proc.LogStep(5, steps_total, "sync devices")
4204     _WaitForSync(self, instance, unlock=True)
4205
4206     # so check manually all the devices
4207     for name, (dev, old_lvs, _) in iv_names.iteritems():
4208       cfg.SetDiskID(dev, pri_node)
4209       is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
4210       if is_degr:
4211         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4212
4213     self.proc.LogStep(6, steps_total, "removing old storage")
4214     for name, (dev, old_lvs, _) in iv_names.iteritems():
4215       info("remove logical volumes for %s" % name)
4216       for lv in old_lvs:
4217         cfg.SetDiskID(lv, old_node)
4218         if not self.rpc.call_blockdev_remove(old_node, lv):
4219           warning("Can't remove LV on old secondary",
4220                   hint="Cleanup stale volumes by hand")
4221
4222   def Exec(self, feedback_fn):
4223     """Execute disk replacement.
4224
4225     This dispatches the disk replacement to the appropriate handler.
4226
4227     """
4228     instance = self.instance
4229
4230     # Activate the instance disks if we're replacing them on a down instance
4231     if instance.status == "down":
4232       _StartInstanceDisks(self, instance, True)
4233
4234     if instance.disk_template == constants.DT_DRBD8:
4235       if self.op.remote_node is None:
4236         fn = self._ExecD8DiskOnly
4237       else:
4238         fn = self._ExecD8Secondary
4239     else:
4240       raise errors.ProgrammerError("Unhandled disk replacement case")
4241
4242     ret = fn(feedback_fn)
4243
4244     # Deactivate the instance disks if we're replacing them on a down instance
4245     if instance.status == "down":
4246       _SafeShutdownInstanceDisks(self, instance)
4247
4248     return ret
4249
4250
4251 class LUGrowDisk(LogicalUnit):
4252   """Grow a disk of an instance.
4253
4254   """
4255   HPATH = "disk-grow"
4256   HTYPE = constants.HTYPE_INSTANCE
4257   _OP_REQP = ["instance_name", "disk", "amount"]
4258   REQ_BGL = False
4259
4260   def ExpandNames(self):
4261     self._ExpandAndLockInstance()
4262     self.needed_locks[locking.LEVEL_NODE] = []
4263     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4264
4265   def DeclareLocks(self, level):
4266     if level == locking.LEVEL_NODE:
4267       self._LockInstancesNodes()
4268
4269   def BuildHooksEnv(self):
4270     """Build hooks env.
4271
4272     This runs on the master, the primary and all the secondaries.
4273
4274     """
4275     env = {
4276       "DISK": self.op.disk,
4277       "AMOUNT": self.op.amount,
4278       }
4279     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4280     nl = [
4281       self.cfg.GetMasterNode(),
4282       self.instance.primary_node,
4283       ]
4284     return env, nl, nl
4285
4286   def CheckPrereq(self):
4287     """Check prerequisites.
4288
4289     This checks that the instance is in the cluster.
4290
4291     """
4292     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4293     assert instance is not None, \
4294       "Cannot retrieve locked instance %s" % self.op.instance_name
4295
4296     self.instance = instance
4297
4298     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4299       raise errors.OpPrereqError("Instance's disk layout does not support"
4300                                  " growing.")
4301
4302     if instance.FindDisk(self.op.disk) is None:
4303       raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4304                                  (self.op.disk, instance.name))
4305
4306     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4307     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4308                                        instance.hypervisor)
4309     for node in nodenames:
4310       info = nodeinfo.get(node, None)
4311       if not info:
4312         raise errors.OpPrereqError("Cannot get current information"
4313                                    " from node '%s'" % node)
4314       vg_free = info.get('vg_free', None)
4315       if not isinstance(vg_free, int):
4316         raise errors.OpPrereqError("Can't compute free disk space on"
4317                                    " node %s" % node)
4318       if self.op.amount > info['vg_free']:
4319         raise errors.OpPrereqError("Not enough disk space on target node %s:"
4320                                    " %d MiB available, %d MiB required" %
4321                                    (node, info['vg_free'], self.op.amount))
4322
4323   def Exec(self, feedback_fn):
4324     """Execute disk grow.
4325
4326     """
4327     instance = self.instance
4328     disk = instance.FindDisk(self.op.disk)
4329     for node in (instance.secondary_nodes + (instance.primary_node,)):
4330       self.cfg.SetDiskID(disk, node)
4331       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
4332       if (not result or not isinstance(result, (list, tuple)) or
4333           len(result) != 2):
4334         raise errors.OpExecError("grow request failed to node %s" % node)
4335       elif not result[0]:
4336         raise errors.OpExecError("grow request failed to node %s: %s" %
4337                                  (node, result[1]))
4338     disk.RecordGrow(self.op.amount)
4339     self.cfg.Update(instance)
4340     return
4341
4342
4343 class LUQueryInstanceData(NoHooksLU):
4344   """Query runtime instance data.
4345
4346   """
4347   _OP_REQP = ["instances", "static"]
4348   REQ_BGL = False
4349
4350   def ExpandNames(self):
4351     self.needed_locks = {}
4352     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4353
4354     if not isinstance(self.op.instances, list):
4355       raise errors.OpPrereqError("Invalid argument type 'instances'")
4356
4357     if self.op.instances:
4358       self.wanted_names = []
4359       for name in self.op.instances:
4360         full_name = self.cfg.ExpandInstanceName(name)
4361         if full_name is None:
4362           raise errors.OpPrereqError("Instance '%s' not known" %
4363                                      self.op.instance_name)
4364         self.wanted_names.append(full_name)
4365       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4366     else:
4367       self.wanted_names = None
4368       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4369
4370     self.needed_locks[locking.LEVEL_NODE] = []
4371     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4372
4373   def DeclareLocks(self, level):
4374     if level == locking.LEVEL_NODE:
4375       self._LockInstancesNodes()
4376
4377   def CheckPrereq(self):
4378     """Check prerequisites.
4379
4380     This only checks the optional instance list against the existing names.
4381
4382     """
4383     if self.wanted_names is None:
4384       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4385
4386     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4387                              in self.wanted_names]
4388     return
4389
4390   def _ComputeDiskStatus(self, instance, snode, dev):
4391     """Compute block device status.
4392
4393     """
4394     static = self.op.static
4395     if not static:
4396       self.cfg.SetDiskID(dev, instance.primary_node)
4397       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4398     else:
4399       dev_pstatus = None
4400
4401     if dev.dev_type in constants.LDS_DRBD:
4402       # we change the snode then (otherwise we use the one passed in)
4403       if dev.logical_id[0] == instance.primary_node:
4404         snode = dev.logical_id[1]
4405       else:
4406         snode = dev.logical_id[0]
4407
4408     if snode and not static:
4409       self.cfg.SetDiskID(dev, snode)
4410       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4411     else:
4412       dev_sstatus = None
4413
4414     if dev.children:
4415       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4416                       for child in dev.children]
4417     else:
4418       dev_children = []
4419
4420     data = {
4421       "iv_name": dev.iv_name,
4422       "dev_type": dev.dev_type,
4423       "logical_id": dev.logical_id,
4424       "physical_id": dev.physical_id,
4425       "pstatus": dev_pstatus,
4426       "sstatus": dev_sstatus,
4427       "children": dev_children,
4428       }
4429
4430     return data
4431
4432   def Exec(self, feedback_fn):
4433     """Gather and return data"""
4434     result = {}
4435
4436     cluster = self.cfg.GetClusterInfo()
4437
4438     for instance in self.wanted_instances:
4439       if not self.op.static:
4440         remote_info = self.rpc.call_instance_info(instance.primary_node,
4441                                                   instance.name,
4442                                                   instance.hypervisor)
4443         if remote_info and "state" in remote_info:
4444           remote_state = "up"
4445         else:
4446           remote_state = "down"
4447       else:
4448         remote_state = None
4449       if instance.status == "down":
4450         config_state = "down"
4451       else:
4452         config_state = "up"
4453
4454       disks = [self._ComputeDiskStatus(instance, None, device)
4455                for device in instance.disks]
4456
4457       idict = {
4458         "name": instance.name,
4459         "config_state": config_state,
4460         "run_state": remote_state,
4461         "pnode": instance.primary_node,
4462         "snodes": instance.secondary_nodes,
4463         "os": instance.os,
4464         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4465         "disks": disks,
4466         "hypervisor": instance.hypervisor,
4467         "network_port": instance.network_port,
4468         "hv_instance": instance.hvparams,
4469         "hv_actual": cluster.FillHV(instance),
4470         "be_instance": instance.beparams,
4471         "be_actual": cluster.FillBE(instance),
4472         }
4473
4474       result[instance.name] = idict
4475
4476     return result
4477
4478
4479 class LUSetInstanceParams(LogicalUnit):
4480   """Modifies an instances's parameters.
4481
4482   """
4483   HPATH = "instance-modify"
4484   HTYPE = constants.HTYPE_INSTANCE
4485   _OP_REQP = ["instance_name", "hvparams"]
4486   REQ_BGL = False
4487
4488   def ExpandNames(self):
4489     self._ExpandAndLockInstance()
4490     self.needed_locks[locking.LEVEL_NODE] = []
4491     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4492
4493
4494   def DeclareLocks(self, level):
4495     if level == locking.LEVEL_NODE:
4496       self._LockInstancesNodes()
4497
4498   def BuildHooksEnv(self):
4499     """Build hooks env.
4500
4501     This runs on the master, primary and secondaries.
4502
4503     """
4504     args = dict()
4505     if constants.BE_MEMORY in self.be_new:
4506       args['memory'] = self.be_new[constants.BE_MEMORY]
4507     if constants.BE_VCPUS in self.be_new:
4508       args['vcpus'] = self.be_bnew[constants.BE_VCPUS]
4509     if self.do_ip or self.do_bridge or self.mac:
4510       if self.do_ip:
4511         ip = self.ip
4512       else:
4513         ip = self.instance.nics[0].ip
4514       if self.bridge:
4515         bridge = self.bridge
4516       else:
4517         bridge = self.instance.nics[0].bridge
4518       if self.mac:
4519         mac = self.mac
4520       else:
4521         mac = self.instance.nics[0].mac
4522       args['nics'] = [(ip, bridge, mac)]
4523     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
4524     nl = [self.cfg.GetMasterNode(),
4525           self.instance.primary_node] + list(self.instance.secondary_nodes)
4526     return env, nl, nl
4527
4528   def CheckPrereq(self):
4529     """Check prerequisites.
4530
4531     This only checks the instance list against the existing names.
4532
4533     """
4534     # FIXME: all the parameters could be checked before, in ExpandNames, or in
4535     # a separate CheckArguments function, if we implement one, so the operation
4536     # can be aborted without waiting for any lock, should it have an error...
4537     self.ip = getattr(self.op, "ip", None)
4538     self.mac = getattr(self.op, "mac", None)
4539     self.bridge = getattr(self.op, "bridge", None)
4540     self.kernel_path = getattr(self.op, "kernel_path", None)
4541     self.initrd_path = getattr(self.op, "initrd_path", None)
4542     self.force = getattr(self.op, "force", None)
4543     all_parms = [self.ip, self.bridge, self.mac]
4544     if (all_parms.count(None) == len(all_parms) and
4545         not self.op.hvparams and
4546         not self.op.beparams):
4547       raise errors.OpPrereqError("No changes submitted")
4548     for item in (constants.BE_MEMORY, constants.BE_VCPUS):
4549       val = self.op.beparams.get(item, None)
4550       if val is not None:
4551         try:
4552           val = int(val)
4553         except ValueError, err:
4554           raise errors.OpPrereqError("Invalid %s size: %s" % (item, str(err)))
4555         self.op.beparams[item] = val
4556     if self.ip is not None:
4557       self.do_ip = True
4558       if self.ip.lower() == "none":
4559         self.ip = None
4560       else:
4561         if not utils.IsValidIP(self.ip):
4562           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4563     else:
4564       self.do_ip = False
4565     self.do_bridge = (self.bridge is not None)
4566     if self.mac is not None:
4567       if self.cfg.IsMacInUse(self.mac):
4568         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4569                                    self.mac)
4570       if not utils.IsValidMac(self.mac):
4571         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4572
4573     # checking the new params on the primary/secondary nodes
4574
4575     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4576     assert self.instance is not None, \
4577       "Cannot retrieve locked instance %s" % self.op.instance_name
4578     pnode = self.instance.primary_node
4579     nodelist = [pnode]
4580     nodelist.extend(instance.secondary_nodes)
4581
4582     # hvparams processing
4583     if self.op.hvparams:
4584       i_hvdict = copy.deepcopy(instance.hvparams)
4585       for key, val in self.op.hvparams.iteritems():
4586         if val is None:
4587           try:
4588             del i_hvdict[key]
4589           except KeyError:
4590             pass
4591         else:
4592           i_hvdict[key] = val
4593       cluster = self.cfg.GetClusterInfo()
4594       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
4595                                 i_hvdict)
4596       # local check
4597       hypervisor.GetHypervisor(
4598         instance.hypervisor).CheckParameterSyntax(hv_new)
4599       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
4600       self.hv_new = hv_new # the new actual values
4601       self.hv_inst = i_hvdict # the new dict (without defaults)
4602     else:
4603       self.hv_new = self.hv_inst = {}
4604
4605     # beparams processing
4606     if self.op.beparams:
4607       i_bedict = copy.deepcopy(instance.beparams)
4608       for key, val in self.op.beparams.iteritems():
4609         if val is None:
4610           try:
4611             del i_bedict[key]
4612           except KeyError:
4613             pass
4614         else:
4615           i_bedict[key] = val
4616       cluster = self.cfg.GetClusterInfo()
4617       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4618                                 i_bedict)
4619       self.be_new = be_new # the new actual values
4620       self.be_inst = i_bedict # the new dict (without defaults)
4621     else:
4622       self.hv_new = self.hv_inst = {}
4623
4624     self.warn = []
4625
4626     if constants.BE_MEMORY in self.op.beparams and not self.force:
4627       mem_check_list = [pnode]
4628       if be_new[constants.BE_AUTO_BALANCE]:
4629         # either we changed auto_balance to yes or it was from before
4630         mem_check_list.extend(instance.secondary_nodes)
4631       instance_info = self.rpc.call_instance_info(pnode, instance.name,
4632                                                   instance.hypervisor)
4633       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
4634                                          instance.hypervisor)
4635
4636       if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
4637         # Assume the primary node is unreachable and go ahead
4638         self.warn.append("Can't get info from primary node %s" % pnode)
4639       else:
4640         if instance_info:
4641           current_mem = instance_info['memory']
4642         else:
4643           # Assume instance not running
4644           # (there is a slight race condition here, but it's not very probable,
4645           # and we have no other way to check)
4646           current_mem = 0
4647         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
4648                     nodeinfo[pnode]['memory_free'])
4649         if miss_mem > 0:
4650           raise errors.OpPrereqError("This change will prevent the instance"
4651                                      " from starting, due to %d MB of memory"
4652                                      " missing on its primary node" % miss_mem)
4653
4654       if be_new[constants.BE_AUTO_BALANCE]:
4655         for node in instance.secondary_nodes:
4656           if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
4657             self.warn.append("Can't get info from secondary node %s" % node)
4658           elif be_new[constants.BE_MEMORY] > nodeinfo[node]['memory_free']:
4659             self.warn.append("Not enough memory to failover instance to"
4660                              " secondary node %s" % node)
4661
4662     return
4663
4664   def Exec(self, feedback_fn):
4665     """Modifies an instance.
4666
4667     All parameters take effect only at the next restart of the instance.
4668     """
4669     # Process here the warnings from CheckPrereq, as we don't have a
4670     # feedback_fn there.
4671     for warn in self.warn:
4672       feedback_fn("WARNING: %s" % warn)
4673
4674     result = []
4675     instance = self.instance
4676     if self.do_ip:
4677       instance.nics[0].ip = self.ip
4678       result.append(("ip", self.ip))
4679     if self.bridge:
4680       instance.nics[0].bridge = self.bridge
4681       result.append(("bridge", self.bridge))
4682     if self.mac:
4683       instance.nics[0].mac = self.mac
4684       result.append(("mac", self.mac))
4685     if self.op.hvparams:
4686       instance.hvparams = self.hv_new
4687       for key, val in self.op.hvparams.iteritems():
4688         result.append(("hv/%s" % key, val))
4689     if self.op.beparams:
4690       instance.beparams = self.be_inst
4691       for key, val in self.op.beparams.iteritems():
4692         result.append(("be/%s" % key, val))
4693
4694     self.cfg.Update(instance)
4695
4696     return result
4697
4698
4699 class LUQueryExports(NoHooksLU):
4700   """Query the exports list
4701
4702   """
4703   _OP_REQP = ['nodes']
4704   REQ_BGL = False
4705
4706   def ExpandNames(self):
4707     self.needed_locks = {}
4708     self.share_locks[locking.LEVEL_NODE] = 1
4709     if not self.op.nodes:
4710       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4711     else:
4712       self.needed_locks[locking.LEVEL_NODE] = \
4713         _GetWantedNodes(self, self.op.nodes)
4714
4715   def CheckPrereq(self):
4716     """Check prerequisites.
4717
4718     """
4719     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
4720
4721   def Exec(self, feedback_fn):
4722     """Compute the list of all the exported system images.
4723
4724     Returns:
4725       a dictionary with the structure node->(export-list)
4726       where export-list is a list of the instances exported on
4727       that node.
4728
4729     """
4730     return self.rpc.call_export_list(self.nodes)
4731
4732
4733 class LUExportInstance(LogicalUnit):
4734   """Export an instance to an image in the cluster.
4735
4736   """
4737   HPATH = "instance-export"
4738   HTYPE = constants.HTYPE_INSTANCE
4739   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4740   REQ_BGL = False
4741
4742   def ExpandNames(self):
4743     self._ExpandAndLockInstance()
4744     # FIXME: lock only instance primary and destination node
4745     #
4746     # Sad but true, for now we have do lock all nodes, as we don't know where
4747     # the previous export might be, and and in this LU we search for it and
4748     # remove it from its current node. In the future we could fix this by:
4749     #  - making a tasklet to search (share-lock all), then create the new one,
4750     #    then one to remove, after
4751     #  - removing the removal operation altoghether
4752     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4753
4754   def DeclareLocks(self, level):
4755     """Last minute lock declaration."""
4756     # All nodes are locked anyway, so nothing to do here.
4757
4758   def BuildHooksEnv(self):
4759     """Build hooks env.
4760
4761     This will run on the master, primary node and target node.
4762
4763     """
4764     env = {
4765       "EXPORT_NODE": self.op.target_node,
4766       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4767       }
4768     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
4769     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
4770           self.op.target_node]
4771     return env, nl, nl
4772
4773   def CheckPrereq(self):
4774     """Check prerequisites.
4775
4776     This checks that the instance and node names are valid.
4777
4778     """
4779     instance_name = self.op.instance_name
4780     self.instance = self.cfg.GetInstanceInfo(instance_name)
4781     assert self.instance is not None, \
4782           "Cannot retrieve locked instance %s" % self.op.instance_name
4783
4784     self.dst_node = self.cfg.GetNodeInfo(
4785       self.cfg.ExpandNodeName(self.op.target_node))
4786
4787     assert self.dst_node is not None, \
4788           "Cannot retrieve locked node %s" % self.op.target_node
4789
4790     # instance disk type verification
4791     for disk in self.instance.disks:
4792       if disk.dev_type == constants.LD_FILE:
4793         raise errors.OpPrereqError("Export not supported for instances with"
4794                                    " file-based disks")
4795
4796   def Exec(self, feedback_fn):
4797     """Export an instance to an image in the cluster.
4798
4799     """
4800     instance = self.instance
4801     dst_node = self.dst_node
4802     src_node = instance.primary_node
4803     if self.op.shutdown:
4804       # shutdown the instance, but not the disks
4805       if not self.rpc.call_instance_shutdown(src_node, instance):
4806         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4807                                  (instance.name, src_node))
4808
4809     vgname = self.cfg.GetVGName()
4810
4811     snap_disks = []
4812
4813     try:
4814       for disk in instance.disks:
4815         if disk.iv_name == "sda":
4816           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4817           new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
4818
4819           if not new_dev_name:
4820             logger.Error("could not snapshot block device %s on node %s" %
4821                          (disk.logical_id[1], src_node))
4822           else:
4823             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4824                                       logical_id=(vgname, new_dev_name),
4825                                       physical_id=(vgname, new_dev_name),
4826                                       iv_name=disk.iv_name)
4827             snap_disks.append(new_dev)
4828
4829     finally:
4830       if self.op.shutdown and instance.status == "up":
4831         if not self.rpc.call_instance_start(src_node, instance, None):
4832           _ShutdownInstanceDisks(self, instance)
4833           raise errors.OpExecError("Could not start instance")
4834
4835     # TODO: check for size
4836
4837     cluster_name = self.cfg.GetClusterName()
4838     for dev in snap_disks:
4839       if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
4840                                       instance, cluster_name):
4841         logger.Error("could not export block device %s from node %s to node %s"
4842                      % (dev.logical_id[1], src_node, dst_node.name))
4843       if not self.rpc.call_blockdev_remove(src_node, dev):
4844         logger.Error("could not remove snapshot block device %s from node %s" %
4845                      (dev.logical_id[1], src_node))
4846
4847     if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4848       logger.Error("could not finalize export for instance %s on node %s" %
4849                    (instance.name, dst_node.name))
4850
4851     nodelist = self.cfg.GetNodeList()
4852     nodelist.remove(dst_node.name)
4853
4854     # on one-node clusters nodelist will be empty after the removal
4855     # if we proceed the backup would be removed because OpQueryExports
4856     # substitutes an empty list with the full cluster node list.
4857     if nodelist:
4858       exportlist = self.rpc.call_export_list(nodelist)
4859       for node in exportlist:
4860         if instance.name in exportlist[node]:
4861           if not self.rpc.call_export_remove(node, instance.name):
4862             logger.Error("could not remove older export for instance %s"
4863                          " on node %s" % (instance.name, node))
4864
4865
4866 class LURemoveExport(NoHooksLU):
4867   """Remove exports related to the named instance.
4868
4869   """
4870   _OP_REQP = ["instance_name"]
4871   REQ_BGL = False
4872
4873   def ExpandNames(self):
4874     self.needed_locks = {}
4875     # We need all nodes to be locked in order for RemoveExport to work, but we
4876     # don't need to lock the instance itself, as nothing will happen to it (and
4877     # we can remove exports also for a removed instance)
4878     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4879
4880   def CheckPrereq(self):
4881     """Check prerequisites.
4882     """
4883     pass
4884
4885   def Exec(self, feedback_fn):
4886     """Remove any export.
4887
4888     """
4889     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4890     # If the instance was not found we'll try with the name that was passed in.
4891     # This will only work if it was an FQDN, though.
4892     fqdn_warn = False
4893     if not instance_name:
4894       fqdn_warn = True
4895       instance_name = self.op.instance_name
4896
4897     exportlist = self.rpc.call_export_list(self.acquired_locks[
4898       locking.LEVEL_NODE])
4899     found = False
4900     for node in exportlist:
4901       if instance_name in exportlist[node]:
4902         found = True
4903         if not self.rpc.call_export_remove(node, instance_name):
4904           logger.Error("could not remove export for instance %s"
4905                        " on node %s" % (instance_name, node))
4906
4907     if fqdn_warn and not found:
4908       feedback_fn("Export not found. If trying to remove an export belonging"
4909                   " to a deleted instance please use its Fully Qualified"
4910                   " Domain Name.")
4911
4912
4913 class TagsLU(NoHooksLU):
4914   """Generic tags LU.
4915
4916   This is an abstract class which is the parent of all the other tags LUs.
4917
4918   """
4919
4920   def ExpandNames(self):
4921     self.needed_locks = {}
4922     if self.op.kind == constants.TAG_NODE:
4923       name = self.cfg.ExpandNodeName(self.op.name)
4924       if name is None:
4925         raise errors.OpPrereqError("Invalid node name (%s)" %
4926                                    (self.op.name,))
4927       self.op.name = name
4928       self.needed_locks[locking.LEVEL_NODE] = name
4929     elif self.op.kind == constants.TAG_INSTANCE:
4930       name = self.cfg.ExpandInstanceName(self.op.name)
4931       if name is None:
4932         raise errors.OpPrereqError("Invalid instance name (%s)" %
4933                                    (self.op.name,))
4934       self.op.name = name
4935       self.needed_locks[locking.LEVEL_INSTANCE] = name
4936
4937   def CheckPrereq(self):
4938     """Check prerequisites.
4939
4940     """
4941     if self.op.kind == constants.TAG_CLUSTER:
4942       self.target = self.cfg.GetClusterInfo()
4943     elif self.op.kind == constants.TAG_NODE:
4944       self.target = self.cfg.GetNodeInfo(self.op.name)
4945     elif self.op.kind == constants.TAG_INSTANCE:
4946       self.target = self.cfg.GetInstanceInfo(self.op.name)
4947     else:
4948       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4949                                  str(self.op.kind))
4950
4951
4952 class LUGetTags(TagsLU):
4953   """Returns the tags of a given object.
4954
4955   """
4956   _OP_REQP = ["kind", "name"]
4957   REQ_BGL = False
4958
4959   def Exec(self, feedback_fn):
4960     """Returns the tag list.
4961
4962     """
4963     return list(self.target.GetTags())
4964
4965
4966 class LUSearchTags(NoHooksLU):
4967   """Searches the tags for a given pattern.
4968
4969   """
4970   _OP_REQP = ["pattern"]
4971   REQ_BGL = False
4972
4973   def ExpandNames(self):
4974     self.needed_locks = {}
4975
4976   def CheckPrereq(self):
4977     """Check prerequisites.
4978
4979     This checks the pattern passed for validity by compiling it.
4980
4981     """
4982     try:
4983       self.re = re.compile(self.op.pattern)
4984     except re.error, err:
4985       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4986                                  (self.op.pattern, err))
4987
4988   def Exec(self, feedback_fn):
4989     """Returns the tag list.
4990
4991     """
4992     cfg = self.cfg
4993     tgts = [("/cluster", cfg.GetClusterInfo())]
4994     ilist = cfg.GetAllInstancesInfo().values()
4995     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4996     nlist = cfg.GetAllNodesInfo().values()
4997     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4998     results = []
4999     for path, target in tgts:
5000       for tag in target.GetTags():
5001         if self.re.search(tag):
5002           results.append((path, tag))
5003     return results
5004
5005
5006 class LUAddTags(TagsLU):
5007   """Sets a tag on a given object.
5008
5009   """
5010   _OP_REQP = ["kind", "name", "tags"]
5011   REQ_BGL = False
5012
5013   def CheckPrereq(self):
5014     """Check prerequisites.
5015
5016     This checks the type and length of the tag name and value.
5017
5018     """
5019     TagsLU.CheckPrereq(self)
5020     for tag in self.op.tags:
5021       objects.TaggableObject.ValidateTag(tag)
5022
5023   def Exec(self, feedback_fn):
5024     """Sets the tag.
5025
5026     """
5027     try:
5028       for tag in self.op.tags:
5029         self.target.AddTag(tag)
5030     except errors.TagError, err:
5031       raise errors.OpExecError("Error while setting tag: %s" % str(err))
5032     try:
5033       self.cfg.Update(self.target)
5034     except errors.ConfigurationError:
5035       raise errors.OpRetryError("There has been a modification to the"
5036                                 " config file and the operation has been"
5037                                 " aborted. Please retry.")
5038
5039
5040 class LUDelTags(TagsLU):
5041   """Delete a list of tags from a given object.
5042
5043   """
5044   _OP_REQP = ["kind", "name", "tags"]
5045   REQ_BGL = False
5046
5047   def CheckPrereq(self):
5048     """Check prerequisites.
5049
5050     This checks that we have the given tag.
5051
5052     """
5053     TagsLU.CheckPrereq(self)
5054     for tag in self.op.tags:
5055       objects.TaggableObject.ValidateTag(tag)
5056     del_tags = frozenset(self.op.tags)
5057     cur_tags = self.target.GetTags()
5058     if not del_tags <= cur_tags:
5059       diff_tags = del_tags - cur_tags
5060       diff_names = ["'%s'" % tag for tag in diff_tags]
5061       diff_names.sort()
5062       raise errors.OpPrereqError("Tag(s) %s not found" %
5063                                  (",".join(diff_names)))
5064
5065   def Exec(self, feedback_fn):
5066     """Remove the tag from the object.
5067
5068     """
5069     for tag in self.op.tags:
5070       self.target.RemoveTag(tag)
5071     try:
5072       self.cfg.Update(self.target)
5073     except errors.ConfigurationError:
5074       raise errors.OpRetryError("There has been a modification to the"
5075                                 " config file and the operation has been"
5076                                 " aborted. Please retry.")
5077
5078
5079 class LUTestDelay(NoHooksLU):
5080   """Sleep for a specified amount of time.
5081
5082   This LU sleeps on the master and/or nodes for a specified amount of
5083   time.
5084
5085   """
5086   _OP_REQP = ["duration", "on_master", "on_nodes"]
5087   REQ_BGL = False
5088
5089   def ExpandNames(self):
5090     """Expand names and set required locks.
5091
5092     This expands the node list, if any.
5093
5094     """
5095     self.needed_locks = {}
5096     if self.op.on_nodes:
5097       # _GetWantedNodes can be used here, but is not always appropriate to use
5098       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
5099       # more information.
5100       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5101       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5102
5103   def CheckPrereq(self):
5104     """Check prerequisites.
5105
5106     """
5107
5108   def Exec(self, feedback_fn):
5109     """Do the actual sleep.
5110
5111     """
5112     if self.op.on_master:
5113       if not utils.TestDelay(self.op.duration):
5114         raise errors.OpExecError("Error during master delay test")
5115     if self.op.on_nodes:
5116       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5117       if not result:
5118         raise errors.OpExecError("Complete failure from rpc call")
5119       for node, node_result in result.items():
5120         if not node_result:
5121           raise errors.OpExecError("Failure during rpc call to node %s,"
5122                                    " result: %s" % (node, node_result))
5123
5124
5125 class IAllocator(object):
5126   """IAllocator framework.
5127
5128   An IAllocator instance has three sets of attributes:
5129     - cfg that is needed to query the cluster
5130     - input data (all members of the _KEYS class attribute are required)
5131     - four buffer attributes (in|out_data|text), that represent the
5132       input (to the external script) in text and data structure format,
5133       and the output from it, again in two formats
5134     - the result variables from the script (success, info, nodes) for
5135       easy usage
5136
5137   """
5138   _ALLO_KEYS = [
5139     "mem_size", "disks", "disk_template",
5140     "os", "tags", "nics", "vcpus",
5141     ]
5142   _RELO_KEYS = [
5143     "relocate_from",
5144     ]
5145
5146   def __init__(self, lu, mode, name, **kwargs):
5147     self.lu = lu
5148     # init buffer variables
5149     self.in_text = self.out_text = self.in_data = self.out_data = None
5150     # init all input fields so that pylint is happy
5151     self.mode = mode
5152     self.name = name
5153     self.mem_size = self.disks = self.disk_template = None
5154     self.os = self.tags = self.nics = self.vcpus = None
5155     self.relocate_from = None
5156     # computed fields
5157     self.required_nodes = None
5158     # init result fields
5159     self.success = self.info = self.nodes = None
5160     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5161       keyset = self._ALLO_KEYS
5162     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5163       keyset = self._RELO_KEYS
5164     else:
5165       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5166                                    " IAllocator" % self.mode)
5167     for key in kwargs:
5168       if key not in keyset:
5169         raise errors.ProgrammerError("Invalid input parameter '%s' to"
5170                                      " IAllocator" % key)
5171       setattr(self, key, kwargs[key])
5172     for key in keyset:
5173       if key not in kwargs:
5174         raise errors.ProgrammerError("Missing input parameter '%s' to"
5175                                      " IAllocator" % key)
5176     self._BuildInputData()
5177
5178   def _ComputeClusterData(self):
5179     """Compute the generic allocator input data.
5180
5181     This is the data that is independent of the actual operation.
5182
5183     """
5184     cfg = self.lu.cfg
5185     cluster_info = cfg.GetClusterInfo()
5186     # cluster data
5187     data = {
5188       "version": 1,
5189       "cluster_name": cfg.GetClusterName(),
5190       "cluster_tags": list(cluster_info.GetTags()),
5191       "enable_hypervisors": list(cluster_info.enabled_hypervisors),
5192       # we don't have job IDs
5193       }
5194
5195     i_list = []
5196     cluster = self.cfg.GetClusterInfo()
5197     for iname in cfg.GetInstanceList():
5198       i_obj = cfg.GetInstanceInfo(iname)
5199       i_list.append((i_obj, cluster.FillBE(i_obj)))
5200
5201     # node data
5202     node_results = {}
5203     node_list = cfg.GetNodeList()
5204     # FIXME: here we have only one hypervisor information, but
5205     # instance can belong to different hypervisors
5206     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
5207                                            cfg.GetHypervisorType())
5208     for nname in node_list:
5209       ninfo = cfg.GetNodeInfo(nname)
5210       if nname not in node_data or not isinstance(node_data[nname], dict):
5211         raise errors.OpExecError("Can't get data for node %s" % nname)
5212       remote_info = node_data[nname]
5213       for attr in ['memory_total', 'memory_free', 'memory_dom0',
5214                    'vg_size', 'vg_free', 'cpu_total']:
5215         if attr not in remote_info:
5216           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5217                                    (nname, attr))
5218         try:
5219           remote_info[attr] = int(remote_info[attr])
5220         except ValueError, err:
5221           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5222                                    " %s" % (nname, attr, str(err)))
5223       # compute memory used by primary instances
5224       i_p_mem = i_p_up_mem = 0
5225       for iinfo, beinfo in i_list:
5226         if iinfo.primary_node == nname:
5227           i_p_mem += beinfo[constants.BE_MEMORY]
5228           if iinfo.status == "up":
5229             i_p_up_mem += beinfo[constants.BE_MEMORY]
5230
5231       # compute memory used by instances
5232       pnr = {
5233         "tags": list(ninfo.GetTags()),
5234         "total_memory": remote_info['memory_total'],
5235         "reserved_memory": remote_info['memory_dom0'],
5236         "free_memory": remote_info['memory_free'],
5237         "i_pri_memory": i_p_mem,
5238         "i_pri_up_memory": i_p_up_mem,
5239         "total_disk": remote_info['vg_size'],
5240         "free_disk": remote_info['vg_free'],
5241         "primary_ip": ninfo.primary_ip,
5242         "secondary_ip": ninfo.secondary_ip,
5243         "total_cpus": remote_info['cpu_total'],
5244         }
5245       node_results[nname] = pnr
5246     data["nodes"] = node_results
5247
5248     # instance data
5249     instance_data = {}
5250     for iinfo, beinfo in i_list:
5251       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5252                   for n in iinfo.nics]
5253       pir = {
5254         "tags": list(iinfo.GetTags()),
5255         "should_run": iinfo.status == "up",
5256         "vcpus": beinfo[constants.BE_VCPUS],
5257         "memory": beinfo[constants.BE_MEMORY],
5258         "os": iinfo.os,
5259         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5260         "nics": nic_data,
5261         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5262         "disk_template": iinfo.disk_template,
5263         "hypervisor": iinfo.hypervisor,
5264         }
5265       instance_data[iinfo.name] = pir
5266
5267     data["instances"] = instance_data
5268
5269     self.in_data = data
5270
5271   def _AddNewInstance(self):
5272     """Add new instance data to allocator structure.
5273
5274     This in combination with _AllocatorGetClusterData will create the
5275     correct structure needed as input for the allocator.
5276
5277     The checks for the completeness of the opcode must have already been
5278     done.
5279
5280     """
5281     data = self.in_data
5282     if len(self.disks) != 2:
5283       raise errors.OpExecError("Only two-disk configurations supported")
5284
5285     disk_space = _ComputeDiskSize(self.disk_template,
5286                                   self.disks[0]["size"], self.disks[1]["size"])
5287
5288     if self.disk_template in constants.DTS_NET_MIRROR:
5289       self.required_nodes = 2
5290     else:
5291       self.required_nodes = 1
5292     request = {
5293       "type": "allocate",
5294       "name": self.name,
5295       "disk_template": self.disk_template,
5296       "tags": self.tags,
5297       "os": self.os,
5298       "vcpus": self.vcpus,
5299       "memory": self.mem_size,
5300       "disks": self.disks,
5301       "disk_space_total": disk_space,
5302       "nics": self.nics,
5303       "required_nodes": self.required_nodes,
5304       }
5305     data["request"] = request
5306
5307   def _AddRelocateInstance(self):
5308     """Add relocate instance data to allocator structure.
5309
5310     This in combination with _IAllocatorGetClusterData will create the
5311     correct structure needed as input for the allocator.
5312
5313     The checks for the completeness of the opcode must have already been
5314     done.
5315
5316     """
5317     instance = self.lu.cfg.GetInstanceInfo(self.name)
5318     if instance is None:
5319       raise errors.ProgrammerError("Unknown instance '%s' passed to"
5320                                    " IAllocator" % self.name)
5321
5322     if instance.disk_template not in constants.DTS_NET_MIRROR:
5323       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5324
5325     if len(instance.secondary_nodes) != 1:
5326       raise errors.OpPrereqError("Instance has not exactly one secondary node")
5327
5328     self.required_nodes = 1
5329
5330     disk_space = _ComputeDiskSize(instance.disk_template,
5331                                   instance.disks[0].size,
5332                                   instance.disks[1].size)
5333
5334     request = {
5335       "type": "relocate",
5336       "name": self.name,
5337       "disk_space_total": disk_space,
5338       "required_nodes": self.required_nodes,
5339       "relocate_from": self.relocate_from,
5340       }
5341     self.in_data["request"] = request
5342
5343   def _BuildInputData(self):
5344     """Build input data structures.
5345
5346     """
5347     self._ComputeClusterData()
5348
5349     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5350       self._AddNewInstance()
5351     else:
5352       self._AddRelocateInstance()
5353
5354     self.in_text = serializer.Dump(self.in_data)
5355
5356   def Run(self, name, validate=True, call_fn=None):
5357     """Run an instance allocator and return the results.
5358
5359     """
5360     if call_fn is None:
5361       call_fn = self.lu.rpc.call_iallocator_runner
5362     data = self.in_text
5363
5364     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
5365
5366     if not isinstance(result, (list, tuple)) or len(result) != 4:
5367       raise errors.OpExecError("Invalid result from master iallocator runner")
5368
5369     rcode, stdout, stderr, fail = result
5370
5371     if rcode == constants.IARUN_NOTFOUND:
5372       raise errors.OpExecError("Can't find allocator '%s'" % name)
5373     elif rcode == constants.IARUN_FAILURE:
5374       raise errors.OpExecError("Instance allocator call failed: %s,"
5375                                " output: %s" % (fail, stdout+stderr))
5376     self.out_text = stdout
5377     if validate:
5378       self._ValidateResult()
5379
5380   def _ValidateResult(self):
5381     """Process the allocator results.
5382
5383     This will process and if successful save the result in
5384     self.out_data and the other parameters.
5385
5386     """
5387     try:
5388       rdict = serializer.Load(self.out_text)
5389     except Exception, err:
5390       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5391
5392     if not isinstance(rdict, dict):
5393       raise errors.OpExecError("Can't parse iallocator results: not a dict")
5394
5395     for key in "success", "info", "nodes":
5396       if key not in rdict:
5397         raise errors.OpExecError("Can't parse iallocator results:"
5398                                  " missing key '%s'" % key)
5399       setattr(self, key, rdict[key])
5400
5401     if not isinstance(rdict["nodes"], list):
5402       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5403                                " is not a list")
5404     self.out_data = rdict
5405
5406
5407 class LUTestAllocator(NoHooksLU):
5408   """Run allocator tests.
5409
5410   This LU runs the allocator tests
5411
5412   """
5413   _OP_REQP = ["direction", "mode", "name"]
5414
5415   def CheckPrereq(self):
5416     """Check prerequisites.
5417
5418     This checks the opcode parameters depending on the director and mode test.
5419
5420     """
5421     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5422       for attr in ["name", "mem_size", "disks", "disk_template",
5423                    "os", "tags", "nics", "vcpus"]:
5424         if not hasattr(self.op, attr):
5425           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5426                                      attr)
5427       iname = self.cfg.ExpandInstanceName(self.op.name)
5428       if iname is not None:
5429         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5430                                    iname)
5431       if not isinstance(self.op.nics, list):
5432         raise errors.OpPrereqError("Invalid parameter 'nics'")
5433       for row in self.op.nics:
5434         if (not isinstance(row, dict) or
5435             "mac" not in row or
5436             "ip" not in row or
5437             "bridge" not in row):
5438           raise errors.OpPrereqError("Invalid contents of the"
5439                                      " 'nics' parameter")
5440       if not isinstance(self.op.disks, list):
5441         raise errors.OpPrereqError("Invalid parameter 'disks'")
5442       if len(self.op.disks) != 2:
5443         raise errors.OpPrereqError("Only two-disk configurations supported")
5444       for row in self.op.disks:
5445         if (not isinstance(row, dict) or
5446             "size" not in row or
5447             not isinstance(row["size"], int) or
5448             "mode" not in row or
5449             row["mode"] not in ['r', 'w']):
5450           raise errors.OpPrereqError("Invalid contents of the"
5451                                      " 'disks' parameter")
5452     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5453       if not hasattr(self.op, "name"):
5454         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5455       fname = self.cfg.ExpandInstanceName(self.op.name)
5456       if fname is None:
5457         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5458                                    self.op.name)
5459       self.op.name = fname
5460       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5461     else:
5462       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5463                                  self.op.mode)
5464
5465     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5466       if not hasattr(self.op, "allocator") or self.op.allocator is None:
5467         raise errors.OpPrereqError("Missing allocator name")
5468     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5469       raise errors.OpPrereqError("Wrong allocator test '%s'" %
5470                                  self.op.direction)
5471
5472   def Exec(self, feedback_fn):
5473     """Run the allocator test.
5474
5475     """
5476     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5477       ial = IAllocator(self,
5478                        mode=self.op.mode,
5479                        name=self.op.name,
5480                        mem_size=self.op.mem_size,
5481                        disks=self.op.disks,
5482                        disk_template=self.op.disk_template,
5483                        os=self.op.os,
5484                        tags=self.op.tags,
5485                        nics=self.op.nics,
5486                        vcpus=self.op.vcpus,
5487                        )
5488     else:
5489       ial = IAllocator(self,
5490                        mode=self.op.mode,
5491                        name=self.op.name,
5492                        relocate_from=list(self.relocate_from),
5493                        )
5494
5495     if self.op.direction == constants.IALLOCATOR_DIR_IN:
5496       result = ial.in_text
5497     else:
5498       ial.Run(self.op.allocator, validate=False)
5499       result = ial.out_text
5500     return result