309fe3400c8a4bb4f9678b8ac40a08df96dab18f
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import sha
29 import time
30 import tempfile
31 import re
32 import platform
33 import logging
34 import copy
35
36 from ganeti import ssh
37 from ganeti import logger
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import serializer
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_MASTER: the LU needs to run on the master node
59         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
60
61   Note that all commands require root permissions.
62
63   """
64   HPATH = None
65   HTYPE = None
66   _OP_REQP = []
67   REQ_MASTER = True
68   REQ_BGL = True
69
70   def __init__(self, processor, op, context, rpc):
71     """Constructor for LogicalUnit.
72
73     This needs to be overriden in derived classes in order to check op
74     validity.
75
76     """
77     self.proc = processor
78     self.op = op
79     self.cfg = context.cfg
80     self.context = context
81     self.rpc = rpc
82     # Dicts used to declare locking needs to mcpu
83     self.needed_locks = None
84     self.acquired_locks = {}
85     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
86     self.add_locks = {}
87     self.remove_locks = {}
88     # Used to force good behavior when calling helper functions
89     self.recalculate_locks = {}
90     self.__ssh = None
91
92     for attr_name in self._OP_REQP:
93       attr_val = getattr(op, attr_name, None)
94       if attr_val is None:
95         raise errors.OpPrereqError("Required parameter '%s' missing" %
96                                    attr_name)
97
98     if not self.cfg.IsCluster():
99       raise errors.OpPrereqError("Cluster not initialized yet,"
100                                  " use 'gnt-cluster init' first.")
101     if self.REQ_MASTER:
102       master = self.cfg.GetMasterNode()
103       if master != utils.HostInfo().name:
104         raise errors.OpPrereqError("Commands must be run on the master"
105                                    " node %s" % master)
106
107   def __GetSSH(self):
108     """Returns the SshRunner object
109
110     """
111     if not self.__ssh:
112       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
113     return self.__ssh
114
115   ssh = property(fget=__GetSSH)
116
117   def ExpandNames(self):
118     """Expand names for this LU.
119
120     This method is called before starting to execute the opcode, and it should
121     update all the parameters of the opcode to their canonical form (e.g. a
122     short node name must be fully expanded after this method has successfully
123     completed). This way locking, hooks, logging, ecc. can work correctly.
124
125     LUs which implement this method must also populate the self.needed_locks
126     member, as a dict with lock levels as keys, and a list of needed lock names
127     as values. Rules:
128       - Use an empty dict if you don't need any lock
129       - If you don't need any lock at a particular level omit that level
130       - Don't put anything for the BGL level
131       - If you want all locks at a level use locking.ALL_SET as a value
132
133     If you need to share locks (rather than acquire them exclusively) at one
134     level you can modify self.share_locks, setting a true value (usually 1) for
135     that level. By default locks are not shared.
136
137     Examples:
138     # Acquire all nodes and one instance
139     self.needed_locks = {
140       locking.LEVEL_NODE: locking.ALL_SET,
141       locking.LEVEL_INSTANCE: ['instance1.example.tld'],
142     }
143     # Acquire just two nodes
144     self.needed_locks = {
145       locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
146     }
147     # Acquire no locks
148     self.needed_locks = {} # No, you can't leave it to the default value None
149
150     """
151     # The implementation of this method is mandatory only if the new LU is
152     # concurrent, so that old LUs don't need to be changed all at the same
153     # time.
154     if self.REQ_BGL:
155       self.needed_locks = {} # Exclusive LUs don't need locks.
156     else:
157       raise NotImplementedError
158
159   def DeclareLocks(self, level):
160     """Declare LU locking needs for a level
161
162     While most LUs can just declare their locking needs at ExpandNames time,
163     sometimes there's the need to calculate some locks after having acquired
164     the ones before. This function is called just before acquiring locks at a
165     particular level, but after acquiring the ones at lower levels, and permits
166     such calculations. It can be used to modify self.needed_locks, and by
167     default it does nothing.
168
169     This function is only called if you have something already set in
170     self.needed_locks for the level.
171
172     @param level: Locking level which is going to be locked
173     @type level: member of ganeti.locking.LEVELS
174
175     """
176
177   def CheckPrereq(self):
178     """Check prerequisites for this LU.
179
180     This method should check that the prerequisites for the execution
181     of this LU are fulfilled. It can do internode communication, but
182     it should be idempotent - no cluster or system changes are
183     allowed.
184
185     The method should raise errors.OpPrereqError in case something is
186     not fulfilled. Its return value is ignored.
187
188     This method should also update all the parameters of the opcode to
189     their canonical form if it hasn't been done by ExpandNames before.
190
191     """
192     raise NotImplementedError
193
194   def Exec(self, feedback_fn):
195     """Execute the LU.
196
197     This method should implement the actual work. It should raise
198     errors.OpExecError for failures that are somewhat dealt with in
199     code, or expected.
200
201     """
202     raise NotImplementedError
203
204   def BuildHooksEnv(self):
205     """Build hooks environment for this LU.
206
207     This method should return a three-node tuple consisting of: a dict
208     containing the environment that will be used for running the
209     specific hook for this LU, a list of node names on which the hook
210     should run before the execution, and a list of node names on which
211     the hook should run after the execution.
212
213     The keys of the dict must not have 'GANETI_' prefixed as this will
214     be handled in the hooks runner. Also note additional keys will be
215     added by the hooks runner. If the LU doesn't define any
216     environment, an empty dict (and not None) should be returned.
217
218     No nodes should be returned as an empty list (and not None).
219
220     Note that if the HPATH for a LU class is None, this function will
221     not be called.
222
223     """
224     raise NotImplementedError
225
226   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
227     """Notify the LU about the results of its hooks.
228
229     This method is called every time a hooks phase is executed, and notifies
230     the Logical Unit about the hooks' result. The LU can then use it to alter
231     its result based on the hooks.  By default the method does nothing and the
232     previous result is passed back unchanged but any LU can define it if it
233     wants to use the local cluster hook-scripts somehow.
234
235     Args:
236       phase: the hooks phase that has just been run
237       hooks_results: the results of the multi-node hooks rpc call
238       feedback_fn: function to send feedback back to the caller
239       lu_result: the previous result this LU had, or None in the PRE phase.
240
241     """
242     return lu_result
243
244   def _ExpandAndLockInstance(self):
245     """Helper function to expand and lock an instance.
246
247     Many LUs that work on an instance take its name in self.op.instance_name
248     and need to expand it and then declare the expanded name for locking. This
249     function does it, and then updates self.op.instance_name to the expanded
250     name. It also initializes needed_locks as a dict, if this hasn't been done
251     before.
252
253     """
254     if self.needed_locks is None:
255       self.needed_locks = {}
256     else:
257       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
258         "_ExpandAndLockInstance called with instance-level locks set"
259     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
260     if expanded_name is None:
261       raise errors.OpPrereqError("Instance '%s' not known" %
262                                   self.op.instance_name)
263     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
264     self.op.instance_name = expanded_name
265
266   def _LockInstancesNodes(self, primary_only=False):
267     """Helper function to declare instances' nodes for locking.
268
269     This function should be called after locking one or more instances to lock
270     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
271     with all primary or secondary nodes for instances already locked and
272     present in self.needed_locks[locking.LEVEL_INSTANCE].
273
274     It should be called from DeclareLocks, and for safety only works if
275     self.recalculate_locks[locking.LEVEL_NODE] is set.
276
277     In the future it may grow parameters to just lock some instance's nodes, or
278     to just lock primaries or secondary nodes, if needed.
279
280     If should be called in DeclareLocks in a way similar to:
281
282     if level == locking.LEVEL_NODE:
283       self._LockInstancesNodes()
284
285     @type primary_only: boolean
286     @param primary_only: only lock primary nodes of locked instances
287
288     """
289     assert locking.LEVEL_NODE in self.recalculate_locks, \
290       "_LockInstancesNodes helper function called with no nodes to recalculate"
291
292     # TODO: check if we're really been called with the instance locks held
293
294     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
295     # future we might want to have different behaviors depending on the value
296     # of self.recalculate_locks[locking.LEVEL_NODE]
297     wanted_nodes = []
298     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
299       instance = self.context.cfg.GetInstanceInfo(instance_name)
300       wanted_nodes.append(instance.primary_node)
301       if not primary_only:
302         wanted_nodes.extend(instance.secondary_nodes)
303
304     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
305       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
306     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
307       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
308
309     del self.recalculate_locks[locking.LEVEL_NODE]
310
311
312 class NoHooksLU(LogicalUnit):
313   """Simple LU which runs no hooks.
314
315   This LU is intended as a parent for other LogicalUnits which will
316   run no hooks, in order to reduce duplicate code.
317
318   """
319   HPATH = None
320   HTYPE = None
321
322
323 def _GetWantedNodes(lu, nodes):
324   """Returns list of checked and expanded node names.
325
326   Args:
327     nodes: List of nodes (strings) or None for all
328
329   """
330   if not isinstance(nodes, list):
331     raise errors.OpPrereqError("Invalid argument type 'nodes'")
332
333   if not nodes:
334     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
335       " non-empty list of nodes whose name is to be expanded.")
336
337   wanted = []
338   for name in nodes:
339     node = lu.cfg.ExpandNodeName(name)
340     if node is None:
341       raise errors.OpPrereqError("No such node name '%s'" % name)
342     wanted.append(node)
343
344   return utils.NiceSort(wanted)
345
346
347 def _GetWantedInstances(lu, instances):
348   """Returns list of checked and expanded instance names.
349
350   Args:
351     instances: List of instances (strings) or None for all
352
353   """
354   if not isinstance(instances, list):
355     raise errors.OpPrereqError("Invalid argument type 'instances'")
356
357   if instances:
358     wanted = []
359
360     for name in instances:
361       instance = lu.cfg.ExpandInstanceName(name)
362       if instance is None:
363         raise errors.OpPrereqError("No such instance name '%s'" % name)
364       wanted.append(instance)
365
366   else:
367     wanted = lu.cfg.GetInstanceList()
368   return utils.NiceSort(wanted)
369
370
371 def _CheckOutputFields(static, dynamic, selected):
372   """Checks whether all selected fields are valid.
373
374   Args:
375     static: Static fields
376     dynamic: Dynamic fields
377
378   """
379   static_fields = frozenset(static)
380   dynamic_fields = frozenset(dynamic)
381
382   all_fields = static_fields | dynamic_fields
383
384   if not all_fields.issuperset(selected):
385     raise errors.OpPrereqError("Unknown output fields selected: %s"
386                                % ",".join(frozenset(selected).
387                                           difference(all_fields)))
388
389
390 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
391                           memory, vcpus, nics):
392   """Builds instance related env variables for hooks from single variables.
393
394   Args:
395     secondary_nodes: List of secondary nodes as strings
396   """
397   env = {
398     "OP_TARGET": name,
399     "INSTANCE_NAME": name,
400     "INSTANCE_PRIMARY": primary_node,
401     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
402     "INSTANCE_OS_TYPE": os_type,
403     "INSTANCE_STATUS": status,
404     "INSTANCE_MEMORY": memory,
405     "INSTANCE_VCPUS": vcpus,
406   }
407
408   if nics:
409     nic_count = len(nics)
410     for idx, (ip, bridge, mac) in enumerate(nics):
411       if ip is None:
412         ip = ""
413       env["INSTANCE_NIC%d_IP" % idx] = ip
414       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
415       env["INSTANCE_NIC%d_HWADDR" % idx] = mac
416   else:
417     nic_count = 0
418
419   env["INSTANCE_NIC_COUNT"] = nic_count
420
421   return env
422
423
424 def _BuildInstanceHookEnvByObject(instance, override=None):
425   """Builds instance related env variables for hooks from an object.
426
427   Args:
428     instance: objects.Instance object of instance
429     override: dict of values to override
430   """
431   args = {
432     'name': instance.name,
433     'primary_node': instance.primary_node,
434     'secondary_nodes': instance.secondary_nodes,
435     'os_type': instance.os,
436     'status': instance.os,
437     'memory': instance.memory,
438     'vcpus': instance.vcpus,
439     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
440   }
441   if override:
442     args.update(override)
443   return _BuildInstanceHookEnv(**args)
444
445
446 def _CheckInstanceBridgesExist(lu, instance):
447   """Check that the brigdes needed by an instance exist.
448
449   """
450   # check bridges existance
451   brlist = [nic.bridge for nic in instance.nics]
452   if not lu.rpc.call_bridges_exist(instance.primary_node, brlist):
453     raise errors.OpPrereqError("one or more target bridges %s does not"
454                                " exist on destination node '%s'" %
455                                (brlist, instance.primary_node))
456
457
458 class LUDestroyCluster(NoHooksLU):
459   """Logical unit for destroying the cluster.
460
461   """
462   _OP_REQP = []
463
464   def CheckPrereq(self):
465     """Check prerequisites.
466
467     This checks whether the cluster is empty.
468
469     Any errors are signalled by raising errors.OpPrereqError.
470
471     """
472     master = self.cfg.GetMasterNode()
473
474     nodelist = self.cfg.GetNodeList()
475     if len(nodelist) != 1 or nodelist[0] != master:
476       raise errors.OpPrereqError("There are still %d node(s) in"
477                                  " this cluster." % (len(nodelist) - 1))
478     instancelist = self.cfg.GetInstanceList()
479     if instancelist:
480       raise errors.OpPrereqError("There are still %d instance(s) in"
481                                  " this cluster." % len(instancelist))
482
483   def Exec(self, feedback_fn):
484     """Destroys the cluster.
485
486     """
487     master = self.cfg.GetMasterNode()
488     if not self.rpc.call_node_stop_master(master, False):
489       raise errors.OpExecError("Could not disable the master role")
490     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
491     utils.CreateBackup(priv_key)
492     utils.CreateBackup(pub_key)
493     return master
494
495
496 class LUVerifyCluster(LogicalUnit):
497   """Verifies the cluster status.
498
499   """
500   HPATH = "cluster-verify"
501   HTYPE = constants.HTYPE_CLUSTER
502   _OP_REQP = ["skip_checks"]
503   REQ_BGL = False
504
505   def ExpandNames(self):
506     self.needed_locks = {
507       locking.LEVEL_NODE: locking.ALL_SET,
508       locking.LEVEL_INSTANCE: locking.ALL_SET,
509     }
510     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
511
512   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
513                   remote_version, feedback_fn):
514     """Run multiple tests against a node.
515
516     Test list:
517       - compares ganeti version
518       - checks vg existance and size > 20G
519       - checks config file checksum
520       - checks ssh to other nodes
521
522     Args:
523       node: name of the node to check
524       file_list: required list of files
525       local_cksum: dictionary of local files and their checksums
526
527     """
528     # compares ganeti version
529     local_version = constants.PROTOCOL_VERSION
530     if not remote_version:
531       feedback_fn("  - ERROR: connection to %s failed" % (node))
532       return True
533
534     if local_version != remote_version:
535       feedback_fn("  - ERROR: sw version mismatch: master %s, node(%s) %s" %
536                       (local_version, node, remote_version))
537       return True
538
539     # checks vg existance and size > 20G
540
541     bad = False
542     if not vglist:
543       feedback_fn("  - ERROR: unable to check volume groups on node %s." %
544                       (node,))
545       bad = True
546     else:
547       vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
548                                             constants.MIN_VG_SIZE)
549       if vgstatus:
550         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
551         bad = True
552
553     if not node_result:
554       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
555       return True
556
557     # checks config file checksum
558     # checks ssh to any
559
560     if 'filelist' not in node_result:
561       bad = True
562       feedback_fn("  - ERROR: node hasn't returned file checksum data")
563     else:
564       remote_cksum = node_result['filelist']
565       for file_name in file_list:
566         if file_name not in remote_cksum:
567           bad = True
568           feedback_fn("  - ERROR: file '%s' missing" % file_name)
569         elif remote_cksum[file_name] != local_cksum[file_name]:
570           bad = True
571           feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
572
573     if 'nodelist' not in node_result:
574       bad = True
575       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
576     else:
577       if node_result['nodelist']:
578         bad = True
579         for node in node_result['nodelist']:
580           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
581                           (node, node_result['nodelist'][node]))
582     if 'node-net-test' not in node_result:
583       bad = True
584       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
585     else:
586       if node_result['node-net-test']:
587         bad = True
588         nlist = utils.NiceSort(node_result['node-net-test'].keys())
589         for node in nlist:
590           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
591                           (node, node_result['node-net-test'][node]))
592
593     hyp_result = node_result.get('hypervisor', None)
594     if isinstance(hyp_result, dict):
595       for hv_name, hv_result in hyp_result.iteritems():
596         if hv_result is not None:
597           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
598                       (hv_name, hv_result))
599     return bad
600
601   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
602                       node_instance, feedback_fn):
603     """Verify an instance.
604
605     This function checks to see if the required block devices are
606     available on the instance's node.
607
608     """
609     bad = False
610
611     node_current = instanceconfig.primary_node
612
613     node_vol_should = {}
614     instanceconfig.MapLVsByNode(node_vol_should)
615
616     for node in node_vol_should:
617       for volume in node_vol_should[node]:
618         if node not in node_vol_is or volume not in node_vol_is[node]:
619           feedback_fn("  - ERROR: volume %s missing on node %s" %
620                           (volume, node))
621           bad = True
622
623     if not instanceconfig.status == 'down':
624       if (node_current not in node_instance or
625           not instance in node_instance[node_current]):
626         feedback_fn("  - ERROR: instance %s not running on node %s" %
627                         (instance, node_current))
628         bad = True
629
630     for node in node_instance:
631       if (not node == node_current):
632         if instance in node_instance[node]:
633           feedback_fn("  - ERROR: instance %s should not run on node %s" %
634                           (instance, node))
635           bad = True
636
637     return bad
638
639   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
640     """Verify if there are any unknown volumes in the cluster.
641
642     The .os, .swap and backup volumes are ignored. All other volumes are
643     reported as unknown.
644
645     """
646     bad = False
647
648     for node in node_vol_is:
649       for volume in node_vol_is[node]:
650         if node not in node_vol_should or volume not in node_vol_should[node]:
651           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
652                       (volume, node))
653           bad = True
654     return bad
655
656   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
657     """Verify the list of running instances.
658
659     This checks what instances are running but unknown to the cluster.
660
661     """
662     bad = False
663     for node in node_instance:
664       for runninginstance in node_instance[node]:
665         if runninginstance not in instancelist:
666           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
667                           (runninginstance, node))
668           bad = True
669     return bad
670
671   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
672     """Verify N+1 Memory Resilience.
673
674     Check that if one single node dies we can still start all the instances it
675     was primary for.
676
677     """
678     bad = False
679
680     for node, nodeinfo in node_info.iteritems():
681       # This code checks that every node which is now listed as secondary has
682       # enough memory to host all instances it is supposed to should a single
683       # other node in the cluster fail.
684       # FIXME: not ready for failover to an arbitrary node
685       # FIXME: does not support file-backed instances
686       # WARNING: we currently take into account down instances as well as up
687       # ones, considering that even if they're down someone might want to start
688       # them even in the event of a node failure.
689       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
690         needed_mem = 0
691         for instance in instances:
692           needed_mem += instance_cfg[instance].memory
693         if nodeinfo['mfree'] < needed_mem:
694           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
695                       " failovers should node %s fail" % (node, prinode))
696           bad = True
697     return bad
698
699   def CheckPrereq(self):
700     """Check prerequisites.
701
702     Transform the list of checks we're going to skip into a set and check that
703     all its members are valid.
704
705     """
706     self.skip_set = frozenset(self.op.skip_checks)
707     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
708       raise errors.OpPrereqError("Invalid checks to be skipped specified")
709
710   def BuildHooksEnv(self):
711     """Build hooks env.
712
713     Cluster-Verify hooks just rone in the post phase and their failure makes
714     the output be logged in the verify output and the verification to fail.
715
716     """
717     all_nodes = self.cfg.GetNodeList()
718     # TODO: populate the environment with useful information for verify hooks
719     env = {}
720     return env, [], all_nodes
721
722   def Exec(self, feedback_fn):
723     """Verify integrity of cluster, performing various test on nodes.
724
725     """
726     bad = False
727     feedback_fn("* Verifying global settings")
728     for msg in self.cfg.VerifyConfig():
729       feedback_fn("  - ERROR: %s" % msg)
730
731     vg_name = self.cfg.GetVGName()
732     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
733     nodelist = utils.NiceSort(self.cfg.GetNodeList())
734     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
735     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
736     i_non_redundant = [] # Non redundant instances
737     node_volume = {}
738     node_instance = {}
739     node_info = {}
740     instance_cfg = {}
741
742     # FIXME: verify OS list
743     # do local checksums
744     file_names = []
745     file_names.append(constants.SSL_CERT_FILE)
746     file_names.append(constants.CLUSTER_CONF_FILE)
747     local_checksums = utils.FingerprintFiles(file_names)
748
749     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
750     all_volumeinfo = self.rpc.call_volume_list(nodelist, vg_name)
751     all_instanceinfo = self.rpc.call_instance_list(nodelist, hypervisors)
752     all_vglist = self.rpc.call_vg_list(nodelist)
753     node_verify_param = {
754       'filelist': file_names,
755       'nodelist': nodelist,
756       'hypervisor': hypervisors,
757       'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
758                         for node in nodeinfo]
759       }
760     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
761                                            self.cfg.GetClusterName())
762     all_rversion = self.rpc.call_version(nodelist)
763     all_ninfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
764                                         self.cfg.GetHypervisorType())
765
766     for node in nodelist:
767       feedback_fn("* Verifying node %s" % node)
768       result = self._VerifyNode(node, file_names, local_checksums,
769                                 all_vglist[node], all_nvinfo[node],
770                                 all_rversion[node], feedback_fn)
771       bad = bad or result
772
773       # node_volume
774       volumeinfo = all_volumeinfo[node]
775
776       if isinstance(volumeinfo, basestring):
777         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
778                     (node, volumeinfo[-400:].encode('string_escape')))
779         bad = True
780         node_volume[node] = {}
781       elif not isinstance(volumeinfo, dict):
782         feedback_fn("  - ERROR: connection to %s failed" % (node,))
783         bad = True
784         continue
785       else:
786         node_volume[node] = volumeinfo
787
788       # node_instance
789       nodeinstance = all_instanceinfo[node]
790       if type(nodeinstance) != list:
791         feedback_fn("  - ERROR: connection to %s failed" % (node,))
792         bad = True
793         continue
794
795       node_instance[node] = nodeinstance
796
797       # node_info
798       nodeinfo = all_ninfo[node]
799       if not isinstance(nodeinfo, dict):
800         feedback_fn("  - ERROR: connection to %s failed" % (node,))
801         bad = True
802         continue
803
804       try:
805         node_info[node] = {
806           "mfree": int(nodeinfo['memory_free']),
807           "dfree": int(nodeinfo['vg_free']),
808           "pinst": [],
809           "sinst": [],
810           # dictionary holding all instances this node is secondary for,
811           # grouped by their primary node. Each key is a cluster node, and each
812           # value is a list of instances which have the key as primary and the
813           # current node as secondary.  this is handy to calculate N+1 memory
814           # availability if you can only failover from a primary to its
815           # secondary.
816           "sinst-by-pnode": {},
817         }
818       except ValueError:
819         feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
820         bad = True
821         continue
822
823     node_vol_should = {}
824
825     for instance in instancelist:
826       feedback_fn("* Verifying instance %s" % instance)
827       inst_config = self.cfg.GetInstanceInfo(instance)
828       result =  self._VerifyInstance(instance, inst_config, node_volume,
829                                      node_instance, feedback_fn)
830       bad = bad or result
831
832       inst_config.MapLVsByNode(node_vol_should)
833
834       instance_cfg[instance] = inst_config
835
836       pnode = inst_config.primary_node
837       if pnode in node_info:
838         node_info[pnode]['pinst'].append(instance)
839       else:
840         feedback_fn("  - ERROR: instance %s, connection to primary node"
841                     " %s failed" % (instance, pnode))
842         bad = True
843
844       # If the instance is non-redundant we cannot survive losing its primary
845       # node, so we are not N+1 compliant. On the other hand we have no disk
846       # templates with more than one secondary so that situation is not well
847       # supported either.
848       # FIXME: does not support file-backed instances
849       if len(inst_config.secondary_nodes) == 0:
850         i_non_redundant.append(instance)
851       elif len(inst_config.secondary_nodes) > 1:
852         feedback_fn("  - WARNING: multiple secondaries for instance %s"
853                     % instance)
854
855       for snode in inst_config.secondary_nodes:
856         if snode in node_info:
857           node_info[snode]['sinst'].append(instance)
858           if pnode not in node_info[snode]['sinst-by-pnode']:
859             node_info[snode]['sinst-by-pnode'][pnode] = []
860           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
861         else:
862           feedback_fn("  - ERROR: instance %s, connection to secondary node"
863                       " %s failed" % (instance, snode))
864
865     feedback_fn("* Verifying orphan volumes")
866     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
867                                        feedback_fn)
868     bad = bad or result
869
870     feedback_fn("* Verifying remaining instances")
871     result = self._VerifyOrphanInstances(instancelist, node_instance,
872                                          feedback_fn)
873     bad = bad or result
874
875     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
876       feedback_fn("* Verifying N+1 Memory redundancy")
877       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
878       bad = bad or result
879
880     feedback_fn("* Other Notes")
881     if i_non_redundant:
882       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
883                   % len(i_non_redundant))
884
885     return not bad
886
887   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
888     """Analize the post-hooks' result, handle it, and send some
889     nicely-formatted feedback back to the user.
890
891     Args:
892       phase: the hooks phase that has just been run
893       hooks_results: the results of the multi-node hooks rpc call
894       feedback_fn: function to send feedback back to the caller
895       lu_result: previous Exec result
896
897     """
898     # We only really run POST phase hooks, and are only interested in
899     # their results
900     if phase == constants.HOOKS_PHASE_POST:
901       # Used to change hooks' output to proper indentation
902       indent_re = re.compile('^', re.M)
903       feedback_fn("* Hooks Results")
904       if not hooks_results:
905         feedback_fn("  - ERROR: general communication failure")
906         lu_result = 1
907       else:
908         for node_name in hooks_results:
909           show_node_header = True
910           res = hooks_results[node_name]
911           if res is False or not isinstance(res, list):
912             feedback_fn("    Communication failure")
913             lu_result = 1
914             continue
915           for script, hkr, output in res:
916             if hkr == constants.HKR_FAIL:
917               # The node header is only shown once, if there are
918               # failing hooks on that node
919               if show_node_header:
920                 feedback_fn("  Node %s:" % node_name)
921                 show_node_header = False
922               feedback_fn("    ERROR: Script %s failed, output:" % script)
923               output = indent_re.sub('      ', output)
924               feedback_fn("%s" % output)
925               lu_result = 1
926
927       return lu_result
928
929
930 class LUVerifyDisks(NoHooksLU):
931   """Verifies the cluster disks status.
932
933   """
934   _OP_REQP = []
935   REQ_BGL = False
936
937   def ExpandNames(self):
938     self.needed_locks = {
939       locking.LEVEL_NODE: locking.ALL_SET,
940       locking.LEVEL_INSTANCE: locking.ALL_SET,
941     }
942     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
943
944   def CheckPrereq(self):
945     """Check prerequisites.
946
947     This has no prerequisites.
948
949     """
950     pass
951
952   def Exec(self, feedback_fn):
953     """Verify integrity of cluster disks.
954
955     """
956     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
957
958     vg_name = self.cfg.GetVGName()
959     nodes = utils.NiceSort(self.cfg.GetNodeList())
960     instances = [self.cfg.GetInstanceInfo(name)
961                  for name in self.cfg.GetInstanceList()]
962
963     nv_dict = {}
964     for inst in instances:
965       inst_lvs = {}
966       if (inst.status != "up" or
967           inst.disk_template not in constants.DTS_NET_MIRROR):
968         continue
969       inst.MapLVsByNode(inst_lvs)
970       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
971       for node, vol_list in inst_lvs.iteritems():
972         for vol in vol_list:
973           nv_dict[(node, vol)] = inst
974
975     if not nv_dict:
976       return result
977
978     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
979
980     to_act = set()
981     for node in nodes:
982       # node_volume
983       lvs = node_lvs[node]
984
985       if isinstance(lvs, basestring):
986         logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
987         res_nlvm[node] = lvs
988       elif not isinstance(lvs, dict):
989         logger.Info("connection to node %s failed or invalid data returned" %
990                     (node,))
991         res_nodes.append(node)
992         continue
993
994       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
995         inst = nv_dict.pop((node, lv_name), None)
996         if (not lv_online and inst is not None
997             and inst.name not in res_instances):
998           res_instances.append(inst.name)
999
1000     # any leftover items in nv_dict are missing LVs, let's arrange the
1001     # data better
1002     for key, inst in nv_dict.iteritems():
1003       if inst.name not in res_missing:
1004         res_missing[inst.name] = []
1005       res_missing[inst.name].append(key)
1006
1007     return result
1008
1009
1010 class LURenameCluster(LogicalUnit):
1011   """Rename the cluster.
1012
1013   """
1014   HPATH = "cluster-rename"
1015   HTYPE = constants.HTYPE_CLUSTER
1016   _OP_REQP = ["name"]
1017
1018   def BuildHooksEnv(self):
1019     """Build hooks env.
1020
1021     """
1022     env = {
1023       "OP_TARGET": self.cfg.GetClusterName(),
1024       "NEW_NAME": self.op.name,
1025       }
1026     mn = self.cfg.GetMasterNode()
1027     return env, [mn], [mn]
1028
1029   def CheckPrereq(self):
1030     """Verify that the passed name is a valid one.
1031
1032     """
1033     hostname = utils.HostInfo(self.op.name)
1034
1035     new_name = hostname.name
1036     self.ip = new_ip = hostname.ip
1037     old_name = self.cfg.GetClusterName()
1038     old_ip = self.cfg.GetMasterIP()
1039     if new_name == old_name and new_ip == old_ip:
1040       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1041                                  " cluster has changed")
1042     if new_ip != old_ip:
1043       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1044         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1045                                    " reachable on the network. Aborting." %
1046                                    new_ip)
1047
1048     self.op.name = new_name
1049
1050   def Exec(self, feedback_fn):
1051     """Rename the cluster.
1052
1053     """
1054     clustername = self.op.name
1055     ip = self.ip
1056
1057     # shutdown the master IP
1058     master = self.cfg.GetMasterNode()
1059     if not self.rpc.call_node_stop_master(master, False):
1060       raise errors.OpExecError("Could not disable the master role")
1061
1062     try:
1063       # modify the sstore
1064       # TODO: sstore
1065       ss.SetKey(ss.SS_MASTER_IP, ip)
1066       ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
1067
1068       # Distribute updated ss config to all nodes
1069       myself = self.cfg.GetNodeInfo(master)
1070       dist_nodes = self.cfg.GetNodeList()
1071       if myself.name in dist_nodes:
1072         dist_nodes.remove(myself.name)
1073
1074       logger.Debug("Copying updated ssconf data to all nodes")
1075       for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
1076         fname = ss.KeyToFilename(keyname)
1077         result = self.rpc.call_upload_file(dist_nodes, fname)
1078         for to_node in dist_nodes:
1079           if not result[to_node]:
1080             logger.Error("copy of file %s to node %s failed" %
1081                          (fname, to_node))
1082     finally:
1083       if not self.rpc.call_node_start_master(master, False):
1084         logger.Error("Could not re-enable the master role on the master,"
1085                      " please restart manually.")
1086
1087
1088 def _RecursiveCheckIfLVMBased(disk):
1089   """Check if the given disk or its children are lvm-based.
1090
1091   Args:
1092     disk: ganeti.objects.Disk object
1093
1094   Returns:
1095     boolean indicating whether a LD_LV dev_type was found or not
1096
1097   """
1098   if disk.children:
1099     for chdisk in disk.children:
1100       if _RecursiveCheckIfLVMBased(chdisk):
1101         return True
1102   return disk.dev_type == constants.LD_LV
1103
1104
1105 class LUSetClusterParams(LogicalUnit):
1106   """Change the parameters of the cluster.
1107
1108   """
1109   HPATH = "cluster-modify"
1110   HTYPE = constants.HTYPE_CLUSTER
1111   _OP_REQP = []
1112   REQ_BGL = False
1113
1114   def ExpandNames(self):
1115     # FIXME: in the future maybe other cluster params won't require checking on
1116     # all nodes to be modified.
1117     self.needed_locks = {
1118       locking.LEVEL_NODE: locking.ALL_SET,
1119     }
1120     self.share_locks[locking.LEVEL_NODE] = 1
1121
1122   def BuildHooksEnv(self):
1123     """Build hooks env.
1124
1125     """
1126     env = {
1127       "OP_TARGET": self.cfg.GetClusterName(),
1128       "NEW_VG_NAME": self.op.vg_name,
1129       }
1130     mn = self.cfg.GetMasterNode()
1131     return env, [mn], [mn]
1132
1133   def CheckPrereq(self):
1134     """Check prerequisites.
1135
1136     This checks whether the given params don't conflict and
1137     if the given volume group is valid.
1138
1139     """
1140     # FIXME: This only works because there is only one parameter that can be
1141     # changed or removed.
1142     if not self.op.vg_name:
1143       instances = self.cfg.GetAllInstancesInfo().values()
1144       for inst in instances:
1145         for disk in inst.disks:
1146           if _RecursiveCheckIfLVMBased(disk):
1147             raise errors.OpPrereqError("Cannot disable lvm storage while"
1148                                        " lvm-based instances exist")
1149
1150     # if vg_name not None, checks given volume group on all nodes
1151     if self.op.vg_name:
1152       node_list = self.acquired_locks[locking.LEVEL_NODE]
1153       vglist = self.rpc.call_vg_list(node_list)
1154       for node in node_list:
1155         vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1156                                               constants.MIN_VG_SIZE)
1157         if vgstatus:
1158           raise errors.OpPrereqError("Error on node '%s': %s" %
1159                                      (node, vgstatus))
1160
1161   def Exec(self, feedback_fn):
1162     """Change the parameters of the cluster.
1163
1164     """
1165     if self.op.vg_name != self.cfg.GetVGName():
1166       self.cfg.SetVGName(self.op.vg_name)
1167     else:
1168       feedback_fn("Cluster LVM configuration already in desired"
1169                   " state, not changing")
1170
1171
1172 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1173   """Sleep and poll for an instance's disk to sync.
1174
1175   """
1176   if not instance.disks:
1177     return True
1178
1179   if not oneshot:
1180     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1181
1182   node = instance.primary_node
1183
1184   for dev in instance.disks:
1185     lu.cfg.SetDiskID(dev, node)
1186
1187   retries = 0
1188   while True:
1189     max_time = 0
1190     done = True
1191     cumul_degraded = False
1192     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1193     if not rstats:
1194       lu.proc.LogWarning("Can't get any data from node %s" % node)
1195       retries += 1
1196       if retries >= 10:
1197         raise errors.RemoteError("Can't contact node %s for mirror data,"
1198                                  " aborting." % node)
1199       time.sleep(6)
1200       continue
1201     retries = 0
1202     for i in range(len(rstats)):
1203       mstat = rstats[i]
1204       if mstat is None:
1205         lu.proc.LogWarning("Can't compute data for node %s/%s" %
1206                            (node, instance.disks[i].iv_name))
1207         continue
1208       # we ignore the ldisk parameter
1209       perc_done, est_time, is_degraded, _ = mstat
1210       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1211       if perc_done is not None:
1212         done = False
1213         if est_time is not None:
1214           rem_time = "%d estimated seconds remaining" % est_time
1215           max_time = est_time
1216         else:
1217           rem_time = "no time estimate"
1218         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1219                         (instance.disks[i].iv_name, perc_done, rem_time))
1220     if done or oneshot:
1221       break
1222
1223     time.sleep(min(60, max_time))
1224
1225   if done:
1226     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1227   return not cumul_degraded
1228
1229
1230 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1231   """Check that mirrors are not degraded.
1232
1233   The ldisk parameter, if True, will change the test from the
1234   is_degraded attribute (which represents overall non-ok status for
1235   the device(s)) to the ldisk (representing the local storage status).
1236
1237   """
1238   lu.cfg.SetDiskID(dev, node)
1239   if ldisk:
1240     idx = 6
1241   else:
1242     idx = 5
1243
1244   result = True
1245   if on_primary or dev.AssembleOnSecondary():
1246     rstats = lu.rpc.call_blockdev_find(node, dev)
1247     if not rstats:
1248       logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1249       result = False
1250     else:
1251       result = result and (not rstats[idx])
1252   if dev.children:
1253     for child in dev.children:
1254       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1255
1256   return result
1257
1258
1259 class LUDiagnoseOS(NoHooksLU):
1260   """Logical unit for OS diagnose/query.
1261
1262   """
1263   _OP_REQP = ["output_fields", "names"]
1264   REQ_BGL = False
1265
1266   def ExpandNames(self):
1267     if self.op.names:
1268       raise errors.OpPrereqError("Selective OS query not supported")
1269
1270     self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1271     _CheckOutputFields(static=[],
1272                        dynamic=self.dynamic_fields,
1273                        selected=self.op.output_fields)
1274
1275     # Lock all nodes, in shared mode
1276     self.needed_locks = {}
1277     self.share_locks[locking.LEVEL_NODE] = 1
1278     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1279
1280   def CheckPrereq(self):
1281     """Check prerequisites.
1282
1283     """
1284
1285   @staticmethod
1286   def _DiagnoseByOS(node_list, rlist):
1287     """Remaps a per-node return list into an a per-os per-node dictionary
1288
1289       Args:
1290         node_list: a list with the names of all nodes
1291         rlist: a map with node names as keys and OS objects as values
1292
1293       Returns:
1294         map: a map with osnames as keys and as value another map, with
1295              nodes as
1296              keys and list of OS objects as values
1297              e.g. {"debian-etch": {"node1": [<object>,...],
1298                                    "node2": [<object>,]}
1299                   }
1300
1301     """
1302     all_os = {}
1303     for node_name, nr in rlist.iteritems():
1304       if not nr:
1305         continue
1306       for os_obj in nr:
1307         if os_obj.name not in all_os:
1308           # build a list of nodes for this os containing empty lists
1309           # for each node in node_list
1310           all_os[os_obj.name] = {}
1311           for nname in node_list:
1312             all_os[os_obj.name][nname] = []
1313         all_os[os_obj.name][node_name].append(os_obj)
1314     return all_os
1315
1316   def Exec(self, feedback_fn):
1317     """Compute the list of OSes.
1318
1319     """
1320     node_list = self.acquired_locks[locking.LEVEL_NODE]
1321     node_data = self.rpc.call_os_diagnose(node_list)
1322     if node_data == False:
1323       raise errors.OpExecError("Can't gather the list of OSes")
1324     pol = self._DiagnoseByOS(node_list, node_data)
1325     output = []
1326     for os_name, os_data in pol.iteritems():
1327       row = []
1328       for field in self.op.output_fields:
1329         if field == "name":
1330           val = os_name
1331         elif field == "valid":
1332           val = utils.all([osl and osl[0] for osl in os_data.values()])
1333         elif field == "node_status":
1334           val = {}
1335           for node_name, nos_list in os_data.iteritems():
1336             val[node_name] = [(v.status, v.path) for v in nos_list]
1337         else:
1338           raise errors.ParameterError(field)
1339         row.append(val)
1340       output.append(row)
1341
1342     return output
1343
1344
1345 class LURemoveNode(LogicalUnit):
1346   """Logical unit for removing a node.
1347
1348   """
1349   HPATH = "node-remove"
1350   HTYPE = constants.HTYPE_NODE
1351   _OP_REQP = ["node_name"]
1352
1353   def BuildHooksEnv(self):
1354     """Build hooks env.
1355
1356     This doesn't run on the target node in the pre phase as a failed
1357     node would then be impossible to remove.
1358
1359     """
1360     env = {
1361       "OP_TARGET": self.op.node_name,
1362       "NODE_NAME": self.op.node_name,
1363       }
1364     all_nodes = self.cfg.GetNodeList()
1365     all_nodes.remove(self.op.node_name)
1366     return env, all_nodes, all_nodes
1367
1368   def CheckPrereq(self):
1369     """Check prerequisites.
1370
1371     This checks:
1372      - the node exists in the configuration
1373      - it does not have primary or secondary instances
1374      - it's not the master
1375
1376     Any errors are signalled by raising errors.OpPrereqError.
1377
1378     """
1379     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1380     if node is None:
1381       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1382
1383     instance_list = self.cfg.GetInstanceList()
1384
1385     masternode = self.cfg.GetMasterNode()
1386     if node.name == masternode:
1387       raise errors.OpPrereqError("Node is the master node,"
1388                                  " you need to failover first.")
1389
1390     for instance_name in instance_list:
1391       instance = self.cfg.GetInstanceInfo(instance_name)
1392       if node.name == instance.primary_node:
1393         raise errors.OpPrereqError("Instance %s still running on the node,"
1394                                    " please remove first." % instance_name)
1395       if node.name in instance.secondary_nodes:
1396         raise errors.OpPrereqError("Instance %s has node as a secondary,"
1397                                    " please remove first." % instance_name)
1398     self.op.node_name = node.name
1399     self.node = node
1400
1401   def Exec(self, feedback_fn):
1402     """Removes the node from the cluster.
1403
1404     """
1405     node = self.node
1406     logger.Info("stopping the node daemon and removing configs from node %s" %
1407                 node.name)
1408
1409     self.context.RemoveNode(node.name)
1410
1411     self.rpc.call_node_leave_cluster(node.name)
1412
1413
1414 class LUQueryNodes(NoHooksLU):
1415   """Logical unit for querying nodes.
1416
1417   """
1418   _OP_REQP = ["output_fields", "names"]
1419   REQ_BGL = False
1420
1421   def ExpandNames(self):
1422     self.dynamic_fields = frozenset([
1423       "dtotal", "dfree",
1424       "mtotal", "mnode", "mfree",
1425       "bootid",
1426       "ctotal",
1427       ])
1428
1429     self.static_fields = frozenset([
1430       "name", "pinst_cnt", "sinst_cnt",
1431       "pinst_list", "sinst_list",
1432       "pip", "sip", "tags",
1433       "serial_no",
1434       ])
1435
1436     _CheckOutputFields(static=self.static_fields,
1437                        dynamic=self.dynamic_fields,
1438                        selected=self.op.output_fields)
1439
1440     self.needed_locks = {}
1441     self.share_locks[locking.LEVEL_NODE] = 1
1442
1443     if self.op.names:
1444       self.wanted = _GetWantedNodes(self, self.op.names)
1445     else:
1446       self.wanted = locking.ALL_SET
1447
1448     self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
1449     if self.do_locking:
1450       # if we don't request only static fields, we need to lock the nodes
1451       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1452
1453
1454   def CheckPrereq(self):
1455     """Check prerequisites.
1456
1457     """
1458     # The validation of the node list is done in the _GetWantedNodes,
1459     # if non empty, and if empty, there's no validation to do
1460     pass
1461
1462   def Exec(self, feedback_fn):
1463     """Computes the list of nodes and their attributes.
1464
1465     """
1466     all_info = self.cfg.GetAllNodesInfo()
1467     if self.do_locking:
1468       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1469     elif self.wanted != locking.ALL_SET:
1470       nodenames = self.wanted
1471       missing = set(nodenames).difference(all_info.keys())
1472       if missing:
1473         raise errors.OpExecError(
1474           "Some nodes were removed before retrieving their data: %s" % missing)
1475     else:
1476       nodenames = all_info.keys()
1477     nodelist = [all_info[name] for name in nodenames]
1478
1479     # begin data gathering
1480
1481     if self.dynamic_fields.intersection(self.op.output_fields):
1482       live_data = {}
1483       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
1484                                           self.cfg.GetHypervisorType())
1485       for name in nodenames:
1486         nodeinfo = node_data.get(name, None)
1487         if nodeinfo:
1488           live_data[name] = {
1489             "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1490             "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1491             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1492             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1493             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1494             "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1495             "bootid": nodeinfo['bootid'],
1496             }
1497         else:
1498           live_data[name] = {}
1499     else:
1500       live_data = dict.fromkeys(nodenames, {})
1501
1502     node_to_primary = dict([(name, set()) for name in nodenames])
1503     node_to_secondary = dict([(name, set()) for name in nodenames])
1504
1505     inst_fields = frozenset(("pinst_cnt", "pinst_list",
1506                              "sinst_cnt", "sinst_list"))
1507     if inst_fields & frozenset(self.op.output_fields):
1508       instancelist = self.cfg.GetInstanceList()
1509
1510       for instance_name in instancelist:
1511         inst = self.cfg.GetInstanceInfo(instance_name)
1512         if inst.primary_node in node_to_primary:
1513           node_to_primary[inst.primary_node].add(inst.name)
1514         for secnode in inst.secondary_nodes:
1515           if secnode in node_to_secondary:
1516             node_to_secondary[secnode].add(inst.name)
1517
1518     # end data gathering
1519
1520     output = []
1521     for node in nodelist:
1522       node_output = []
1523       for field in self.op.output_fields:
1524         if field == "name":
1525           val = node.name
1526         elif field == "pinst_list":
1527           val = list(node_to_primary[node.name])
1528         elif field == "sinst_list":
1529           val = list(node_to_secondary[node.name])
1530         elif field == "pinst_cnt":
1531           val = len(node_to_primary[node.name])
1532         elif field == "sinst_cnt":
1533           val = len(node_to_secondary[node.name])
1534         elif field == "pip":
1535           val = node.primary_ip
1536         elif field == "sip":
1537           val = node.secondary_ip
1538         elif field == "tags":
1539           val = list(node.GetTags())
1540         elif field == "serial_no":
1541           val = node.serial_no
1542         elif field in self.dynamic_fields:
1543           val = live_data[node.name].get(field, None)
1544         else:
1545           raise errors.ParameterError(field)
1546         node_output.append(val)
1547       output.append(node_output)
1548
1549     return output
1550
1551
1552 class LUQueryNodeVolumes(NoHooksLU):
1553   """Logical unit for getting volumes on node(s).
1554
1555   """
1556   _OP_REQP = ["nodes", "output_fields"]
1557   REQ_BGL = False
1558
1559   def ExpandNames(self):
1560     _CheckOutputFields(static=["node"],
1561                        dynamic=["phys", "vg", "name", "size", "instance"],
1562                        selected=self.op.output_fields)
1563
1564     self.needed_locks = {}
1565     self.share_locks[locking.LEVEL_NODE] = 1
1566     if not self.op.nodes:
1567       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1568     else:
1569       self.needed_locks[locking.LEVEL_NODE] = \
1570         _GetWantedNodes(self, self.op.nodes)
1571
1572   def CheckPrereq(self):
1573     """Check prerequisites.
1574
1575     This checks that the fields required are valid output fields.
1576
1577     """
1578     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
1579
1580   def Exec(self, feedback_fn):
1581     """Computes the list of nodes and their attributes.
1582
1583     """
1584     nodenames = self.nodes
1585     volumes = self.rpc.call_node_volumes(nodenames)
1586
1587     ilist = [self.cfg.GetInstanceInfo(iname) for iname
1588              in self.cfg.GetInstanceList()]
1589
1590     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1591
1592     output = []
1593     for node in nodenames:
1594       if node not in volumes or not volumes[node]:
1595         continue
1596
1597       node_vols = volumes[node][:]
1598       node_vols.sort(key=lambda vol: vol['dev'])
1599
1600       for vol in node_vols:
1601         node_output = []
1602         for field in self.op.output_fields:
1603           if field == "node":
1604             val = node
1605           elif field == "phys":
1606             val = vol['dev']
1607           elif field == "vg":
1608             val = vol['vg']
1609           elif field == "name":
1610             val = vol['name']
1611           elif field == "size":
1612             val = int(float(vol['size']))
1613           elif field == "instance":
1614             for inst in ilist:
1615               if node not in lv_by_node[inst]:
1616                 continue
1617               if vol['name'] in lv_by_node[inst][node]:
1618                 val = inst.name
1619                 break
1620             else:
1621               val = '-'
1622           else:
1623             raise errors.ParameterError(field)
1624           node_output.append(str(val))
1625
1626         output.append(node_output)
1627
1628     return output
1629
1630
1631 class LUAddNode(LogicalUnit):
1632   """Logical unit for adding node to the cluster.
1633
1634   """
1635   HPATH = "node-add"
1636   HTYPE = constants.HTYPE_NODE
1637   _OP_REQP = ["node_name"]
1638
1639   def BuildHooksEnv(self):
1640     """Build hooks env.
1641
1642     This will run on all nodes before, and on all nodes + the new node after.
1643
1644     """
1645     env = {
1646       "OP_TARGET": self.op.node_name,
1647       "NODE_NAME": self.op.node_name,
1648       "NODE_PIP": self.op.primary_ip,
1649       "NODE_SIP": self.op.secondary_ip,
1650       }
1651     nodes_0 = self.cfg.GetNodeList()
1652     nodes_1 = nodes_0 + [self.op.node_name, ]
1653     return env, nodes_0, nodes_1
1654
1655   def CheckPrereq(self):
1656     """Check prerequisites.
1657
1658     This checks:
1659      - the new node is not already in the config
1660      - it is resolvable
1661      - its parameters (single/dual homed) matches the cluster
1662
1663     Any errors are signalled by raising errors.OpPrereqError.
1664
1665     """
1666     node_name = self.op.node_name
1667     cfg = self.cfg
1668
1669     dns_data = utils.HostInfo(node_name)
1670
1671     node = dns_data.name
1672     primary_ip = self.op.primary_ip = dns_data.ip
1673     secondary_ip = getattr(self.op, "secondary_ip", None)
1674     if secondary_ip is None:
1675       secondary_ip = primary_ip
1676     if not utils.IsValidIP(secondary_ip):
1677       raise errors.OpPrereqError("Invalid secondary IP given")
1678     self.op.secondary_ip = secondary_ip
1679
1680     node_list = cfg.GetNodeList()
1681     if not self.op.readd and node in node_list:
1682       raise errors.OpPrereqError("Node %s is already in the configuration" %
1683                                  node)
1684     elif self.op.readd and node not in node_list:
1685       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1686
1687     for existing_node_name in node_list:
1688       existing_node = cfg.GetNodeInfo(existing_node_name)
1689
1690       if self.op.readd and node == existing_node_name:
1691         if (existing_node.primary_ip != primary_ip or
1692             existing_node.secondary_ip != secondary_ip):
1693           raise errors.OpPrereqError("Readded node doesn't have the same IP"
1694                                      " address configuration as before")
1695         continue
1696
1697       if (existing_node.primary_ip == primary_ip or
1698           existing_node.secondary_ip == primary_ip or
1699           existing_node.primary_ip == secondary_ip or
1700           existing_node.secondary_ip == secondary_ip):
1701         raise errors.OpPrereqError("New node ip address(es) conflict with"
1702                                    " existing node %s" % existing_node.name)
1703
1704     # check that the type of the node (single versus dual homed) is the
1705     # same as for the master
1706     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
1707     master_singlehomed = myself.secondary_ip == myself.primary_ip
1708     newbie_singlehomed = secondary_ip == primary_ip
1709     if master_singlehomed != newbie_singlehomed:
1710       if master_singlehomed:
1711         raise errors.OpPrereqError("The master has no private ip but the"
1712                                    " new node has one")
1713       else:
1714         raise errors.OpPrereqError("The master has a private ip but the"
1715                                    " new node doesn't have one")
1716
1717     # checks reachablity
1718     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1719       raise errors.OpPrereqError("Node not reachable by ping")
1720
1721     if not newbie_singlehomed:
1722       # check reachability from my secondary ip to newbie's secondary ip
1723       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1724                            source=myself.secondary_ip):
1725         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1726                                    " based ping to noded port")
1727
1728     self.new_node = objects.Node(name=node,
1729                                  primary_ip=primary_ip,
1730                                  secondary_ip=secondary_ip)
1731
1732   def Exec(self, feedback_fn):
1733     """Adds the new node to the cluster.
1734
1735     """
1736     new_node = self.new_node
1737     node = new_node.name
1738
1739     # check connectivity
1740     result = self.rpc.call_version([node])[node]
1741     if result:
1742       if constants.PROTOCOL_VERSION == result:
1743         logger.Info("communication to node %s fine, sw version %s match" %
1744                     (node, result))
1745       else:
1746         raise errors.OpExecError("Version mismatch master version %s,"
1747                                  " node version %s" %
1748                                  (constants.PROTOCOL_VERSION, result))
1749     else:
1750       raise errors.OpExecError("Cannot get version from the new node")
1751
1752     # setup ssh on node
1753     logger.Info("copy ssh key to node %s" % node)
1754     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1755     keyarray = []
1756     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1757                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1758                 priv_key, pub_key]
1759
1760     for i in keyfiles:
1761       f = open(i, 'r')
1762       try:
1763         keyarray.append(f.read())
1764       finally:
1765         f.close()
1766
1767     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
1768                                     keyarray[2],
1769                                     keyarray[3], keyarray[4], keyarray[5])
1770
1771     if not result:
1772       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1773
1774     # Add node to our /etc/hosts, and add key to known_hosts
1775     utils.AddHostToEtcHosts(new_node.name)
1776
1777     if new_node.secondary_ip != new_node.primary_ip:
1778       if not self.rpc.call_node_has_ip_address(new_node.name,
1779                                                new_node.secondary_ip):
1780         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1781                                  " you gave (%s). Please fix and re-run this"
1782                                  " command." % new_node.secondary_ip)
1783
1784     node_verify_list = [self.cfg.GetMasterNode()]
1785     node_verify_param = {
1786       'nodelist': [node],
1787       # TODO: do a node-net-test as well?
1788     }
1789
1790     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
1791                                        self.cfg.GetClusterName())
1792     for verifier in node_verify_list:
1793       if not result[verifier]:
1794         raise errors.OpExecError("Cannot communicate with %s's node daemon"
1795                                  " for remote verification" % verifier)
1796       if result[verifier]['nodelist']:
1797         for failed in result[verifier]['nodelist']:
1798           feedback_fn("ssh/hostname verification failed %s -> %s" %
1799                       (verifier, result[verifier]['nodelist'][failed]))
1800         raise errors.OpExecError("ssh/hostname verification failed.")
1801
1802     # Distribute updated /etc/hosts and known_hosts to all nodes,
1803     # including the node just added
1804     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
1805     dist_nodes = self.cfg.GetNodeList()
1806     if not self.op.readd:
1807       dist_nodes.append(node)
1808     if myself.name in dist_nodes:
1809       dist_nodes.remove(myself.name)
1810
1811     logger.Debug("Copying hosts and known_hosts to all nodes")
1812     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1813       result = self.rpc.call_upload_file(dist_nodes, fname)
1814       for to_node in dist_nodes:
1815         if not result[to_node]:
1816           logger.Error("copy of file %s to node %s failed" %
1817                        (fname, to_node))
1818
1819     to_copy = []
1820     if constants.HT_XEN_HVM in self.cfg.GetClusterInfo().enabled_hypervisors:
1821       to_copy.append(constants.VNC_PASSWORD_FILE)
1822     for fname in to_copy:
1823       result = self.rpc.call_upload_file([node], fname)
1824       if not result[node]:
1825         logger.Error("could not copy file %s to node %s" % (fname, node))
1826
1827     if self.op.readd:
1828       self.context.ReaddNode(new_node)
1829     else:
1830       self.context.AddNode(new_node)
1831
1832
1833 class LUQueryClusterInfo(NoHooksLU):
1834   """Query cluster configuration.
1835
1836   """
1837   _OP_REQP = []
1838   REQ_MASTER = False
1839   REQ_BGL = False
1840
1841   def ExpandNames(self):
1842     self.needed_locks = {}
1843
1844   def CheckPrereq(self):
1845     """No prerequsites needed for this LU.
1846
1847     """
1848     pass
1849
1850   def Exec(self, feedback_fn):
1851     """Return cluster config.
1852
1853     """
1854     result = {
1855       "name": self.cfg.GetClusterName(),
1856       "software_version": constants.RELEASE_VERSION,
1857       "protocol_version": constants.PROTOCOL_VERSION,
1858       "config_version": constants.CONFIG_VERSION,
1859       "os_api_version": constants.OS_API_VERSION,
1860       "export_version": constants.EXPORT_VERSION,
1861       "master": self.cfg.GetMasterNode(),
1862       "architecture": (platform.architecture()[0], platform.machine()),
1863       "hypervisor_type": self.cfg.GetHypervisorType(),
1864       "enabled_hypervisors": self.cfg.GetClusterInfo().enabled_hypervisors,
1865       }
1866
1867     return result
1868
1869
1870 class LUQueryConfigValues(NoHooksLU):
1871   """Return configuration values.
1872
1873   """
1874   _OP_REQP = []
1875   REQ_BGL = False
1876
1877   def ExpandNames(self):
1878     self.needed_locks = {}
1879
1880     static_fields = ["cluster_name", "master_node"]
1881     _CheckOutputFields(static=static_fields,
1882                        dynamic=[],
1883                        selected=self.op.output_fields)
1884
1885   def CheckPrereq(self):
1886     """No prerequisites.
1887
1888     """
1889     pass
1890
1891   def Exec(self, feedback_fn):
1892     """Dump a representation of the cluster config to the standard output.
1893
1894     """
1895     values = []
1896     for field in self.op.output_fields:
1897       if field == "cluster_name":
1898         values.append(self.cfg.GetClusterName())
1899       elif field == "master_node":
1900         values.append(self.cfg.GetMasterNode())
1901       else:
1902         raise errors.ParameterError(field)
1903     return values
1904
1905
1906 class LUActivateInstanceDisks(NoHooksLU):
1907   """Bring up an instance's disks.
1908
1909   """
1910   _OP_REQP = ["instance_name"]
1911   REQ_BGL = False
1912
1913   def ExpandNames(self):
1914     self._ExpandAndLockInstance()
1915     self.needed_locks[locking.LEVEL_NODE] = []
1916     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1917
1918   def DeclareLocks(self, level):
1919     if level == locking.LEVEL_NODE:
1920       self._LockInstancesNodes()
1921
1922   def CheckPrereq(self):
1923     """Check prerequisites.
1924
1925     This checks that the instance is in the cluster.
1926
1927     """
1928     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1929     assert self.instance is not None, \
1930       "Cannot retrieve locked instance %s" % self.op.instance_name
1931
1932   def Exec(self, feedback_fn):
1933     """Activate the disks.
1934
1935     """
1936     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
1937     if not disks_ok:
1938       raise errors.OpExecError("Cannot activate block devices")
1939
1940     return disks_info
1941
1942
1943 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
1944   """Prepare the block devices for an instance.
1945
1946   This sets up the block devices on all nodes.
1947
1948   Args:
1949     instance: a ganeti.objects.Instance object
1950     ignore_secondaries: if true, errors on secondary nodes won't result
1951                         in an error return from the function
1952
1953   Returns:
1954     false if the operation failed
1955     list of (host, instance_visible_name, node_visible_name) if the operation
1956          suceeded with the mapping from node devices to instance devices
1957   """
1958   device_info = []
1959   disks_ok = True
1960   iname = instance.name
1961   # With the two passes mechanism we try to reduce the window of
1962   # opportunity for the race condition of switching DRBD to primary
1963   # before handshaking occured, but we do not eliminate it
1964
1965   # The proper fix would be to wait (with some limits) until the
1966   # connection has been made and drbd transitions from WFConnection
1967   # into any other network-connected state (Connected, SyncTarget,
1968   # SyncSource, etc.)
1969
1970   # 1st pass, assemble on all nodes in secondary mode
1971   for inst_disk in instance.disks:
1972     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1973       lu.cfg.SetDiskID(node_disk, node)
1974       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
1975       if not result:
1976         logger.Error("could not prepare block device %s on node %s"
1977                      " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1978         if not ignore_secondaries:
1979           disks_ok = False
1980
1981   # FIXME: race condition on drbd migration to primary
1982
1983   # 2nd pass, do only the primary node
1984   for inst_disk in instance.disks:
1985     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1986       if node != instance.primary_node:
1987         continue
1988       lu.cfg.SetDiskID(node_disk, node)
1989       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
1990       if not result:
1991         logger.Error("could not prepare block device %s on node %s"
1992                      " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1993         disks_ok = False
1994     device_info.append((instance.primary_node, inst_disk.iv_name, result))
1995
1996   # leave the disks configured for the primary node
1997   # this is a workaround that would be fixed better by
1998   # improving the logical/physical id handling
1999   for disk in instance.disks:
2000     lu.cfg.SetDiskID(disk, instance.primary_node)
2001
2002   return disks_ok, device_info
2003
2004
2005 def _StartInstanceDisks(lu, instance, force):
2006   """Start the disks of an instance.
2007
2008   """
2009   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2010                                            ignore_secondaries=force)
2011   if not disks_ok:
2012     _ShutdownInstanceDisks(lu, instance)
2013     if force is not None and not force:
2014       logger.Error("If the message above refers to a secondary node,"
2015                    " you can retry the operation using '--force'.")
2016     raise errors.OpExecError("Disk consistency error")
2017
2018
2019 class LUDeactivateInstanceDisks(NoHooksLU):
2020   """Shutdown an instance's disks.
2021
2022   """
2023   _OP_REQP = ["instance_name"]
2024   REQ_BGL = False
2025
2026   def ExpandNames(self):
2027     self._ExpandAndLockInstance()
2028     self.needed_locks[locking.LEVEL_NODE] = []
2029     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2030
2031   def DeclareLocks(self, level):
2032     if level == locking.LEVEL_NODE:
2033       self._LockInstancesNodes()
2034
2035   def CheckPrereq(self):
2036     """Check prerequisites.
2037
2038     This checks that the instance is in the cluster.
2039
2040     """
2041     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2042     assert self.instance is not None, \
2043       "Cannot retrieve locked instance %s" % self.op.instance_name
2044
2045   def Exec(self, feedback_fn):
2046     """Deactivate the disks
2047
2048     """
2049     instance = self.instance
2050     _SafeShutdownInstanceDisks(self, instance)
2051
2052
2053 def _SafeShutdownInstanceDisks(lu, instance):
2054   """Shutdown block devices of an instance.
2055
2056   This function checks if an instance is running, before calling
2057   _ShutdownInstanceDisks.
2058
2059   """
2060   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2061                                       [instance.hypervisor])
2062   ins_l = ins_l[instance.primary_node]
2063   if not type(ins_l) is list:
2064     raise errors.OpExecError("Can't contact node '%s'" %
2065                              instance.primary_node)
2066
2067   if instance.name in ins_l:
2068     raise errors.OpExecError("Instance is running, can't shutdown"
2069                              " block devices.")
2070
2071   _ShutdownInstanceDisks(lu, instance)
2072
2073
2074 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2075   """Shutdown block devices of an instance.
2076
2077   This does the shutdown on all nodes of the instance.
2078
2079   If the ignore_primary is false, errors on the primary node are
2080   ignored.
2081
2082   """
2083   result = True
2084   for disk in instance.disks:
2085     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2086       lu.cfg.SetDiskID(top_disk, node)
2087       if not lu.rpc.call_blockdev_shutdown(node, top_disk):
2088         logger.Error("could not shutdown block device %s on node %s" %
2089                      (disk.iv_name, node))
2090         if not ignore_primary or node != instance.primary_node:
2091           result = False
2092   return result
2093
2094
2095 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor):
2096   """Checks if a node has enough free memory.
2097
2098   This function check if a given node has the needed amount of free
2099   memory. In case the node has less memory or we cannot get the
2100   information from the node, this function raise an OpPrereqError
2101   exception.
2102
2103   @type lu: C{LogicalUnit}
2104   @param lu: a logical unit from which we get configuration data
2105   @type node: C{str}
2106   @param node: the node to check
2107   @type reason: C{str}
2108   @param reason: string to use in the error message
2109   @type requested: C{int}
2110   @param requested: the amount of memory in MiB to check for
2111   @type hypervisor: C{str}
2112   @param hypervisor: the hypervisor to ask for memory stats
2113   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2114       we cannot check the node
2115
2116   """
2117   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor)
2118   if not nodeinfo or not isinstance(nodeinfo, dict):
2119     raise errors.OpPrereqError("Could not contact node %s for resource"
2120                              " information" % (node,))
2121
2122   free_mem = nodeinfo[node].get('memory_free')
2123   if not isinstance(free_mem, int):
2124     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2125                              " was '%s'" % (node, free_mem))
2126   if requested > free_mem:
2127     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2128                              " needed %s MiB, available %s MiB" %
2129                              (node, reason, requested, free_mem))
2130
2131
2132 class LUStartupInstance(LogicalUnit):
2133   """Starts an instance.
2134
2135   """
2136   HPATH = "instance-start"
2137   HTYPE = constants.HTYPE_INSTANCE
2138   _OP_REQP = ["instance_name", "force"]
2139   REQ_BGL = False
2140
2141   def ExpandNames(self):
2142     self._ExpandAndLockInstance()
2143     self.needed_locks[locking.LEVEL_NODE] = []
2144     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2145
2146   def DeclareLocks(self, level):
2147     if level == locking.LEVEL_NODE:
2148       self._LockInstancesNodes()
2149
2150   def BuildHooksEnv(self):
2151     """Build hooks env.
2152
2153     This runs on master, primary and secondary nodes of the instance.
2154
2155     """
2156     env = {
2157       "FORCE": self.op.force,
2158       }
2159     env.update(_BuildInstanceHookEnvByObject(self.instance))
2160     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2161           list(self.instance.secondary_nodes))
2162     return env, nl, nl
2163
2164   def CheckPrereq(self):
2165     """Check prerequisites.
2166
2167     This checks that the instance is in the cluster.
2168
2169     """
2170     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2171     assert self.instance is not None, \
2172       "Cannot retrieve locked instance %s" % self.op.instance_name
2173
2174     # check bridges existance
2175     _CheckInstanceBridgesExist(self, instance)
2176
2177     _CheckNodeFreeMemory(self, instance.primary_node,
2178                          "starting instance %s" % instance.name,
2179                          instance.memory, instance.hypervisor)
2180
2181   def Exec(self, feedback_fn):
2182     """Start the instance.
2183
2184     """
2185     instance = self.instance
2186     force = self.op.force
2187     extra_args = getattr(self.op, "extra_args", "")
2188
2189     self.cfg.MarkInstanceUp(instance.name)
2190
2191     node_current = instance.primary_node
2192
2193     _StartInstanceDisks(self, instance, force)
2194
2195     if not self.rpc.call_instance_start(node_current, instance, extra_args):
2196       _ShutdownInstanceDisks(self, instance)
2197       raise errors.OpExecError("Could not start instance")
2198
2199
2200 class LURebootInstance(LogicalUnit):
2201   """Reboot an instance.
2202
2203   """
2204   HPATH = "instance-reboot"
2205   HTYPE = constants.HTYPE_INSTANCE
2206   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2207   REQ_BGL = False
2208
2209   def ExpandNames(self):
2210     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2211                                    constants.INSTANCE_REBOOT_HARD,
2212                                    constants.INSTANCE_REBOOT_FULL]:
2213       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2214                                   (constants.INSTANCE_REBOOT_SOFT,
2215                                    constants.INSTANCE_REBOOT_HARD,
2216                                    constants.INSTANCE_REBOOT_FULL))
2217     self._ExpandAndLockInstance()
2218     self.needed_locks[locking.LEVEL_NODE] = []
2219     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2220
2221   def DeclareLocks(self, level):
2222     if level == locking.LEVEL_NODE:
2223       primary_only = not constants.INSTANCE_REBOOT_FULL
2224       self._LockInstancesNodes(primary_only=primary_only)
2225
2226   def BuildHooksEnv(self):
2227     """Build hooks env.
2228
2229     This runs on master, primary and secondary nodes of the instance.
2230
2231     """
2232     env = {
2233       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2234       }
2235     env.update(_BuildInstanceHookEnvByObject(self.instance))
2236     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2237           list(self.instance.secondary_nodes))
2238     return env, nl, nl
2239
2240   def CheckPrereq(self):
2241     """Check prerequisites.
2242
2243     This checks that the instance is in the cluster.
2244
2245     """
2246     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2247     assert self.instance is not None, \
2248       "Cannot retrieve locked instance %s" % self.op.instance_name
2249
2250     # check bridges existance
2251     _CheckInstanceBridgesExist(self, instance)
2252
2253   def Exec(self, feedback_fn):
2254     """Reboot the instance.
2255
2256     """
2257     instance = self.instance
2258     ignore_secondaries = self.op.ignore_secondaries
2259     reboot_type = self.op.reboot_type
2260     extra_args = getattr(self.op, "extra_args", "")
2261
2262     node_current = instance.primary_node
2263
2264     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2265                        constants.INSTANCE_REBOOT_HARD]:
2266       if not self.rpc.call_instance_reboot(node_current, instance,
2267                                            reboot_type, extra_args):
2268         raise errors.OpExecError("Could not reboot instance")
2269     else:
2270       if not self.rpc.call_instance_shutdown(node_current, instance):
2271         raise errors.OpExecError("could not shutdown instance for full reboot")
2272       _ShutdownInstanceDisks(self, instance)
2273       _StartInstanceDisks(self, instance, ignore_secondaries)
2274       if not self.rpc.call_instance_start(node_current, instance, extra_args):
2275         _ShutdownInstanceDisks(self, instance)
2276         raise errors.OpExecError("Could not start instance for full reboot")
2277
2278     self.cfg.MarkInstanceUp(instance.name)
2279
2280
2281 class LUShutdownInstance(LogicalUnit):
2282   """Shutdown an instance.
2283
2284   """
2285   HPATH = "instance-stop"
2286   HTYPE = constants.HTYPE_INSTANCE
2287   _OP_REQP = ["instance_name"]
2288   REQ_BGL = False
2289
2290   def ExpandNames(self):
2291     self._ExpandAndLockInstance()
2292     self.needed_locks[locking.LEVEL_NODE] = []
2293     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2294
2295   def DeclareLocks(self, level):
2296     if level == locking.LEVEL_NODE:
2297       self._LockInstancesNodes()
2298
2299   def BuildHooksEnv(self):
2300     """Build hooks env.
2301
2302     This runs on master, primary and secondary nodes of the instance.
2303
2304     """
2305     env = _BuildInstanceHookEnvByObject(self.instance)
2306     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2307           list(self.instance.secondary_nodes))
2308     return env, nl, nl
2309
2310   def CheckPrereq(self):
2311     """Check prerequisites.
2312
2313     This checks that the instance is in the cluster.
2314
2315     """
2316     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2317     assert self.instance is not None, \
2318       "Cannot retrieve locked instance %s" % self.op.instance_name
2319
2320   def Exec(self, feedback_fn):
2321     """Shutdown the instance.
2322
2323     """
2324     instance = self.instance
2325     node_current = instance.primary_node
2326     self.cfg.MarkInstanceDown(instance.name)
2327     if not self.rpc.call_instance_shutdown(node_current, instance):
2328       logger.Error("could not shutdown instance")
2329
2330     _ShutdownInstanceDisks(self, instance)
2331
2332
2333 class LUReinstallInstance(LogicalUnit):
2334   """Reinstall an instance.
2335
2336   """
2337   HPATH = "instance-reinstall"
2338   HTYPE = constants.HTYPE_INSTANCE
2339   _OP_REQP = ["instance_name"]
2340   REQ_BGL = False
2341
2342   def ExpandNames(self):
2343     self._ExpandAndLockInstance()
2344     self.needed_locks[locking.LEVEL_NODE] = []
2345     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2346
2347   def DeclareLocks(self, level):
2348     if level == locking.LEVEL_NODE:
2349       self._LockInstancesNodes()
2350
2351   def BuildHooksEnv(self):
2352     """Build hooks env.
2353
2354     This runs on master, primary and secondary nodes of the instance.
2355
2356     """
2357     env = _BuildInstanceHookEnvByObject(self.instance)
2358     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2359           list(self.instance.secondary_nodes))
2360     return env, nl, nl
2361
2362   def CheckPrereq(self):
2363     """Check prerequisites.
2364
2365     This checks that the instance is in the cluster and is not running.
2366
2367     """
2368     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2369     assert instance is not None, \
2370       "Cannot retrieve locked instance %s" % self.op.instance_name
2371
2372     if instance.disk_template == constants.DT_DISKLESS:
2373       raise errors.OpPrereqError("Instance '%s' has no disks" %
2374                                  self.op.instance_name)
2375     if instance.status != "down":
2376       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2377                                  self.op.instance_name)
2378     remote_info = self.rpc.call_instance_info(instance.primary_node,
2379                                               instance.name,
2380                                               instance.hypervisor)
2381     if remote_info:
2382       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2383                                  (self.op.instance_name,
2384                                   instance.primary_node))
2385
2386     self.op.os_type = getattr(self.op, "os_type", None)
2387     if self.op.os_type is not None:
2388       # OS verification
2389       pnode = self.cfg.GetNodeInfo(
2390         self.cfg.ExpandNodeName(instance.primary_node))
2391       if pnode is None:
2392         raise errors.OpPrereqError("Primary node '%s' is unknown" %
2393                                    self.op.pnode)
2394       os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
2395       if not os_obj:
2396         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2397                                    " primary node"  % self.op.os_type)
2398
2399     self.instance = instance
2400
2401   def Exec(self, feedback_fn):
2402     """Reinstall the instance.
2403
2404     """
2405     inst = self.instance
2406
2407     if self.op.os_type is not None:
2408       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2409       inst.os = self.op.os_type
2410       self.cfg.Update(inst)
2411
2412     _StartInstanceDisks(self, inst, None)
2413     try:
2414       feedback_fn("Running the instance OS create scripts...")
2415       if not self.rpc.call_instance_os_add(inst.primary_node, inst,
2416                                            "sda", "sdb"):
2417         raise errors.OpExecError("Could not install OS for instance %s"
2418                                  " on node %s" %
2419                                  (inst.name, inst.primary_node))
2420     finally:
2421       _ShutdownInstanceDisks(self, inst)
2422
2423
2424 class LURenameInstance(LogicalUnit):
2425   """Rename an instance.
2426
2427   """
2428   HPATH = "instance-rename"
2429   HTYPE = constants.HTYPE_INSTANCE
2430   _OP_REQP = ["instance_name", "new_name"]
2431
2432   def BuildHooksEnv(self):
2433     """Build hooks env.
2434
2435     This runs on master, primary and secondary nodes of the instance.
2436
2437     """
2438     env = _BuildInstanceHookEnvByObject(self.instance)
2439     env["INSTANCE_NEW_NAME"] = self.op.new_name
2440     nl = ([self.cfg.GetMasterNode(), self.instance.primary_node] +
2441           list(self.instance.secondary_nodes))
2442     return env, nl, nl
2443
2444   def CheckPrereq(self):
2445     """Check prerequisites.
2446
2447     This checks that the instance is in the cluster and is not running.
2448
2449     """
2450     instance = self.cfg.GetInstanceInfo(
2451       self.cfg.ExpandInstanceName(self.op.instance_name))
2452     if instance is None:
2453       raise errors.OpPrereqError("Instance '%s' not known" %
2454                                  self.op.instance_name)
2455     if instance.status != "down":
2456       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2457                                  self.op.instance_name)
2458     remote_info = self.rpc.call_instance_info(instance.primary_node,
2459                                               instance.name,
2460                                               instance.hypervisor)
2461     if remote_info:
2462       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2463                                  (self.op.instance_name,
2464                                   instance.primary_node))
2465     self.instance = instance
2466
2467     # new name verification
2468     name_info = utils.HostInfo(self.op.new_name)
2469
2470     self.op.new_name = new_name = name_info.name
2471     instance_list = self.cfg.GetInstanceList()
2472     if new_name in instance_list:
2473       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2474                                  new_name)
2475
2476     if not getattr(self.op, "ignore_ip", False):
2477       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2478         raise errors.OpPrereqError("IP %s of instance %s already in use" %
2479                                    (name_info.ip, new_name))
2480
2481
2482   def Exec(self, feedback_fn):
2483     """Reinstall the instance.
2484
2485     """
2486     inst = self.instance
2487     old_name = inst.name
2488
2489     if inst.disk_template == constants.DT_FILE:
2490       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2491
2492     self.cfg.RenameInstance(inst.name, self.op.new_name)
2493     # Change the instance lock. This is definitely safe while we hold the BGL
2494     self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
2495     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2496
2497     # re-read the instance from the configuration after rename
2498     inst = self.cfg.GetInstanceInfo(self.op.new_name)
2499
2500     if inst.disk_template == constants.DT_FILE:
2501       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2502       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2503                                                      old_file_storage_dir,
2504                                                      new_file_storage_dir)
2505
2506       if not result:
2507         raise errors.OpExecError("Could not connect to node '%s' to rename"
2508                                  " directory '%s' to '%s' (but the instance"
2509                                  " has been renamed in Ganeti)" % (
2510                                  inst.primary_node, old_file_storage_dir,
2511                                  new_file_storage_dir))
2512
2513       if not result[0]:
2514         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2515                                  " (but the instance has been renamed in"
2516                                  " Ganeti)" % (old_file_storage_dir,
2517                                                new_file_storage_dir))
2518
2519     _StartInstanceDisks(self, inst, None)
2520     try:
2521       if not self.rpc.call_instance_run_rename(inst.primary_node, inst,
2522                                                old_name,
2523                                                "sda", "sdb"):
2524         msg = ("Could not run OS rename script for instance %s on node %s"
2525                " (but the instance has been renamed in Ganeti)" %
2526                (inst.name, inst.primary_node))
2527         logger.Error(msg)
2528     finally:
2529       _ShutdownInstanceDisks(self, inst)
2530
2531
2532 class LURemoveInstance(LogicalUnit):
2533   """Remove an instance.
2534
2535   """
2536   HPATH = "instance-remove"
2537   HTYPE = constants.HTYPE_INSTANCE
2538   _OP_REQP = ["instance_name", "ignore_failures"]
2539   REQ_BGL = False
2540
2541   def ExpandNames(self):
2542     self._ExpandAndLockInstance()
2543     self.needed_locks[locking.LEVEL_NODE] = []
2544     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2545
2546   def DeclareLocks(self, level):
2547     if level == locking.LEVEL_NODE:
2548       self._LockInstancesNodes()
2549
2550   def BuildHooksEnv(self):
2551     """Build hooks env.
2552
2553     This runs on master, primary and secondary nodes of the instance.
2554
2555     """
2556     env = _BuildInstanceHookEnvByObject(self.instance)
2557     nl = [self.cfg.GetMasterNode()]
2558     return env, nl, nl
2559
2560   def CheckPrereq(self):
2561     """Check prerequisites.
2562
2563     This checks that the instance is in the cluster.
2564
2565     """
2566     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2567     assert self.instance is not None, \
2568       "Cannot retrieve locked instance %s" % self.op.instance_name
2569
2570   def Exec(self, feedback_fn):
2571     """Remove the instance.
2572
2573     """
2574     instance = self.instance
2575     logger.Info("shutting down instance %s on node %s" %
2576                 (instance.name, instance.primary_node))
2577
2578     if not self.rpc.call_instance_shutdown(instance.primary_node, instance):
2579       if self.op.ignore_failures:
2580         feedback_fn("Warning: can't shutdown instance")
2581       else:
2582         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2583                                  (instance.name, instance.primary_node))
2584
2585     logger.Info("removing block devices for instance %s" % instance.name)
2586
2587     if not _RemoveDisks(self, instance):
2588       if self.op.ignore_failures:
2589         feedback_fn("Warning: can't remove instance's disks")
2590       else:
2591         raise errors.OpExecError("Can't remove instance's disks")
2592
2593     logger.Info("removing instance %s out of cluster config" % instance.name)
2594
2595     self.cfg.RemoveInstance(instance.name)
2596     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2597
2598
2599 class LUQueryInstances(NoHooksLU):
2600   """Logical unit for querying instances.
2601
2602   """
2603   _OP_REQP = ["output_fields", "names"]
2604   REQ_BGL = False
2605
2606   def ExpandNames(self):
2607     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2608     self.static_fields = frozenset([
2609       "name", "os", "pnode", "snodes",
2610       "admin_state", "admin_ram",
2611       "disk_template", "ip", "mac", "bridge",
2612       "sda_size", "sdb_size", "vcpus", "tags",
2613       "network_port",
2614       "serial_no", "hypervisor", "hvparams",
2615       ] + ["hv/%s" % name for name in constants.HVS_PARAMETERS])
2616     _CheckOutputFields(static=self.static_fields,
2617                        dynamic=self.dynamic_fields,
2618                        selected=self.op.output_fields)
2619
2620     self.needed_locks = {}
2621     self.share_locks[locking.LEVEL_INSTANCE] = 1
2622     self.share_locks[locking.LEVEL_NODE] = 1
2623
2624     if self.op.names:
2625       self.wanted = _GetWantedInstances(self, self.op.names)
2626     else:
2627       self.wanted = locking.ALL_SET
2628
2629     self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
2630     if self.do_locking:
2631       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
2632       self.needed_locks[locking.LEVEL_NODE] = []
2633       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2634
2635   def DeclareLocks(self, level):
2636     if level == locking.LEVEL_NODE and self.do_locking:
2637       self._LockInstancesNodes()
2638
2639   def CheckPrereq(self):
2640     """Check prerequisites.
2641
2642     """
2643     pass
2644
2645   def Exec(self, feedback_fn):
2646     """Computes the list of nodes and their attributes.
2647
2648     """
2649     all_info = self.cfg.GetAllInstancesInfo()
2650     if self.do_locking:
2651       instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
2652     elif self.wanted != locking.ALL_SET:
2653       instance_names = self.wanted
2654       missing = set(instance_names).difference(all_info.keys())
2655       if missing:
2656         raise errors.OpExecError(
2657           "Some instances were removed before retrieving their data: %s"
2658           % missing)
2659     else:
2660       instance_names = all_info.keys()
2661     instance_list = [all_info[iname] for iname in instance_names]
2662
2663     # begin data gathering
2664
2665     nodes = frozenset([inst.primary_node for inst in instance_list])
2666     hv_list = list(set([inst.hypervisor for inst in instance_list]))
2667
2668     bad_nodes = []
2669     if self.dynamic_fields.intersection(self.op.output_fields):
2670       live_data = {}
2671       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
2672       for name in nodes:
2673         result = node_data[name]
2674         if result:
2675           live_data.update(result)
2676         elif result == False:
2677           bad_nodes.append(name)
2678         # else no instance is alive
2679     else:
2680       live_data = dict([(name, {}) for name in instance_names])
2681
2682     # end data gathering
2683
2684     HVPREFIX = "hv/"
2685     output = []
2686     for instance in instance_list:
2687       iout = []
2688       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
2689       for field in self.op.output_fields:
2690         if field == "name":
2691           val = instance.name
2692         elif field == "os":
2693           val = instance.os
2694         elif field == "pnode":
2695           val = instance.primary_node
2696         elif field == "snodes":
2697           val = list(instance.secondary_nodes)
2698         elif field == "admin_state":
2699           val = (instance.status != "down")
2700         elif field == "oper_state":
2701           if instance.primary_node in bad_nodes:
2702             val = None
2703           else:
2704             val = bool(live_data.get(instance.name))
2705         elif field == "status":
2706           if instance.primary_node in bad_nodes:
2707             val = "ERROR_nodedown"
2708           else:
2709             running = bool(live_data.get(instance.name))
2710             if running:
2711               if instance.status != "down":
2712                 val = "running"
2713               else:
2714                 val = "ERROR_up"
2715             else:
2716               if instance.status != "down":
2717                 val = "ERROR_down"
2718               else:
2719                 val = "ADMIN_down"
2720         elif field == "admin_ram":
2721           val = instance.memory
2722         elif field == "oper_ram":
2723           if instance.primary_node in bad_nodes:
2724             val = None
2725           elif instance.name in live_data:
2726             val = live_data[instance.name].get("memory", "?")
2727           else:
2728             val = "-"
2729         elif field == "disk_template":
2730           val = instance.disk_template
2731         elif field == "ip":
2732           val = instance.nics[0].ip
2733         elif field == "bridge":
2734           val = instance.nics[0].bridge
2735         elif field == "mac":
2736           val = instance.nics[0].mac
2737         elif field == "sda_size" or field == "sdb_size":
2738           disk = instance.FindDisk(field[:3])
2739           if disk is None:
2740             val = None
2741           else:
2742             val = disk.size
2743         elif field == "vcpus":
2744           val = instance.vcpus
2745         elif field == "tags":
2746           val = list(instance.GetTags())
2747         elif field == "serial_no":
2748           val = instance.serial_no
2749         elif field == "network_port":
2750           val = instance.network_port
2751         elif (field.startswith(HVPREFIX) and
2752               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
2753           val = i_hv.get(field[len(HVPREFIX):], None)
2754         elif field == "hvparams":
2755           val = i_hv
2756         elif field == "hypervisor":
2757           val = instance.hypervisor
2758         else:
2759           raise errors.ParameterError(field)
2760         iout.append(val)
2761       output.append(iout)
2762
2763     return output
2764
2765
2766 class LUFailoverInstance(LogicalUnit):
2767   """Failover an instance.
2768
2769   """
2770   HPATH = "instance-failover"
2771   HTYPE = constants.HTYPE_INSTANCE
2772   _OP_REQP = ["instance_name", "ignore_consistency"]
2773   REQ_BGL = False
2774
2775   def ExpandNames(self):
2776     self._ExpandAndLockInstance()
2777     self.needed_locks[locking.LEVEL_NODE] = []
2778     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2779
2780   def DeclareLocks(self, level):
2781     if level == locking.LEVEL_NODE:
2782       self._LockInstancesNodes()
2783
2784   def BuildHooksEnv(self):
2785     """Build hooks env.
2786
2787     This runs on master, primary and secondary nodes of the instance.
2788
2789     """
2790     env = {
2791       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2792       }
2793     env.update(_BuildInstanceHookEnvByObject(self.instance))
2794     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
2795     return env, nl, nl
2796
2797   def CheckPrereq(self):
2798     """Check prerequisites.
2799
2800     This checks that the instance is in the cluster.
2801
2802     """
2803     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2804     assert self.instance is not None, \
2805       "Cannot retrieve locked instance %s" % self.op.instance_name
2806
2807     if instance.disk_template not in constants.DTS_NET_MIRROR:
2808       raise errors.OpPrereqError("Instance's disk layout is not"
2809                                  " network mirrored, cannot failover.")
2810
2811     secondary_nodes = instance.secondary_nodes
2812     if not secondary_nodes:
2813       raise errors.ProgrammerError("no secondary node but using "
2814                                    "a mirrored disk template")
2815
2816     target_node = secondary_nodes[0]
2817     # check memory requirements on the secondary node
2818     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
2819                          instance.name, instance.memory,
2820                          instance.hypervisor)
2821
2822     # check bridge existance
2823     brlist = [nic.bridge for nic in instance.nics]
2824     if not self.rpc.call_bridges_exist(target_node, brlist):
2825       raise errors.OpPrereqError("One or more target bridges %s does not"
2826                                  " exist on destination node '%s'" %
2827                                  (brlist, target_node))
2828
2829   def Exec(self, feedback_fn):
2830     """Failover an instance.
2831
2832     The failover is done by shutting it down on its present node and
2833     starting it on the secondary.
2834
2835     """
2836     instance = self.instance
2837
2838     source_node = instance.primary_node
2839     target_node = instance.secondary_nodes[0]
2840
2841     feedback_fn("* checking disk consistency between source and target")
2842     for dev in instance.disks:
2843       # for drbd, these are drbd over lvm
2844       if not _CheckDiskConsistency(self, dev, target_node, False):
2845         if instance.status == "up" and not self.op.ignore_consistency:
2846           raise errors.OpExecError("Disk %s is degraded on target node,"
2847                                    " aborting failover." % dev.iv_name)
2848
2849     feedback_fn("* shutting down instance on source node")
2850     logger.Info("Shutting down instance %s on node %s" %
2851                 (instance.name, source_node))
2852
2853     if not self.rpc.call_instance_shutdown(source_node, instance):
2854       if self.op.ignore_consistency:
2855         logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2856                      " anyway. Please make sure node %s is down"  %
2857                      (instance.name, source_node, source_node))
2858       else:
2859         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2860                                  (instance.name, source_node))
2861
2862     feedback_fn("* deactivating the instance's disks on source node")
2863     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
2864       raise errors.OpExecError("Can't shut down the instance's disks.")
2865
2866     instance.primary_node = target_node
2867     # distribute new instance config to the other nodes
2868     self.cfg.Update(instance)
2869
2870     # Only start the instance if it's marked as up
2871     if instance.status == "up":
2872       feedback_fn("* activating the instance's disks on target node")
2873       logger.Info("Starting instance %s on node %s" %
2874                   (instance.name, target_node))
2875
2876       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
2877                                                ignore_secondaries=True)
2878       if not disks_ok:
2879         _ShutdownInstanceDisks(self, instance)
2880         raise errors.OpExecError("Can't activate the instance's disks")
2881
2882       feedback_fn("* starting the instance on the target node")
2883       if not self.rpc.call_instance_start(target_node, instance, None):
2884         _ShutdownInstanceDisks(self, instance)
2885         raise errors.OpExecError("Could not start instance %s on node %s." %
2886                                  (instance.name, target_node))
2887
2888
2889 def _CreateBlockDevOnPrimary(lu, node, instance, device, info):
2890   """Create a tree of block devices on the primary node.
2891
2892   This always creates all devices.
2893
2894   """
2895   if device.children:
2896     for child in device.children:
2897       if not _CreateBlockDevOnPrimary(lu, node, instance, child, info):
2898         return False
2899
2900   lu.cfg.SetDiskID(device, node)
2901   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
2902                                        instance.name, True, info)
2903   if not new_id:
2904     return False
2905   if device.physical_id is None:
2906     device.physical_id = new_id
2907   return True
2908
2909
2910 def _CreateBlockDevOnSecondary(lu, node, instance, device, force, info):
2911   """Create a tree of block devices on a secondary node.
2912
2913   If this device type has to be created on secondaries, create it and
2914   all its children.
2915
2916   If not, just recurse to children keeping the same 'force' value.
2917
2918   """
2919   if device.CreateOnSecondary():
2920     force = True
2921   if device.children:
2922     for child in device.children:
2923       if not _CreateBlockDevOnSecondary(lu, node, instance,
2924                                         child, force, info):
2925         return False
2926
2927   if not force:
2928     return True
2929   lu.cfg.SetDiskID(device, node)
2930   new_id = lu.rpc.call_blockdev_create(node, device, device.size,
2931                                        instance.name, False, info)
2932   if not new_id:
2933     return False
2934   if device.physical_id is None:
2935     device.physical_id = new_id
2936   return True
2937
2938
2939 def _GenerateUniqueNames(lu, exts):
2940   """Generate a suitable LV name.
2941
2942   This will generate a logical volume name for the given instance.
2943
2944   """
2945   results = []
2946   for val in exts:
2947     new_id = lu.cfg.GenerateUniqueID()
2948     results.append("%s%s" % (new_id, val))
2949   return results
2950
2951
2952 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
2953                          p_minor, s_minor):
2954   """Generate a drbd8 device complete with its children.
2955
2956   """
2957   port = lu.cfg.AllocatePort()
2958   vgname = lu.cfg.GetVGName()
2959   shared_secret = lu.cfg.GenerateDRBDSecret()
2960   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2961                           logical_id=(vgname, names[0]))
2962   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2963                           logical_id=(vgname, names[1]))
2964   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2965                           logical_id=(primary, secondary, port,
2966                                       p_minor, s_minor,
2967                                       shared_secret),
2968                           children=[dev_data, dev_meta],
2969                           iv_name=iv_name)
2970   return drbd_dev
2971
2972
2973 def _GenerateDiskTemplate(lu, template_name,
2974                           instance_name, primary_node,
2975                           secondary_nodes, disk_sz, swap_sz,
2976                           file_storage_dir, file_driver):
2977   """Generate the entire disk layout for a given template type.
2978
2979   """
2980   #TODO: compute space requirements
2981
2982   vgname = lu.cfg.GetVGName()
2983   if template_name == constants.DT_DISKLESS:
2984     disks = []
2985   elif template_name == constants.DT_PLAIN:
2986     if len(secondary_nodes) != 0:
2987       raise errors.ProgrammerError("Wrong template configuration")
2988
2989     names = _GenerateUniqueNames(lu, [".sda", ".sdb"])
2990     sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2991                            logical_id=(vgname, names[0]),
2992                            iv_name = "sda")
2993     sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2994                            logical_id=(vgname, names[1]),
2995                            iv_name = "sdb")
2996     disks = [sda_dev, sdb_dev]
2997   elif template_name == constants.DT_DRBD8:
2998     if len(secondary_nodes) != 1:
2999       raise errors.ProgrammerError("Wrong template configuration")
3000     remote_node = secondary_nodes[0]
3001     (minor_pa, minor_pb,
3002      minor_sa, minor_sb) = lu.cfg.AllocateDRBDMinor(
3003       [primary_node, primary_node, remote_node, remote_node], instance_name)
3004
3005     names = _GenerateUniqueNames(lu, [".sda_data", ".sda_meta",
3006                                       ".sdb_data", ".sdb_meta"])
3007     drbd_sda_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3008                                         disk_sz, names[0:2], "sda",
3009                                         minor_pa, minor_sa)
3010     drbd_sdb_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
3011                                         swap_sz, names[2:4], "sdb",
3012                                         minor_pb, minor_sb)
3013     disks = [drbd_sda_dev, drbd_sdb_dev]
3014   elif template_name == constants.DT_FILE:
3015     if len(secondary_nodes) != 0:
3016       raise errors.ProgrammerError("Wrong template configuration")
3017
3018     file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
3019                                 iv_name="sda", logical_id=(file_driver,
3020                                 "%s/sda" % file_storage_dir))
3021     file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
3022                                 iv_name="sdb", logical_id=(file_driver,
3023                                 "%s/sdb" % file_storage_dir))
3024     disks = [file_sda_dev, file_sdb_dev]
3025   else:
3026     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
3027   return disks
3028
3029
3030 def _GetInstanceInfoText(instance):
3031   """Compute that text that should be added to the disk's metadata.
3032
3033   """
3034   return "originstname+%s" % instance.name
3035
3036
3037 def _CreateDisks(lu, instance):
3038   """Create all disks for an instance.
3039
3040   This abstracts away some work from AddInstance.
3041
3042   Args:
3043     instance: the instance object
3044
3045   Returns:
3046     True or False showing the success of the creation process
3047
3048   """
3049   info = _GetInstanceInfoText(instance)
3050
3051   if instance.disk_template == constants.DT_FILE:
3052     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3053     result = lu.rpc.call_file_storage_dir_create(instance.primary_node,
3054                                                  file_storage_dir)
3055
3056     if not result:
3057       logger.Error("Could not connect to node '%s'" % instance.primary_node)
3058       return False
3059
3060     if not result[0]:
3061       logger.Error("failed to create directory '%s'" % file_storage_dir)
3062       return False
3063
3064   for device in instance.disks:
3065     logger.Info("creating volume %s for instance %s" %
3066                 (device.iv_name, instance.name))
3067     #HARDCODE
3068     for secondary_node in instance.secondary_nodes:
3069       if not _CreateBlockDevOnSecondary(lu, secondary_node, instance,
3070                                         device, False, info):
3071         logger.Error("failed to create volume %s (%s) on secondary node %s!" %
3072                      (device.iv_name, device, secondary_node))
3073         return False
3074     #HARDCODE
3075     if not _CreateBlockDevOnPrimary(lu, instance.primary_node,
3076                                     instance, device, info):
3077       logger.Error("failed to create volume %s on primary!" %
3078                    device.iv_name)
3079       return False
3080
3081   return True
3082
3083
3084 def _RemoveDisks(lu, instance):
3085   """Remove all disks for an instance.
3086
3087   This abstracts away some work from `AddInstance()` and
3088   `RemoveInstance()`. Note that in case some of the devices couldn't
3089   be removed, the removal will continue with the other ones (compare
3090   with `_CreateDisks()`).
3091
3092   Args:
3093     instance: the instance object
3094
3095   Returns:
3096     True or False showing the success of the removal proces
3097
3098   """
3099   logger.Info("removing block devices for instance %s" % instance.name)
3100
3101   result = True
3102   for device in instance.disks:
3103     for node, disk in device.ComputeNodeTree(instance.primary_node):
3104       lu.cfg.SetDiskID(disk, node)
3105       if not lu.rpc.call_blockdev_remove(node, disk):
3106         logger.Error("could not remove block device %s on node %s,"
3107                      " continuing anyway" %
3108                      (device.iv_name, node))
3109         result = False
3110
3111   if instance.disk_template == constants.DT_FILE:
3112     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
3113     if not lu.rpc.call_file_storage_dir_remove(instance.primary_node,
3114                                                file_storage_dir):
3115       logger.Error("could not remove directory '%s'" % file_storage_dir)
3116       result = False
3117
3118   return result
3119
3120
3121 def _ComputeDiskSize(disk_template, disk_size, swap_size):
3122   """Compute disk size requirements in the volume group
3123
3124   This is currently hard-coded for the two-drive layout.
3125
3126   """
3127   # Required free disk space as a function of disk and swap space
3128   req_size_dict = {
3129     constants.DT_DISKLESS: None,
3130     constants.DT_PLAIN: disk_size + swap_size,
3131     # 256 MB are added for drbd metadata, 128MB for each drbd device
3132     constants.DT_DRBD8: disk_size + swap_size + 256,
3133     constants.DT_FILE: None,
3134   }
3135
3136   if disk_template not in req_size_dict:
3137     raise errors.ProgrammerError("Disk template '%s' size requirement"
3138                                  " is unknown" %  disk_template)
3139
3140   return req_size_dict[disk_template]
3141
3142
3143 def _CheckHVParams(lu, nodenames, hvname, hvparams):
3144   """Hypervisor parameter validation.
3145
3146   This function abstract the hypervisor parameter validation to be
3147   used in both instance create and instance modify.
3148
3149   @type lu: L{LogicalUnit}
3150   @param lu: the logical unit for which we check
3151   @type nodenames: list
3152   @param nodenames: the list of nodes on which we should check
3153   @type hvname: string
3154   @param hvname: the name of the hypervisor we should use
3155   @type hvparams: dict
3156   @param hvparams: the parameters which we need to check
3157   @raise errors.OpPrereqError: if the parameters are not valid
3158
3159   """
3160   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
3161                                                   hvname,
3162                                                   hvparams)
3163   for node in nodenames:
3164     info = hvinfo.get(node, None)
3165     if not info or not isinstance(info, (tuple, list)):
3166       raise errors.OpPrereqError("Cannot get current information"
3167                                  " from node '%s' (%s)" % (node, info))
3168     if not info[0]:
3169       raise errors.OpPrereqError("Hypervisor parameter validation failed:"
3170                                  " %s" % info[1])
3171
3172
3173 class LUCreateInstance(LogicalUnit):
3174   """Create an instance.
3175
3176   """
3177   HPATH = "instance-add"
3178   HTYPE = constants.HTYPE_INSTANCE
3179   _OP_REQP = ["instance_name", "mem_size", "disk_size",
3180               "disk_template", "swap_size", "mode", "start", "vcpus",
3181               "wait_for_sync", "ip_check", "mac", "hvparams"]
3182   REQ_BGL = False
3183
3184   def _ExpandNode(self, node):
3185     """Expands and checks one node name.
3186
3187     """
3188     node_full = self.cfg.ExpandNodeName(node)
3189     if node_full is None:
3190       raise errors.OpPrereqError("Unknown node %s" % node)
3191     return node_full
3192
3193   def ExpandNames(self):
3194     """ExpandNames for CreateInstance.
3195
3196     Figure out the right locks for instance creation.
3197
3198     """
3199     self.needed_locks = {}
3200
3201     # set optional parameters to none if they don't exist
3202     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
3203       if not hasattr(self.op, attr):
3204         setattr(self.op, attr, None)
3205
3206     # cheap checks, mostly valid constants given
3207
3208     # verify creation mode
3209     if self.op.mode not in (constants.INSTANCE_CREATE,
3210                             constants.INSTANCE_IMPORT):
3211       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3212                                  self.op.mode)
3213
3214     # disk template and mirror node verification
3215     if self.op.disk_template not in constants.DISK_TEMPLATES:
3216       raise errors.OpPrereqError("Invalid disk template name")
3217
3218     if self.op.hypervisor is None:
3219       self.op.hypervisor = self.cfg.GetHypervisorType()
3220
3221     enabled_hvs = self.cfg.GetClusterInfo().enabled_hypervisors
3222     if self.op.hypervisor not in enabled_hvs:
3223       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
3224                                  " cluster (%s)" % (self.op.hypervisor,
3225                                   ",".join(enabled_hvs)))
3226
3227     # check hypervisor parameter syntax (locally)
3228
3229     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
3230     hv_type.CheckParameterSyntax(self.op.hvparams)
3231
3232     #### instance parameters check
3233
3234     # instance name verification
3235     hostname1 = utils.HostInfo(self.op.instance_name)
3236     self.op.instance_name = instance_name = hostname1.name
3237
3238     # this is just a preventive check, but someone might still add this
3239     # instance in the meantime, and creation will fail at lock-add time
3240     if instance_name in self.cfg.GetInstanceList():
3241       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3242                                  instance_name)
3243
3244     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
3245
3246     # ip validity checks
3247     ip = getattr(self.op, "ip", None)
3248     if ip is None or ip.lower() == "none":
3249       inst_ip = None
3250     elif ip.lower() == "auto":
3251       inst_ip = hostname1.ip
3252     else:
3253       if not utils.IsValidIP(ip):
3254         raise errors.OpPrereqError("given IP address '%s' doesn't look"
3255                                    " like a valid IP" % ip)
3256       inst_ip = ip
3257     self.inst_ip = self.op.ip = inst_ip
3258     # used in CheckPrereq for ip ping check
3259     self.check_ip = hostname1.ip
3260
3261     # MAC address verification
3262     if self.op.mac != "auto":
3263       if not utils.IsValidMac(self.op.mac.lower()):
3264         raise errors.OpPrereqError("invalid MAC address specified: %s" %
3265                                    self.op.mac)
3266
3267     # file storage checks
3268     if (self.op.file_driver and
3269         not self.op.file_driver in constants.FILE_DRIVER):
3270       raise errors.OpPrereqError("Invalid file driver name '%s'" %
3271                                  self.op.file_driver)
3272
3273     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3274       raise errors.OpPrereqError("File storage directory path not absolute")
3275
3276     ### Node/iallocator related checks
3277     if [self.op.iallocator, self.op.pnode].count(None) != 1:
3278       raise errors.OpPrereqError("One and only one of iallocator and primary"
3279                                  " node must be given")
3280
3281     if self.op.iallocator:
3282       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3283     else:
3284       self.op.pnode = self._ExpandNode(self.op.pnode)
3285       nodelist = [self.op.pnode]
3286       if self.op.snode is not None:
3287         self.op.snode = self._ExpandNode(self.op.snode)
3288         nodelist.append(self.op.snode)
3289       self.needed_locks[locking.LEVEL_NODE] = nodelist
3290
3291     # in case of import lock the source node too
3292     if self.op.mode == constants.INSTANCE_IMPORT:
3293       src_node = getattr(self.op, "src_node", None)
3294       src_path = getattr(self.op, "src_path", None)
3295
3296       if src_node is None or src_path is None:
3297         raise errors.OpPrereqError("Importing an instance requires source"
3298                                    " node and path options")
3299
3300       if not os.path.isabs(src_path):
3301         raise errors.OpPrereqError("The source path must be absolute")
3302
3303       self.op.src_node = src_node = self._ExpandNode(src_node)
3304       if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
3305         self.needed_locks[locking.LEVEL_NODE].append(src_node)
3306
3307     else: # INSTANCE_CREATE
3308       if getattr(self.op, "os_type", None) is None:
3309         raise errors.OpPrereqError("No guest OS specified")
3310
3311   def _RunAllocator(self):
3312     """Run the allocator based on input opcode.
3313
3314     """
3315     disks = [{"size": self.op.disk_size, "mode": "w"},
3316              {"size": self.op.swap_size, "mode": "w"}]
3317     nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
3318              "bridge": self.op.bridge}]
3319     ial = IAllocator(self,
3320                      mode=constants.IALLOCATOR_MODE_ALLOC,
3321                      name=self.op.instance_name,
3322                      disk_template=self.op.disk_template,
3323                      tags=[],
3324                      os=self.op.os_type,
3325                      vcpus=self.op.vcpus,
3326                      mem_size=self.op.mem_size,
3327                      disks=disks,
3328                      nics=nics,
3329                      )
3330
3331     ial.Run(self.op.iallocator)
3332
3333     if not ial.success:
3334       raise errors.OpPrereqError("Can't compute nodes using"
3335                                  " iallocator '%s': %s" % (self.op.iallocator,
3336                                                            ial.info))
3337     if len(ial.nodes) != ial.required_nodes:
3338       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3339                                  " of nodes (%s), required %s" %
3340                                  (self.op.iallocator, len(ial.nodes),
3341                                   ial.required_nodes))
3342     self.op.pnode = ial.nodes[0]
3343     logger.ToStdout("Selected nodes for the instance: %s" %
3344                     (", ".join(ial.nodes),))
3345     logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3346                 (self.op.instance_name, self.op.iallocator, ial.nodes))
3347     if ial.required_nodes == 2:
3348       self.op.snode = ial.nodes[1]
3349
3350   def BuildHooksEnv(self):
3351     """Build hooks env.
3352
3353     This runs on master, primary and secondary nodes of the instance.
3354
3355     """
3356     env = {
3357       "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3358       "INSTANCE_DISK_SIZE": self.op.disk_size,
3359       "INSTANCE_SWAP_SIZE": self.op.swap_size,
3360       "INSTANCE_ADD_MODE": self.op.mode,
3361       }
3362     if self.op.mode == constants.INSTANCE_IMPORT:
3363       env["INSTANCE_SRC_NODE"] = self.op.src_node
3364       env["INSTANCE_SRC_PATH"] = self.op.src_path
3365       env["INSTANCE_SRC_IMAGE"] = self.src_image
3366
3367     env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3368       primary_node=self.op.pnode,
3369       secondary_nodes=self.secondaries,
3370       status=self.instance_status,
3371       os_type=self.op.os_type,
3372       memory=self.op.mem_size,
3373       vcpus=self.op.vcpus,
3374       nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3375     ))
3376
3377     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
3378           self.secondaries)
3379     return env, nl, nl
3380
3381
3382   def CheckPrereq(self):
3383     """Check prerequisites.
3384
3385     """
3386     if (not self.cfg.GetVGName() and
3387         self.op.disk_template not in constants.DTS_NOT_LVM):
3388       raise errors.OpPrereqError("Cluster does not support lvm-based"
3389                                  " instances")
3390
3391
3392     if self.op.mode == constants.INSTANCE_IMPORT:
3393       src_node = self.op.src_node
3394       src_path = self.op.src_path
3395
3396       export_info = self.rpc.call_export_info(src_node, src_path)
3397
3398       if not export_info:
3399         raise errors.OpPrereqError("No export found in dir %s" % src_path)
3400
3401       if not export_info.has_section(constants.INISECT_EXP):
3402         raise errors.ProgrammerError("Corrupted export config")
3403
3404       ei_version = export_info.get(constants.INISECT_EXP, 'version')
3405       if (int(ei_version) != constants.EXPORT_VERSION):
3406         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3407                                    (ei_version, constants.EXPORT_VERSION))
3408
3409       if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3410         raise errors.OpPrereqError("Can't import instance with more than"
3411                                    " one data disk")
3412
3413       # FIXME: are the old os-es, disk sizes, etc. useful?
3414       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3415       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3416                                                          'disk0_dump'))
3417       self.src_image = diskimage
3418
3419     # ip ping checks (we use the same ip that was resolved in ExpandNames)
3420
3421     if self.op.start and not self.op.ip_check:
3422       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3423                                  " adding an instance in start mode")
3424
3425     if self.op.ip_check:
3426       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
3427         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3428                                    (self.check_ip, self.op.instance_name))
3429
3430     # bridge verification
3431     bridge = getattr(self.op, "bridge", None)
3432     if bridge is None:
3433       self.op.bridge = self.cfg.GetDefBridge()
3434     else:
3435       self.op.bridge = bridge
3436
3437     #### allocator run
3438
3439     if self.op.iallocator is not None:
3440       self._RunAllocator()
3441
3442     #### node related checks
3443
3444     # check primary node
3445     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
3446     assert self.pnode is not None, \
3447       "Cannot retrieve locked node %s" % self.op.pnode
3448     self.secondaries = []
3449
3450     # mirror node verification
3451     if self.op.disk_template in constants.DTS_NET_MIRROR:
3452       if self.op.snode is None:
3453         raise errors.OpPrereqError("The networked disk templates need"
3454                                    " a mirror node")
3455       if self.op.snode == pnode.name:
3456         raise errors.OpPrereqError("The secondary node cannot be"
3457                                    " the primary node.")
3458       self.secondaries.append(self.op.snode)
3459
3460     nodenames = [pnode.name] + self.secondaries
3461
3462     req_size = _ComputeDiskSize(self.op.disk_template,
3463                                 self.op.disk_size, self.op.swap_size)
3464
3465     # Check lv size requirements
3466     if req_size is not None:
3467       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
3468                                          self.op.hypervisor)
3469       for node in nodenames:
3470         info = nodeinfo.get(node, None)
3471         if not info:
3472           raise errors.OpPrereqError("Cannot get current information"
3473                                      " from node '%s'" % node)
3474         vg_free = info.get('vg_free', None)
3475         if not isinstance(vg_free, int):
3476           raise errors.OpPrereqError("Can't compute free disk space on"
3477                                      " node %s" % node)
3478         if req_size > info['vg_free']:
3479           raise errors.OpPrereqError("Not enough disk space on target node %s."
3480                                      " %d MB available, %d MB required" %
3481                                      (node, info['vg_free'], req_size))
3482
3483     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
3484
3485     # os verification
3486     os_obj = self.rpc.call_os_get(pnode.name, self.op.os_type)
3487     if not os_obj:
3488       raise errors.OpPrereqError("OS '%s' not in supported os list for"
3489                                  " primary node"  % self.op.os_type)
3490
3491     # bridge check on primary node
3492     if not self.rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3493       raise errors.OpPrereqError("target bridge '%s' does not exist on"
3494                                  " destination node '%s'" %
3495                                  (self.op.bridge, pnode.name))
3496
3497     # memory check on primary node
3498     if self.op.start:
3499       _CheckNodeFreeMemory(self, self.pnode.name,
3500                            "creating instance %s" % self.op.instance_name,
3501                            self.op.mem_size, self.op.hypervisor)
3502
3503     if self.op.start:
3504       self.instance_status = 'up'
3505     else:
3506       self.instance_status = 'down'
3507
3508   def Exec(self, feedback_fn):
3509     """Create and add the instance to the cluster.
3510
3511     """
3512     instance = self.op.instance_name
3513     pnode_name = self.pnode.name
3514
3515     if self.op.mac == "auto":
3516       mac_address = self.cfg.GenerateMAC()
3517     else:
3518       mac_address = self.op.mac
3519
3520     nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3521     if self.inst_ip is not None:
3522       nic.ip = self.inst_ip
3523
3524     ht_kind = self.op.hypervisor
3525     if ht_kind in constants.HTS_REQ_PORT:
3526       network_port = self.cfg.AllocatePort()
3527     else:
3528       network_port = None
3529
3530     ##if self.op.vnc_bind_address is None:
3531     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3532
3533     # this is needed because os.path.join does not accept None arguments
3534     if self.op.file_storage_dir is None:
3535       string_file_storage_dir = ""
3536     else:
3537       string_file_storage_dir = self.op.file_storage_dir
3538
3539     # build the full file storage dir path
3540     file_storage_dir = os.path.normpath(os.path.join(
3541                                         self.cfg.GetFileStorageDir(),
3542                                         string_file_storage_dir, instance))
3543
3544
3545     disks = _GenerateDiskTemplate(self,
3546                                   self.op.disk_template,
3547                                   instance, pnode_name,
3548                                   self.secondaries, self.op.disk_size,
3549                                   self.op.swap_size,
3550                                   file_storage_dir,
3551                                   self.op.file_driver)
3552
3553     iobj = objects.Instance(name=instance, os=self.op.os_type,
3554                             primary_node=pnode_name,
3555                             memory=self.op.mem_size,
3556                             vcpus=self.op.vcpus,
3557                             nics=[nic], disks=disks,
3558                             disk_template=self.op.disk_template,
3559                             status=self.instance_status,
3560                             network_port=network_port,
3561                             hvparams=self.op.hvparams,
3562                             hypervisor=self.op.hypervisor,
3563                             )
3564
3565     feedback_fn("* creating instance disks...")
3566     if not _CreateDisks(self, iobj):
3567       _RemoveDisks(self, iobj)
3568       self.cfg.ReleaseDRBDMinors(instance)
3569       raise errors.OpExecError("Device creation failed, reverting...")
3570
3571     feedback_fn("adding instance %s to cluster config" % instance)
3572
3573     self.cfg.AddInstance(iobj)
3574     # Declare that we don't want to remove the instance lock anymore, as we've
3575     # added the instance to the config
3576     del self.remove_locks[locking.LEVEL_INSTANCE]
3577     # Remove the temp. assignements for the instance's drbds
3578     self.cfg.ReleaseDRBDMinors(instance)
3579
3580     if self.op.wait_for_sync:
3581       disk_abort = not _WaitForSync(self, iobj)
3582     elif iobj.disk_template in constants.DTS_NET_MIRROR:
3583       # make sure the disks are not degraded (still sync-ing is ok)
3584       time.sleep(15)
3585       feedback_fn("* checking mirrors status")
3586       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
3587     else:
3588       disk_abort = False
3589
3590     if disk_abort:
3591       _RemoveDisks(self, iobj)
3592       self.cfg.RemoveInstance(iobj.name)
3593       # Make sure the instance lock gets removed
3594       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
3595       raise errors.OpExecError("There are some degraded disks for"
3596                                " this instance")
3597
3598     feedback_fn("creating os for instance %s on node %s" %
3599                 (instance, pnode_name))
3600
3601     if iobj.disk_template != constants.DT_DISKLESS:
3602       if self.op.mode == constants.INSTANCE_CREATE:
3603         feedback_fn("* running the instance OS create scripts...")
3604         if not self.rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3605           raise errors.OpExecError("could not add os for instance %s"
3606                                    " on node %s" %
3607                                    (instance, pnode_name))
3608
3609       elif self.op.mode == constants.INSTANCE_IMPORT:
3610         feedback_fn("* running the instance OS import scripts...")
3611         src_node = self.op.src_node
3612         src_image = self.src_image
3613         cluster_name = self.cfg.GetClusterName()
3614         if not self.rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3615                                                 src_node, src_image,
3616                                                 cluster_name):
3617           raise errors.OpExecError("Could not import os for instance"
3618                                    " %s on node %s" %
3619                                    (instance, pnode_name))
3620       else:
3621         # also checked in the prereq part
3622         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3623                                      % self.op.mode)
3624
3625     if self.op.start:
3626       logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3627       feedback_fn("* starting instance...")
3628       if not self.rpc.call_instance_start(pnode_name, iobj, None):
3629         raise errors.OpExecError("Could not start instance")
3630
3631
3632 class LUConnectConsole(NoHooksLU):
3633   """Connect to an instance's console.
3634
3635   This is somewhat special in that it returns the command line that
3636   you need to run on the master node in order to connect to the
3637   console.
3638
3639   """
3640   _OP_REQP = ["instance_name"]
3641   REQ_BGL = False
3642
3643   def ExpandNames(self):
3644     self._ExpandAndLockInstance()
3645
3646   def CheckPrereq(self):
3647     """Check prerequisites.
3648
3649     This checks that the instance is in the cluster.
3650
3651     """
3652     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3653     assert self.instance is not None, \
3654       "Cannot retrieve locked instance %s" % self.op.instance_name
3655
3656   def Exec(self, feedback_fn):
3657     """Connect to the console of an instance
3658
3659     """
3660     instance = self.instance
3661     node = instance.primary_node
3662
3663     node_insts = self.rpc.call_instance_list([node],
3664                                              [instance.hypervisor])[node]
3665     if node_insts is False:
3666       raise errors.OpExecError("Can't connect to node %s." % node)
3667
3668     if instance.name not in node_insts:
3669       raise errors.OpExecError("Instance %s is not running." % instance.name)
3670
3671     logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3672
3673     hyper = hypervisor.GetHypervisor(instance.hypervisor)
3674     console_cmd = hyper.GetShellCommandForConsole(instance)
3675
3676     # build ssh cmdline
3677     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3678
3679
3680 class LUReplaceDisks(LogicalUnit):
3681   """Replace the disks of an instance.
3682
3683   """
3684   HPATH = "mirrors-replace"
3685   HTYPE = constants.HTYPE_INSTANCE
3686   _OP_REQP = ["instance_name", "mode", "disks"]
3687   REQ_BGL = False
3688
3689   def ExpandNames(self):
3690     self._ExpandAndLockInstance()
3691
3692     if not hasattr(self.op, "remote_node"):
3693       self.op.remote_node = None
3694
3695     ia_name = getattr(self.op, "iallocator", None)
3696     if ia_name is not None:
3697       if self.op.remote_node is not None:
3698         raise errors.OpPrereqError("Give either the iallocator or the new"
3699                                    " secondary, not both")
3700       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
3701     elif self.op.remote_node is not None:
3702       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
3703       if remote_node is None:
3704         raise errors.OpPrereqError("Node '%s' not known" %
3705                                    self.op.remote_node)
3706       self.op.remote_node = remote_node
3707       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
3708       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3709     else:
3710       self.needed_locks[locking.LEVEL_NODE] = []
3711       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3712
3713   def DeclareLocks(self, level):
3714     # If we're not already locking all nodes in the set we have to declare the
3715     # instance's primary/secondary nodes.
3716     if (level == locking.LEVEL_NODE and
3717         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
3718       self._LockInstancesNodes()
3719
3720   def _RunAllocator(self):
3721     """Compute a new secondary node using an IAllocator.
3722
3723     """
3724     ial = IAllocator(self,
3725                      mode=constants.IALLOCATOR_MODE_RELOC,
3726                      name=self.op.instance_name,
3727                      relocate_from=[self.sec_node])
3728
3729     ial.Run(self.op.iallocator)
3730
3731     if not ial.success:
3732       raise errors.OpPrereqError("Can't compute nodes using"
3733                                  " iallocator '%s': %s" % (self.op.iallocator,
3734                                                            ial.info))
3735     if len(ial.nodes) != ial.required_nodes:
3736       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3737                                  " of nodes (%s), required %s" %
3738                                  (len(ial.nodes), ial.required_nodes))
3739     self.op.remote_node = ial.nodes[0]
3740     logger.ToStdout("Selected new secondary for the instance: %s" %
3741                     self.op.remote_node)
3742
3743   def BuildHooksEnv(self):
3744     """Build hooks env.
3745
3746     This runs on the master, the primary and all the secondaries.
3747
3748     """
3749     env = {
3750       "MODE": self.op.mode,
3751       "NEW_SECONDARY": self.op.remote_node,
3752       "OLD_SECONDARY": self.instance.secondary_nodes[0],
3753       }
3754     env.update(_BuildInstanceHookEnvByObject(self.instance))
3755     nl = [
3756       self.cfg.GetMasterNode(),
3757       self.instance.primary_node,
3758       ]
3759     if self.op.remote_node is not None:
3760       nl.append(self.op.remote_node)
3761     return env, nl, nl
3762
3763   def CheckPrereq(self):
3764     """Check prerequisites.
3765
3766     This checks that the instance is in the cluster.
3767
3768     """
3769     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3770     assert instance is not None, \
3771       "Cannot retrieve locked instance %s" % self.op.instance_name
3772     self.instance = instance
3773
3774     if instance.disk_template not in constants.DTS_NET_MIRROR:
3775       raise errors.OpPrereqError("Instance's disk layout is not"
3776                                  " network mirrored.")
3777
3778     if len(instance.secondary_nodes) != 1:
3779       raise errors.OpPrereqError("The instance has a strange layout,"
3780                                  " expected one secondary but found %d" %
3781                                  len(instance.secondary_nodes))
3782
3783     self.sec_node = instance.secondary_nodes[0]
3784
3785     ia_name = getattr(self.op, "iallocator", None)
3786     if ia_name is not None:
3787       self._RunAllocator()
3788
3789     remote_node = self.op.remote_node
3790     if remote_node is not None:
3791       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3792       assert self.remote_node_info is not None, \
3793         "Cannot retrieve locked node %s" % remote_node
3794     else:
3795       self.remote_node_info = None
3796     if remote_node == instance.primary_node:
3797       raise errors.OpPrereqError("The specified node is the primary node of"
3798                                  " the instance.")
3799     elif remote_node == self.sec_node:
3800       if self.op.mode == constants.REPLACE_DISK_SEC:
3801         # this is for DRBD8, where we can't execute the same mode of
3802         # replacement as for drbd7 (no different port allocated)
3803         raise errors.OpPrereqError("Same secondary given, cannot execute"
3804                                    " replacement")
3805     if instance.disk_template == constants.DT_DRBD8:
3806       if (self.op.mode == constants.REPLACE_DISK_ALL and
3807           remote_node is not None):
3808         # switch to replace secondary mode
3809         self.op.mode = constants.REPLACE_DISK_SEC
3810
3811       if self.op.mode == constants.REPLACE_DISK_ALL:
3812         raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3813                                    " secondary disk replacement, not"
3814                                    " both at once")
3815       elif self.op.mode == constants.REPLACE_DISK_PRI:
3816         if remote_node is not None:
3817           raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3818                                      " the secondary while doing a primary"
3819                                      " node disk replacement")
3820         self.tgt_node = instance.primary_node
3821         self.oth_node = instance.secondary_nodes[0]
3822       elif self.op.mode == constants.REPLACE_DISK_SEC:
3823         self.new_node = remote_node # this can be None, in which case
3824                                     # we don't change the secondary
3825         self.tgt_node = instance.secondary_nodes[0]
3826         self.oth_node = instance.primary_node
3827       else:
3828         raise errors.ProgrammerError("Unhandled disk replace mode")
3829
3830     for name in self.op.disks:
3831       if instance.FindDisk(name) is None:
3832         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3833                                    (name, instance.name))
3834
3835   def _ExecD8DiskOnly(self, feedback_fn):
3836     """Replace a disk on the primary or secondary for dbrd8.
3837
3838     The algorithm for replace is quite complicated:
3839       - for each disk to be replaced:
3840         - create new LVs on the target node with unique names
3841         - detach old LVs from the drbd device
3842         - rename old LVs to name_replaced.<time_t>
3843         - rename new LVs to old LVs
3844         - attach the new LVs (with the old names now) to the drbd device
3845       - wait for sync across all devices
3846       - for each modified disk:
3847         - remove old LVs (which have the name name_replaces.<time_t>)
3848
3849     Failures are not very well handled.
3850
3851     """
3852     steps_total = 6
3853     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3854     instance = self.instance
3855     iv_names = {}
3856     vgname = self.cfg.GetVGName()
3857     # start of work
3858     cfg = self.cfg
3859     tgt_node = self.tgt_node
3860     oth_node = self.oth_node
3861
3862     # Step: check device activation
3863     self.proc.LogStep(1, steps_total, "check device existence")
3864     info("checking volume groups")
3865     my_vg = cfg.GetVGName()
3866     results = self.rpc.call_vg_list([oth_node, tgt_node])
3867     if not results:
3868       raise errors.OpExecError("Can't list volume groups on the nodes")
3869     for node in oth_node, tgt_node:
3870       res = results.get(node, False)
3871       if not res or my_vg not in res:
3872         raise errors.OpExecError("Volume group '%s' not found on %s" %
3873                                  (my_vg, node))
3874     for dev in instance.disks:
3875       if not dev.iv_name in self.op.disks:
3876         continue
3877       for node in tgt_node, oth_node:
3878         info("checking %s on %s" % (dev.iv_name, node))
3879         cfg.SetDiskID(dev, node)
3880         if not self.rpc.call_blockdev_find(node, dev):
3881           raise errors.OpExecError("Can't find device %s on node %s" %
3882                                    (dev.iv_name, node))
3883
3884     # Step: check other node consistency
3885     self.proc.LogStep(2, steps_total, "check peer consistency")
3886     for dev in instance.disks:
3887       if not dev.iv_name in self.op.disks:
3888         continue
3889       info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3890       if not _CheckDiskConsistency(self, dev, oth_node,
3891                                    oth_node==instance.primary_node):
3892         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3893                                  " to replace disks on this node (%s)" %
3894                                  (oth_node, tgt_node))
3895
3896     # Step: create new storage
3897     self.proc.LogStep(3, steps_total, "allocate new storage")
3898     for dev in instance.disks:
3899       if not dev.iv_name in self.op.disks:
3900         continue
3901       size = dev.size
3902       cfg.SetDiskID(dev, tgt_node)
3903       lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3904       names = _GenerateUniqueNames(self, lv_names)
3905       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3906                              logical_id=(vgname, names[0]))
3907       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3908                              logical_id=(vgname, names[1]))
3909       new_lvs = [lv_data, lv_meta]
3910       old_lvs = dev.children
3911       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3912       info("creating new local storage on %s for %s" %
3913            (tgt_node, dev.iv_name))
3914       # since we *always* want to create this LV, we use the
3915       # _Create...OnPrimary (which forces the creation), even if we
3916       # are talking about the secondary node
3917       for new_lv in new_lvs:
3918         if not _CreateBlockDevOnPrimary(self, tgt_node, instance, new_lv,
3919                                         _GetInstanceInfoText(instance)):
3920           raise errors.OpExecError("Failed to create new LV named '%s' on"
3921                                    " node '%s'" %
3922                                    (new_lv.logical_id[1], tgt_node))
3923
3924     # Step: for each lv, detach+rename*2+attach
3925     self.proc.LogStep(4, steps_total, "change drbd configuration")
3926     for dev, old_lvs, new_lvs in iv_names.itervalues():
3927       info("detaching %s drbd from local storage" % dev.iv_name)
3928       if not self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3929         raise errors.OpExecError("Can't detach drbd from local storage on node"
3930                                  " %s for device %s" % (tgt_node, dev.iv_name))
3931       #dev.children = []
3932       #cfg.Update(instance)
3933
3934       # ok, we created the new LVs, so now we know we have the needed
3935       # storage; as such, we proceed on the target node to rename
3936       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3937       # using the assumption that logical_id == physical_id (which in
3938       # turn is the unique_id on that node)
3939
3940       # FIXME(iustin): use a better name for the replaced LVs
3941       temp_suffix = int(time.time())
3942       ren_fn = lambda d, suff: (d.physical_id[0],
3943                                 d.physical_id[1] + "_replaced-%s" % suff)
3944       # build the rename list based on what LVs exist on the node
3945       rlist = []
3946       for to_ren in old_lvs:
3947         find_res = self.rpc.call_blockdev_find(tgt_node, to_ren)
3948         if find_res is not None: # device exists
3949           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3950
3951       info("renaming the old LVs on the target node")
3952       if not self.rpc.call_blockdev_rename(tgt_node, rlist):
3953         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3954       # now we rename the new LVs to the old LVs
3955       info("renaming the new LVs on the target node")
3956       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3957       if not self.rpc.call_blockdev_rename(tgt_node, rlist):
3958         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3959
3960       for old, new in zip(old_lvs, new_lvs):
3961         new.logical_id = old.logical_id
3962         cfg.SetDiskID(new, tgt_node)
3963
3964       for disk in old_lvs:
3965         disk.logical_id = ren_fn(disk, temp_suffix)
3966         cfg.SetDiskID(disk, tgt_node)
3967
3968       # now that the new lvs have the old name, we can add them to the device
3969       info("adding new mirror component on %s" % tgt_node)
3970       if not self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3971         for new_lv in new_lvs:
3972           if not self.rpc.call_blockdev_remove(tgt_node, new_lv):
3973             warning("Can't rollback device %s", hint="manually cleanup unused"
3974                     " logical volumes")
3975         raise errors.OpExecError("Can't add local storage to drbd")
3976
3977       dev.children = new_lvs
3978       cfg.Update(instance)
3979
3980     # Step: wait for sync
3981
3982     # this can fail as the old devices are degraded and _WaitForSync
3983     # does a combined result over all disks, so we don't check its
3984     # return value
3985     self.proc.LogStep(5, steps_total, "sync devices")
3986     _WaitForSync(self, instance, unlock=True)
3987
3988     # so check manually all the devices
3989     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3990       cfg.SetDiskID(dev, instance.primary_node)
3991       is_degr = self.rpc.call_blockdev_find(instance.primary_node, dev)[5]
3992       if is_degr:
3993         raise errors.OpExecError("DRBD device %s is degraded!" % name)
3994
3995     # Step: remove old storage
3996     self.proc.LogStep(6, steps_total, "removing old storage")
3997     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3998       info("remove logical volumes for %s" % name)
3999       for lv in old_lvs:
4000         cfg.SetDiskID(lv, tgt_node)
4001         if not self.rpc.call_blockdev_remove(tgt_node, lv):
4002           warning("Can't remove old LV", hint="manually remove unused LVs")
4003           continue
4004
4005   def _ExecD8Secondary(self, feedback_fn):
4006     """Replace the secondary node for drbd8.
4007
4008     The algorithm for replace is quite complicated:
4009       - for all disks of the instance:
4010         - create new LVs on the new node with same names
4011         - shutdown the drbd device on the old secondary
4012         - disconnect the drbd network on the primary
4013         - create the drbd device on the new secondary
4014         - network attach the drbd on the primary, using an artifice:
4015           the drbd code for Attach() will connect to the network if it
4016           finds a device which is connected to the good local disks but
4017           not network enabled
4018       - wait for sync across all devices
4019       - remove all disks from the old secondary
4020
4021     Failures are not very well handled.
4022
4023     """
4024     steps_total = 6
4025     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
4026     instance = self.instance
4027     iv_names = {}
4028     vgname = self.cfg.GetVGName()
4029     # start of work
4030     cfg = self.cfg
4031     old_node = self.tgt_node
4032     new_node = self.new_node
4033     pri_node = instance.primary_node
4034
4035     # Step: check device activation
4036     self.proc.LogStep(1, steps_total, "check device existence")
4037     info("checking volume groups")
4038     my_vg = cfg.GetVGName()
4039     results = self.rpc.call_vg_list([pri_node, new_node])
4040     if not results:
4041       raise errors.OpExecError("Can't list volume groups on the nodes")
4042     for node in pri_node, new_node:
4043       res = results.get(node, False)
4044       if not res or my_vg not in res:
4045         raise errors.OpExecError("Volume group '%s' not found on %s" %
4046                                  (my_vg, node))
4047     for dev in instance.disks:
4048       if not dev.iv_name in self.op.disks:
4049         continue
4050       info("checking %s on %s" % (dev.iv_name, pri_node))
4051       cfg.SetDiskID(dev, pri_node)
4052       if not self.rpc.call_blockdev_find(pri_node, dev):
4053         raise errors.OpExecError("Can't find device %s on node %s" %
4054                                  (dev.iv_name, pri_node))
4055
4056     # Step: check other node consistency
4057     self.proc.LogStep(2, steps_total, "check peer consistency")
4058     for dev in instance.disks:
4059       if not dev.iv_name in self.op.disks:
4060         continue
4061       info("checking %s consistency on %s" % (dev.iv_name, pri_node))
4062       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
4063         raise errors.OpExecError("Primary node (%s) has degraded storage,"
4064                                  " unsafe to replace the secondary" %
4065                                  pri_node)
4066
4067     # Step: create new storage
4068     self.proc.LogStep(3, steps_total, "allocate new storage")
4069     for dev in instance.disks:
4070       size = dev.size
4071       info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
4072       # since we *always* want to create this LV, we use the
4073       # _Create...OnPrimary (which forces the creation), even if we
4074       # are talking about the secondary node
4075       for new_lv in dev.children:
4076         if not _CreateBlockDevOnPrimary(self, new_node, instance, new_lv,
4077                                         _GetInstanceInfoText(instance)):
4078           raise errors.OpExecError("Failed to create new LV named '%s' on"
4079                                    " node '%s'" %
4080                                    (new_lv.logical_id[1], new_node))
4081
4082
4083     # Step 4: dbrd minors and drbd setups changes
4084     # after this, we must manually remove the drbd minors on both the
4085     # error and the success paths
4086     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
4087                                    instance.name)
4088     logging.debug("Allocated minors %s" % (minors,))
4089     self.proc.LogStep(4, steps_total, "changing drbd configuration")
4090     for dev, new_minor in zip(instance.disks, minors):
4091       size = dev.size
4092       info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
4093       # create new devices on new_node
4094       if pri_node == dev.logical_id[0]:
4095         new_logical_id = (pri_node, new_node,
4096                           dev.logical_id[2], dev.logical_id[3], new_minor,
4097                           dev.logical_id[5])
4098       else:
4099         new_logical_id = (new_node, pri_node,
4100                           dev.logical_id[2], new_minor, dev.logical_id[4],
4101                           dev.logical_id[5])
4102       iv_names[dev.iv_name] = (dev, dev.children, new_logical_id)
4103       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
4104                     new_logical_id)
4105       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
4106                               logical_id=new_logical_id,
4107                               children=dev.children)
4108       if not _CreateBlockDevOnSecondary(self, new_node, instance,
4109                                         new_drbd, False,
4110                                         _GetInstanceInfoText(instance)):
4111         self.cfg.ReleaseDRBDMinors(instance.name)
4112         raise errors.OpExecError("Failed to create new DRBD on"
4113                                  " node '%s'" % new_node)
4114
4115     for dev in instance.disks:
4116       # we have new devices, shutdown the drbd on the old secondary
4117       info("shutting down drbd for %s on old node" % dev.iv_name)
4118       cfg.SetDiskID(dev, old_node)
4119       if not self.rpc.call_blockdev_shutdown(old_node, dev):
4120         warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
4121                 hint="Please cleanup this device manually as soon as possible")
4122
4123     info("detaching primary drbds from the network (=> standalone)")
4124     done = 0
4125     for dev in instance.disks:
4126       cfg.SetDiskID(dev, pri_node)
4127       # set the network part of the physical (unique in bdev terms) id
4128       # to None, meaning detach from network
4129       dev.physical_id = (None, None, None, None) + dev.physical_id[4:]
4130       # and 'find' the device, which will 'fix' it to match the
4131       # standalone state
4132       if self.rpc.call_blockdev_find(pri_node, dev):
4133         done += 1
4134       else:
4135         warning("Failed to detach drbd %s from network, unusual case" %
4136                 dev.iv_name)
4137
4138     if not done:
4139       # no detaches succeeded (very unlikely)
4140       self.cfg.ReleaseDRBDMinors(instance.name)
4141       raise errors.OpExecError("Can't detach at least one DRBD from old node")
4142
4143     # if we managed to detach at least one, we update all the disks of
4144     # the instance to point to the new secondary
4145     info("updating instance configuration")
4146     for dev, _, new_logical_id in iv_names.itervalues():
4147       dev.logical_id = new_logical_id
4148       cfg.SetDiskID(dev, pri_node)
4149     cfg.Update(instance)
4150     # we can remove now the temp minors as now the new values are
4151     # written to the config file (and therefore stable)
4152     self.cfg.ReleaseDRBDMinors(instance.name)
4153
4154     # and now perform the drbd attach
4155     info("attaching primary drbds to new secondary (standalone => connected)")
4156     failures = []
4157     for dev in instance.disks:
4158       info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
4159       # since the attach is smart, it's enough to 'find' the device,
4160       # it will automatically activate the network, if the physical_id
4161       # is correct
4162       cfg.SetDiskID(dev, pri_node)
4163       logging.debug("Disk to attach: %s", dev)
4164       if not self.rpc.call_blockdev_find(pri_node, dev):
4165         warning("can't attach drbd %s to new secondary!" % dev.iv_name,
4166                 "please do a gnt-instance info to see the status of disks")
4167
4168     # this can fail as the old devices are degraded and _WaitForSync
4169     # does a combined result over all disks, so we don't check its
4170     # return value
4171     self.proc.LogStep(5, steps_total, "sync devices")
4172     _WaitForSync(self, instance, unlock=True)
4173
4174     # so check manually all the devices
4175     for name, (dev, old_lvs, _) in iv_names.iteritems():
4176       cfg.SetDiskID(dev, pri_node)
4177       is_degr = self.rpc.call_blockdev_find(pri_node, dev)[5]
4178       if is_degr:
4179         raise errors.OpExecError("DRBD device %s is degraded!" % name)
4180
4181     self.proc.LogStep(6, steps_total, "removing old storage")
4182     for name, (dev, old_lvs, _) in iv_names.iteritems():
4183       info("remove logical volumes for %s" % name)
4184       for lv in old_lvs:
4185         cfg.SetDiskID(lv, old_node)
4186         if not self.rpc.call_blockdev_remove(old_node, lv):
4187           warning("Can't remove LV on old secondary",
4188                   hint="Cleanup stale volumes by hand")
4189
4190   def Exec(self, feedback_fn):
4191     """Execute disk replacement.
4192
4193     This dispatches the disk replacement to the appropriate handler.
4194
4195     """
4196     instance = self.instance
4197
4198     # Activate the instance disks if we're replacing them on a down instance
4199     if instance.status == "down":
4200       _StartInstanceDisks(self, instance, True)
4201
4202     if instance.disk_template == constants.DT_DRBD8:
4203       if self.op.remote_node is None:
4204         fn = self._ExecD8DiskOnly
4205       else:
4206         fn = self._ExecD8Secondary
4207     else:
4208       raise errors.ProgrammerError("Unhandled disk replacement case")
4209
4210     ret = fn(feedback_fn)
4211
4212     # Deactivate the instance disks if we're replacing them on a down instance
4213     if instance.status == "down":
4214       _SafeShutdownInstanceDisks(self, instance)
4215
4216     return ret
4217
4218
4219 class LUGrowDisk(LogicalUnit):
4220   """Grow a disk of an instance.
4221
4222   """
4223   HPATH = "disk-grow"
4224   HTYPE = constants.HTYPE_INSTANCE
4225   _OP_REQP = ["instance_name", "disk", "amount"]
4226   REQ_BGL = False
4227
4228   def ExpandNames(self):
4229     self._ExpandAndLockInstance()
4230     self.needed_locks[locking.LEVEL_NODE] = []
4231     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4232
4233   def DeclareLocks(self, level):
4234     if level == locking.LEVEL_NODE:
4235       self._LockInstancesNodes()
4236
4237   def BuildHooksEnv(self):
4238     """Build hooks env.
4239
4240     This runs on the master, the primary and all the secondaries.
4241
4242     """
4243     env = {
4244       "DISK": self.op.disk,
4245       "AMOUNT": self.op.amount,
4246       }
4247     env.update(_BuildInstanceHookEnvByObject(self.instance))
4248     nl = [
4249       self.cfg.GetMasterNode(),
4250       self.instance.primary_node,
4251       ]
4252     return env, nl, nl
4253
4254   def CheckPrereq(self):
4255     """Check prerequisites.
4256
4257     This checks that the instance is in the cluster.
4258
4259     """
4260     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4261     assert instance is not None, \
4262       "Cannot retrieve locked instance %s" % self.op.instance_name
4263
4264     self.instance = instance
4265
4266     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
4267       raise errors.OpPrereqError("Instance's disk layout does not support"
4268                                  " growing.")
4269
4270     if instance.FindDisk(self.op.disk) is None:
4271       raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
4272                                  (self.op.disk, instance.name))
4273
4274     nodenames = [instance.primary_node] + list(instance.secondary_nodes)
4275     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4276                                        instance.hypervisor)
4277     for node in nodenames:
4278       info = nodeinfo.get(node, None)
4279       if not info:
4280         raise errors.OpPrereqError("Cannot get current information"
4281                                    " from node '%s'" % node)
4282       vg_free = info.get('vg_free', None)
4283       if not isinstance(vg_free, int):
4284         raise errors.OpPrereqError("Can't compute free disk space on"
4285                                    " node %s" % node)
4286       if self.op.amount > info['vg_free']:
4287         raise errors.OpPrereqError("Not enough disk space on target node %s:"
4288                                    " %d MiB available, %d MiB required" %
4289                                    (node, info['vg_free'], self.op.amount))
4290
4291   def Exec(self, feedback_fn):
4292     """Execute disk grow.
4293
4294     """
4295     instance = self.instance
4296     disk = instance.FindDisk(self.op.disk)
4297     for node in (instance.secondary_nodes + (instance.primary_node,)):
4298       self.cfg.SetDiskID(disk, node)
4299       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
4300       if (not result or not isinstance(result, (list, tuple)) or
4301           len(result) != 2):
4302         raise errors.OpExecError("grow request failed to node %s" % node)
4303       elif not result[0]:
4304         raise errors.OpExecError("grow request failed to node %s: %s" %
4305                                  (node, result[1]))
4306     disk.RecordGrow(self.op.amount)
4307     self.cfg.Update(instance)
4308     return
4309
4310
4311 class LUQueryInstanceData(NoHooksLU):
4312   """Query runtime instance data.
4313
4314   """
4315   _OP_REQP = ["instances"]
4316   REQ_BGL = False
4317
4318   def ExpandNames(self):
4319     self.needed_locks = {}
4320     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
4321
4322     if not isinstance(self.op.instances, list):
4323       raise errors.OpPrereqError("Invalid argument type 'instances'")
4324
4325     if self.op.instances:
4326       self.wanted_names = []
4327       for name in self.op.instances:
4328         full_name = self.cfg.ExpandInstanceName(name)
4329         if full_name is None:
4330           raise errors.OpPrereqError("Instance '%s' not known" %
4331                                      self.op.instance_name)
4332         self.wanted_names.append(full_name)
4333       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
4334     else:
4335       self.wanted_names = None
4336       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
4337
4338     self.needed_locks[locking.LEVEL_NODE] = []
4339     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4340
4341   def DeclareLocks(self, level):
4342     if level == locking.LEVEL_NODE:
4343       self._LockInstancesNodes()
4344
4345   def CheckPrereq(self):
4346     """Check prerequisites.
4347
4348     This only checks the optional instance list against the existing names.
4349
4350     """
4351     if self.wanted_names is None:
4352       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
4353
4354     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4355                              in self.wanted_names]
4356     return
4357
4358   def _ComputeDiskStatus(self, instance, snode, dev):
4359     """Compute block device status.
4360
4361     """
4362     self.cfg.SetDiskID(dev, instance.primary_node)
4363     dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
4364     if dev.dev_type in constants.LDS_DRBD:
4365       # we change the snode then (otherwise we use the one passed in)
4366       if dev.logical_id[0] == instance.primary_node:
4367         snode = dev.logical_id[1]
4368       else:
4369         snode = dev.logical_id[0]
4370
4371     if snode:
4372       self.cfg.SetDiskID(dev, snode)
4373       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
4374     else:
4375       dev_sstatus = None
4376
4377     if dev.children:
4378       dev_children = [self._ComputeDiskStatus(instance, snode, child)
4379                       for child in dev.children]
4380     else:
4381       dev_children = []
4382
4383     data = {
4384       "iv_name": dev.iv_name,
4385       "dev_type": dev.dev_type,
4386       "logical_id": dev.logical_id,
4387       "physical_id": dev.physical_id,
4388       "pstatus": dev_pstatus,
4389       "sstatus": dev_sstatus,
4390       "children": dev_children,
4391       }
4392
4393     return data
4394
4395   def Exec(self, feedback_fn):
4396     """Gather and return data"""
4397     result = {}
4398     for instance in self.wanted_instances:
4399       remote_info = self.rpc.call_instance_info(instance.primary_node,
4400                                                 instance.name,
4401                                                 instance.hypervisor)
4402       if remote_info and "state" in remote_info:
4403         remote_state = "up"
4404       else:
4405         remote_state = "down"
4406       if instance.status == "down":
4407         config_state = "down"
4408       else:
4409         config_state = "up"
4410
4411       disks = [self._ComputeDiskStatus(instance, None, device)
4412                for device in instance.disks]
4413
4414       idict = {
4415         "name": instance.name,
4416         "config_state": config_state,
4417         "run_state": remote_state,
4418         "pnode": instance.primary_node,
4419         "snodes": instance.secondary_nodes,
4420         "os": instance.os,
4421         "memory": instance.memory,
4422         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4423         "disks": disks,
4424         "vcpus": instance.vcpus,
4425         "hypervisor": instance.hypervisor,
4426         }
4427
4428       htkind = instance.hypervisor
4429       if htkind == constants.HT_XEN_PVM:
4430         idict["kernel_path"] = instance.kernel_path
4431         idict["initrd_path"] = instance.initrd_path
4432
4433       if htkind == constants.HT_XEN_HVM:
4434         idict["hvm_boot_order"] = instance.hvm_boot_order
4435         idict["hvm_acpi"] = instance.hvm_acpi
4436         idict["hvm_pae"] = instance.hvm_pae
4437         idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4438         idict["hvm_nic_type"] = instance.hvm_nic_type
4439         idict["hvm_disk_type"] = instance.hvm_disk_type
4440
4441       if htkind in constants.HTS_REQ_PORT:
4442         if instance.vnc_bind_address is None:
4443           vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4444         else:
4445           vnc_bind_address = instance.vnc_bind_address
4446         if instance.network_port is None:
4447           vnc_console_port = None
4448         elif vnc_bind_address == constants.BIND_ADDRESS_GLOBAL:
4449           vnc_console_port = "%s:%s" % (instance.primary_node,
4450                                        instance.network_port)
4451         elif vnc_bind_address == constants.LOCALHOST_IP_ADDRESS:
4452           vnc_console_port = "%s:%s on node %s" % (vnc_bind_address,
4453                                                    instance.network_port,
4454                                                    instance.primary_node)
4455         else:
4456           vnc_console_port = "%s:%s" % (instance.vnc_bind_address,
4457                                         instance.network_port)
4458         idict["vnc_console_port"] = vnc_console_port
4459         idict["vnc_bind_address"] = vnc_bind_address
4460         idict["network_port"] = instance.network_port
4461
4462       result[instance.name] = idict
4463
4464     return result
4465
4466
4467 class LUSetInstanceParams(LogicalUnit):
4468   """Modifies an instances's parameters.
4469
4470   """
4471   HPATH = "instance-modify"
4472   HTYPE = constants.HTYPE_INSTANCE
4473   _OP_REQP = ["instance_name", "hvparams"]
4474   REQ_BGL = False
4475
4476   def ExpandNames(self):
4477     self._ExpandAndLockInstance()
4478     self.needed_locks[locking.LEVEL_NODE] = []
4479     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
4480
4481
4482   def DeclareLocks(self, level):
4483     if level == locking.LEVEL_NODE:
4484       self._LockInstancesNodes()
4485
4486   def BuildHooksEnv(self):
4487     """Build hooks env.
4488
4489     This runs on the master, primary and secondaries.
4490
4491     """
4492     args = dict()
4493     if self.mem:
4494       args['memory'] = self.mem
4495     if self.vcpus:
4496       args['vcpus'] = self.vcpus
4497     if self.do_ip or self.do_bridge or self.mac:
4498       if self.do_ip:
4499         ip = self.ip
4500       else:
4501         ip = self.instance.nics[0].ip
4502       if self.bridge:
4503         bridge = self.bridge
4504       else:
4505         bridge = self.instance.nics[0].bridge
4506       if self.mac:
4507         mac = self.mac
4508       else:
4509         mac = self.instance.nics[0].mac
4510       args['nics'] = [(ip, bridge, mac)]
4511     env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4512     nl = [self.cfg.GetMasterNode(),
4513           self.instance.primary_node] + list(self.instance.secondary_nodes)
4514     return env, nl, nl
4515
4516   def CheckPrereq(self):
4517     """Check prerequisites.
4518
4519     This only checks the instance list against the existing names.
4520
4521     """
4522     # FIXME: all the parameters could be checked before, in ExpandNames, or in
4523     # a separate CheckArguments function, if we implement one, so the operation
4524     # can be aborted without waiting for any lock, should it have an error...
4525     self.mem = getattr(self.op, "mem", None)
4526     self.vcpus = getattr(self.op, "vcpus", None)
4527     self.ip = getattr(self.op, "ip", None)
4528     self.mac = getattr(self.op, "mac", None)
4529     self.bridge = getattr(self.op, "bridge", None)
4530     self.kernel_path = getattr(self.op, "kernel_path", None)
4531     self.initrd_path = getattr(self.op, "initrd_path", None)
4532     self.force = getattr(self.op, "force", None)
4533     all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac]
4534     if all_parms.count(None) == len(all_parms) and not self.op.hvparams:
4535       raise errors.OpPrereqError("No changes submitted")
4536     if self.mem is not None:
4537       try:
4538         self.mem = int(self.mem)
4539       except ValueError, err:
4540         raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4541     if self.vcpus is not None:
4542       try:
4543         self.vcpus = int(self.vcpus)
4544       except ValueError, err:
4545         raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4546     if self.ip is not None:
4547       self.do_ip = True
4548       if self.ip.lower() == "none":
4549         self.ip = None
4550       else:
4551         if not utils.IsValidIP(self.ip):
4552           raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4553     else:
4554       self.do_ip = False
4555     self.do_bridge = (self.bridge is not None)
4556     if self.mac is not None:
4557       if self.cfg.IsMacInUse(self.mac):
4558         raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4559                                    self.mac)
4560       if not utils.IsValidMac(self.mac):
4561         raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4562
4563     # checking the new params on the primary/secondary nodes
4564
4565     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4566     assert self.instance is not None, \
4567       "Cannot retrieve locked instance %s" % self.op.instance_name
4568     pnode = self.instance.primary_node
4569     nodelist = [pnode]
4570     nodelist.extend(instance.secondary_nodes)
4571
4572     if self.op.hvparams:
4573       i_hvdict = copy.deepcopy(instance.hvparams)
4574       for key, val in self.op.hvparams.iteritems():
4575         if val is None:
4576           try:
4577             del i_hvdict[key]
4578           except KeyError:
4579             pass
4580         else:
4581           i_hvdict[key] = val
4582       cluster = self.cfg.GetClusterInfo()
4583       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
4584                                 i_hvdict)
4585       # local check
4586       hypervisor.GetHypervisor(
4587         instance.hypervisor).CheckParameterSyntax(hv_new)
4588       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
4589       self.hv_new = hv_new
4590
4591     self.warn = []
4592     if self.mem is not None and not self.force:
4593       instance_info = self.rpc.call_instance_info(pnode, instance.name,
4594                                                   instance.hypervisor)
4595       nodeinfo = self.rpc.call_node_info(nodelist, self.cfg.GetVGName(),
4596                                          instance.hypervisor)
4597
4598       if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
4599         # Assume the primary node is unreachable and go ahead
4600         self.warn.append("Can't get info from primary node %s" % pnode)
4601       else:
4602         if instance_info:
4603           current_mem = instance_info['memory']
4604         else:
4605           # Assume instance not running
4606           # (there is a slight race condition here, but it's not very probable,
4607           # and we have no other way to check)
4608           current_mem = 0
4609         miss_mem = self.mem - current_mem - nodeinfo[pnode]['memory_free']
4610         if miss_mem > 0:
4611           raise errors.OpPrereqError("This change will prevent the instance"
4612                                      " from starting, due to %d MB of memory"
4613                                      " missing on its primary node" % miss_mem)
4614
4615       for node in instance.secondary_nodes:
4616         if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
4617           self.warn.append("Can't get info from secondary node %s" % node)
4618         elif self.mem > nodeinfo[node]['memory_free']:
4619           self.warn.append("Not enough memory to failover instance to"
4620                            " secondary node %s" % node)
4621
4622     return
4623
4624   def Exec(self, feedback_fn):
4625     """Modifies an instance.
4626
4627     All parameters take effect only at the next restart of the instance.
4628     """
4629     # Process here the warnings from CheckPrereq, as we don't have a
4630     # feedback_fn there.
4631     for warn in self.warn:
4632       feedback_fn("WARNING: %s" % warn)
4633
4634     result = []
4635     instance = self.instance
4636     if self.mem:
4637       instance.memory = self.mem
4638       result.append(("mem", self.mem))
4639     if self.vcpus:
4640       instance.vcpus = self.vcpus
4641       result.append(("vcpus",  self.vcpus))
4642     if self.do_ip:
4643       instance.nics[0].ip = self.ip
4644       result.append(("ip", self.ip))
4645     if self.bridge:
4646       instance.nics[0].bridge = self.bridge
4647       result.append(("bridge", self.bridge))
4648     if self.mac:
4649       instance.nics[0].mac = self.mac
4650       result.append(("mac", self.mac))
4651     if self.op.hvparams:
4652       instance.hvparams = self.hv_new
4653       for key, val in self.op.hvparams.iteritems():
4654         result.append(("hv/%s" % key, val))
4655
4656     self.cfg.Update(instance)
4657
4658     return result
4659
4660
4661 class LUQueryExports(NoHooksLU):
4662   """Query the exports list
4663
4664   """
4665   _OP_REQP = ['nodes']
4666   REQ_BGL = False
4667
4668   def ExpandNames(self):
4669     self.needed_locks = {}
4670     self.share_locks[locking.LEVEL_NODE] = 1
4671     if not self.op.nodes:
4672       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4673     else:
4674       self.needed_locks[locking.LEVEL_NODE] = \
4675         _GetWantedNodes(self, self.op.nodes)
4676
4677   def CheckPrereq(self):
4678     """Check prerequisites.
4679
4680     """
4681     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
4682
4683   def Exec(self, feedback_fn):
4684     """Compute the list of all the exported system images.
4685
4686     Returns:
4687       a dictionary with the structure node->(export-list)
4688       where export-list is a list of the instances exported on
4689       that node.
4690
4691     """
4692     return self.rpc.call_export_list(self.nodes)
4693
4694
4695 class LUExportInstance(LogicalUnit):
4696   """Export an instance to an image in the cluster.
4697
4698   """
4699   HPATH = "instance-export"
4700   HTYPE = constants.HTYPE_INSTANCE
4701   _OP_REQP = ["instance_name", "target_node", "shutdown"]
4702   REQ_BGL = False
4703
4704   def ExpandNames(self):
4705     self._ExpandAndLockInstance()
4706     # FIXME: lock only instance primary and destination node
4707     #
4708     # Sad but true, for now we have do lock all nodes, as we don't know where
4709     # the previous export might be, and and in this LU we search for it and
4710     # remove it from its current node. In the future we could fix this by:
4711     #  - making a tasklet to search (share-lock all), then create the new one,
4712     #    then one to remove, after
4713     #  - removing the removal operation altoghether
4714     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4715
4716   def DeclareLocks(self, level):
4717     """Last minute lock declaration."""
4718     # All nodes are locked anyway, so nothing to do here.
4719
4720   def BuildHooksEnv(self):
4721     """Build hooks env.
4722
4723     This will run on the master, primary node and target node.
4724
4725     """
4726     env = {
4727       "EXPORT_NODE": self.op.target_node,
4728       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4729       }
4730     env.update(_BuildInstanceHookEnvByObject(self.instance))
4731     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
4732           self.op.target_node]
4733     return env, nl, nl
4734
4735   def CheckPrereq(self):
4736     """Check prerequisites.
4737
4738     This checks that the instance and node names are valid.
4739
4740     """
4741     instance_name = self.op.instance_name
4742     self.instance = self.cfg.GetInstanceInfo(instance_name)
4743     assert self.instance is not None, \
4744           "Cannot retrieve locked instance %s" % self.op.instance_name
4745
4746     self.dst_node = self.cfg.GetNodeInfo(
4747       self.cfg.ExpandNodeName(self.op.target_node))
4748
4749     assert self.dst_node is not None, \
4750           "Cannot retrieve locked node %s" % self.op.target_node
4751
4752     # instance disk type verification
4753     for disk in self.instance.disks:
4754       if disk.dev_type == constants.LD_FILE:
4755         raise errors.OpPrereqError("Export not supported for instances with"
4756                                    " file-based disks")
4757
4758   def Exec(self, feedback_fn):
4759     """Export an instance to an image in the cluster.
4760
4761     """
4762     instance = self.instance
4763     dst_node = self.dst_node
4764     src_node = instance.primary_node
4765     if self.op.shutdown:
4766       # shutdown the instance, but not the disks
4767       if not self.rpc.call_instance_shutdown(src_node, instance):
4768         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4769                                  (instance.name, src_node))
4770
4771     vgname = self.cfg.GetVGName()
4772
4773     snap_disks = []
4774
4775     try:
4776       for disk in instance.disks:
4777         if disk.iv_name == "sda":
4778           # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4779           new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
4780
4781           if not new_dev_name:
4782             logger.Error("could not snapshot block device %s on node %s" %
4783                          (disk.logical_id[1], src_node))
4784           else:
4785             new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4786                                       logical_id=(vgname, new_dev_name),
4787                                       physical_id=(vgname, new_dev_name),
4788                                       iv_name=disk.iv_name)
4789             snap_disks.append(new_dev)
4790
4791     finally:
4792       if self.op.shutdown and instance.status == "up":
4793         if not self.rpc.call_instance_start(src_node, instance, None):
4794           _ShutdownInstanceDisks(self, instance)
4795           raise errors.OpExecError("Could not start instance")
4796
4797     # TODO: check for size
4798
4799     cluster_name = self.cfg.GetClusterName()
4800     for dev in snap_disks:
4801       if not self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
4802                                       instance, cluster_name):
4803         logger.Error("could not export block device %s from node %s to node %s"
4804                      % (dev.logical_id[1], src_node, dst_node.name))
4805       if not self.rpc.call_blockdev_remove(src_node, dev):
4806         logger.Error("could not remove snapshot block device %s from node %s" %
4807                      (dev.logical_id[1], src_node))
4808
4809     if not self.rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4810       logger.Error("could not finalize export for instance %s on node %s" %
4811                    (instance.name, dst_node.name))
4812
4813     nodelist = self.cfg.GetNodeList()
4814     nodelist.remove(dst_node.name)
4815
4816     # on one-node clusters nodelist will be empty after the removal
4817     # if we proceed the backup would be removed because OpQueryExports
4818     # substitutes an empty list with the full cluster node list.
4819     if nodelist:
4820       exportlist = self.rpc.call_export_list(nodelist)
4821       for node in exportlist:
4822         if instance.name in exportlist[node]:
4823           if not self.rpc.call_export_remove(node, instance.name):
4824             logger.Error("could not remove older export for instance %s"
4825                          " on node %s" % (instance.name, node))
4826
4827
4828 class LURemoveExport(NoHooksLU):
4829   """Remove exports related to the named instance.
4830
4831   """
4832   _OP_REQP = ["instance_name"]
4833   REQ_BGL = False
4834
4835   def ExpandNames(self):
4836     self.needed_locks = {}
4837     # We need all nodes to be locked in order for RemoveExport to work, but we
4838     # don't need to lock the instance itself, as nothing will happen to it (and
4839     # we can remove exports also for a removed instance)
4840     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4841
4842   def CheckPrereq(self):
4843     """Check prerequisites.
4844     """
4845     pass
4846
4847   def Exec(self, feedback_fn):
4848     """Remove any export.
4849
4850     """
4851     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4852     # If the instance was not found we'll try with the name that was passed in.
4853     # This will only work if it was an FQDN, though.
4854     fqdn_warn = False
4855     if not instance_name:
4856       fqdn_warn = True
4857       instance_name = self.op.instance_name
4858
4859     exportlist = self.rpc.call_export_list(self.acquired_locks[
4860       locking.LEVEL_NODE])
4861     found = False
4862     for node in exportlist:
4863       if instance_name in exportlist[node]:
4864         found = True
4865         if not self.rpc.call_export_remove(node, instance_name):
4866           logger.Error("could not remove export for instance %s"
4867                        " on node %s" % (instance_name, node))
4868
4869     if fqdn_warn and not found:
4870       feedback_fn("Export not found. If trying to remove an export belonging"
4871                   " to a deleted instance please use its Fully Qualified"
4872                   " Domain Name.")
4873
4874
4875 class TagsLU(NoHooksLU):
4876   """Generic tags LU.
4877
4878   This is an abstract class which is the parent of all the other tags LUs.
4879
4880   """
4881
4882   def ExpandNames(self):
4883     self.needed_locks = {}
4884     if self.op.kind == constants.TAG_NODE:
4885       name = self.cfg.ExpandNodeName(self.op.name)
4886       if name is None:
4887         raise errors.OpPrereqError("Invalid node name (%s)" %
4888                                    (self.op.name,))
4889       self.op.name = name
4890       self.needed_locks[locking.LEVEL_NODE] = name
4891     elif self.op.kind == constants.TAG_INSTANCE:
4892       name = self.cfg.ExpandInstanceName(self.op.name)
4893       if name is None:
4894         raise errors.OpPrereqError("Invalid instance name (%s)" %
4895                                    (self.op.name,))
4896       self.op.name = name
4897       self.needed_locks[locking.LEVEL_INSTANCE] = name
4898
4899   def CheckPrereq(self):
4900     """Check prerequisites.
4901
4902     """
4903     if self.op.kind == constants.TAG_CLUSTER:
4904       self.target = self.cfg.GetClusterInfo()
4905     elif self.op.kind == constants.TAG_NODE:
4906       self.target = self.cfg.GetNodeInfo(self.op.name)
4907     elif self.op.kind == constants.TAG_INSTANCE:
4908       self.target = self.cfg.GetInstanceInfo(self.op.name)
4909     else:
4910       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4911                                  str(self.op.kind))
4912
4913
4914 class LUGetTags(TagsLU):
4915   """Returns the tags of a given object.
4916
4917   """
4918   _OP_REQP = ["kind", "name"]
4919   REQ_BGL = False
4920
4921   def Exec(self, feedback_fn):
4922     """Returns the tag list.
4923
4924     """
4925     return list(self.target.GetTags())
4926
4927
4928 class LUSearchTags(NoHooksLU):
4929   """Searches the tags for a given pattern.
4930
4931   """
4932   _OP_REQP = ["pattern"]
4933   REQ_BGL = False
4934
4935   def ExpandNames(self):
4936     self.needed_locks = {}
4937
4938   def CheckPrereq(self):
4939     """Check prerequisites.
4940
4941     This checks the pattern passed for validity by compiling it.
4942
4943     """
4944     try:
4945       self.re = re.compile(self.op.pattern)
4946     except re.error, err:
4947       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4948                                  (self.op.pattern, err))
4949
4950   def Exec(self, feedback_fn):
4951     """Returns the tag list.
4952
4953     """
4954     cfg = self.cfg
4955     tgts = [("/cluster", cfg.GetClusterInfo())]
4956     ilist = cfg.GetAllInstancesInfo().values()
4957     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4958     nlist = cfg.GetAllNodesInfo().values()
4959     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4960     results = []
4961     for path, target in tgts:
4962       for tag in target.GetTags():
4963         if self.re.search(tag):
4964           results.append((path, tag))
4965     return results
4966
4967
4968 class LUAddTags(TagsLU):
4969   """Sets a tag on a given object.
4970
4971   """
4972   _OP_REQP = ["kind", "name", "tags"]
4973   REQ_BGL = False
4974
4975   def CheckPrereq(self):
4976     """Check prerequisites.
4977
4978     This checks the type and length of the tag name and value.
4979
4980     """
4981     TagsLU.CheckPrereq(self)
4982     for tag in self.op.tags:
4983       objects.TaggableObject.ValidateTag(tag)
4984
4985   def Exec(self, feedback_fn):
4986     """Sets the tag.
4987
4988     """
4989     try:
4990       for tag in self.op.tags:
4991         self.target.AddTag(tag)
4992     except errors.TagError, err:
4993       raise errors.OpExecError("Error while setting tag: %s" % str(err))
4994     try:
4995       self.cfg.Update(self.target)
4996     except errors.ConfigurationError:
4997       raise errors.OpRetryError("There has been a modification to the"
4998                                 " config file and the operation has been"
4999                                 " aborted. Please retry.")
5000
5001
5002 class LUDelTags(TagsLU):
5003   """Delete a list of tags from a given object.
5004
5005   """
5006   _OP_REQP = ["kind", "name", "tags"]
5007   REQ_BGL = False
5008
5009   def CheckPrereq(self):
5010     """Check prerequisites.
5011
5012     This checks that we have the given tag.
5013
5014     """
5015     TagsLU.CheckPrereq(self)
5016     for tag in self.op.tags:
5017       objects.TaggableObject.ValidateTag(tag)
5018     del_tags = frozenset(self.op.tags)
5019     cur_tags = self.target.GetTags()
5020     if not del_tags <= cur_tags:
5021       diff_tags = del_tags - cur_tags
5022       diff_names = ["'%s'" % tag for tag in diff_tags]
5023       diff_names.sort()
5024       raise errors.OpPrereqError("Tag(s) %s not found" %
5025                                  (",".join(diff_names)))
5026
5027   def Exec(self, feedback_fn):
5028     """Remove the tag from the object.
5029
5030     """
5031     for tag in self.op.tags:
5032       self.target.RemoveTag(tag)
5033     try:
5034       self.cfg.Update(self.target)
5035     except errors.ConfigurationError:
5036       raise errors.OpRetryError("There has been a modification to the"
5037                                 " config file and the operation has been"
5038                                 " aborted. Please retry.")
5039
5040
5041 class LUTestDelay(NoHooksLU):
5042   """Sleep for a specified amount of time.
5043
5044   This LU sleeps on the master and/or nodes for a specified amount of
5045   time.
5046
5047   """
5048   _OP_REQP = ["duration", "on_master", "on_nodes"]
5049   REQ_BGL = False
5050
5051   def ExpandNames(self):
5052     """Expand names and set required locks.
5053
5054     This expands the node list, if any.
5055
5056     """
5057     self.needed_locks = {}
5058     if self.op.on_nodes:
5059       # _GetWantedNodes can be used here, but is not always appropriate to use
5060       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
5061       # more information.
5062       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
5063       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
5064
5065   def CheckPrereq(self):
5066     """Check prerequisites.
5067
5068     """
5069
5070   def Exec(self, feedback_fn):
5071     """Do the actual sleep.
5072
5073     """
5074     if self.op.on_master:
5075       if not utils.TestDelay(self.op.duration):
5076         raise errors.OpExecError("Error during master delay test")
5077     if self.op.on_nodes:
5078       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
5079       if not result:
5080         raise errors.OpExecError("Complete failure from rpc call")
5081       for node, node_result in result.items():
5082         if not node_result:
5083           raise errors.OpExecError("Failure during rpc call to node %s,"
5084                                    " result: %s" % (node, node_result))
5085
5086
5087 class IAllocator(object):
5088   """IAllocator framework.
5089
5090   An IAllocator instance has three sets of attributes:
5091     - cfg that is needed to query the cluster
5092     - input data (all members of the _KEYS class attribute are required)
5093     - four buffer attributes (in|out_data|text), that represent the
5094       input (to the external script) in text and data structure format,
5095       and the output from it, again in two formats
5096     - the result variables from the script (success, info, nodes) for
5097       easy usage
5098
5099   """
5100   _ALLO_KEYS = [
5101     "mem_size", "disks", "disk_template",
5102     "os", "tags", "nics", "vcpus",
5103     ]
5104   _RELO_KEYS = [
5105     "relocate_from",
5106     ]
5107
5108   def __init__(self, lu, mode, name, **kwargs):
5109     self.lu = lu
5110     # init buffer variables
5111     self.in_text = self.out_text = self.in_data = self.out_data = None
5112     # init all input fields so that pylint is happy
5113     self.mode = mode
5114     self.name = name
5115     self.mem_size = self.disks = self.disk_template = None
5116     self.os = self.tags = self.nics = self.vcpus = None
5117     self.relocate_from = None
5118     # computed fields
5119     self.required_nodes = None
5120     # init result fields
5121     self.success = self.info = self.nodes = None
5122     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5123       keyset = self._ALLO_KEYS
5124     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
5125       keyset = self._RELO_KEYS
5126     else:
5127       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
5128                                    " IAllocator" % self.mode)
5129     for key in kwargs:
5130       if key not in keyset:
5131         raise errors.ProgrammerError("Invalid input parameter '%s' to"
5132                                      " IAllocator" % key)
5133       setattr(self, key, kwargs[key])
5134     for key in keyset:
5135       if key not in kwargs:
5136         raise errors.ProgrammerError("Missing input parameter '%s' to"
5137                                      " IAllocator" % key)
5138     self._BuildInputData()
5139
5140   def _ComputeClusterData(self):
5141     """Compute the generic allocator input data.
5142
5143     This is the data that is independent of the actual operation.
5144
5145     """
5146     cfg = self.lu.cfg
5147     cluster_info = cfg.GetClusterInfo()
5148     # cluster data
5149     data = {
5150       "version": 1,
5151       "cluster_name": cfg.GetClusterName(),
5152       "cluster_tags": list(cluster_info.GetTags()),
5153       "enable_hypervisors": list(cluster_info.enabled_hypervisors),
5154       # we don't have job IDs
5155       }
5156
5157     i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
5158
5159     # node data
5160     node_results = {}
5161     node_list = cfg.GetNodeList()
5162     # FIXME: here we have only one hypervisor information, but
5163     # instance can belong to different hypervisors
5164     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
5165                                            cfg.GetHypervisorType())
5166     for nname in node_list:
5167       ninfo = cfg.GetNodeInfo(nname)
5168       if nname not in node_data or not isinstance(node_data[nname], dict):
5169         raise errors.OpExecError("Can't get data for node %s" % nname)
5170       remote_info = node_data[nname]
5171       for attr in ['memory_total', 'memory_free', 'memory_dom0',
5172                    'vg_size', 'vg_free', 'cpu_total']:
5173         if attr not in remote_info:
5174           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
5175                                    (nname, attr))
5176         try:
5177           remote_info[attr] = int(remote_info[attr])
5178         except ValueError, err:
5179           raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
5180                                    " %s" % (nname, attr, str(err)))
5181       # compute memory used by primary instances
5182       i_p_mem = i_p_up_mem = 0
5183       for iinfo in i_list:
5184         if iinfo.primary_node == nname:
5185           i_p_mem += iinfo.memory
5186           if iinfo.status == "up":
5187             i_p_up_mem += iinfo.memory
5188
5189       # compute memory used by instances
5190       pnr = {
5191         "tags": list(ninfo.GetTags()),
5192         "total_memory": remote_info['memory_total'],
5193         "reserved_memory": remote_info['memory_dom0'],
5194         "free_memory": remote_info['memory_free'],
5195         "i_pri_memory": i_p_mem,
5196         "i_pri_up_memory": i_p_up_mem,
5197         "total_disk": remote_info['vg_size'],
5198         "free_disk": remote_info['vg_free'],
5199         "primary_ip": ninfo.primary_ip,
5200         "secondary_ip": ninfo.secondary_ip,
5201         "total_cpus": remote_info['cpu_total'],
5202         }
5203       node_results[nname] = pnr
5204     data["nodes"] = node_results
5205
5206     # instance data
5207     instance_data = {}
5208     for iinfo in i_list:
5209       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
5210                   for n in iinfo.nics]
5211       pir = {
5212         "tags": list(iinfo.GetTags()),
5213         "should_run": iinfo.status == "up",
5214         "vcpus": iinfo.vcpus,
5215         "memory": iinfo.memory,
5216         "os": iinfo.os,
5217         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
5218         "nics": nic_data,
5219         "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
5220         "disk_template": iinfo.disk_template,
5221         "hypervisor": iinfo.hypervisor,
5222         }
5223       instance_data[iinfo.name] = pir
5224
5225     data["instances"] = instance_data
5226
5227     self.in_data = data
5228
5229   def _AddNewInstance(self):
5230     """Add new instance data to allocator structure.
5231
5232     This in combination with _AllocatorGetClusterData will create the
5233     correct structure needed as input for the allocator.
5234
5235     The checks for the completeness of the opcode must have already been
5236     done.
5237
5238     """
5239     data = self.in_data
5240     if len(self.disks) != 2:
5241       raise errors.OpExecError("Only two-disk configurations supported")
5242
5243     disk_space = _ComputeDiskSize(self.disk_template,
5244                                   self.disks[0]["size"], self.disks[1]["size"])
5245
5246     if self.disk_template in constants.DTS_NET_MIRROR:
5247       self.required_nodes = 2
5248     else:
5249       self.required_nodes = 1
5250     request = {
5251       "type": "allocate",
5252       "name": self.name,
5253       "disk_template": self.disk_template,
5254       "tags": self.tags,
5255       "os": self.os,
5256       "vcpus": self.vcpus,
5257       "memory": self.mem_size,
5258       "disks": self.disks,
5259       "disk_space_total": disk_space,
5260       "nics": self.nics,
5261       "required_nodes": self.required_nodes,
5262       }
5263     data["request"] = request
5264
5265   def _AddRelocateInstance(self):
5266     """Add relocate instance data to allocator structure.
5267
5268     This in combination with _IAllocatorGetClusterData will create the
5269     correct structure needed as input for the allocator.
5270
5271     The checks for the completeness of the opcode must have already been
5272     done.
5273
5274     """
5275     instance = self.lu.cfg.GetInstanceInfo(self.name)
5276     if instance is None:
5277       raise errors.ProgrammerError("Unknown instance '%s' passed to"
5278                                    " IAllocator" % self.name)
5279
5280     if instance.disk_template not in constants.DTS_NET_MIRROR:
5281       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
5282
5283     if len(instance.secondary_nodes) != 1:
5284       raise errors.OpPrereqError("Instance has not exactly one secondary node")
5285
5286     self.required_nodes = 1
5287
5288     disk_space = _ComputeDiskSize(instance.disk_template,
5289                                   instance.disks[0].size,
5290                                   instance.disks[1].size)
5291
5292     request = {
5293       "type": "relocate",
5294       "name": self.name,
5295       "disk_space_total": disk_space,
5296       "required_nodes": self.required_nodes,
5297       "relocate_from": self.relocate_from,
5298       }
5299     self.in_data["request"] = request
5300
5301   def _BuildInputData(self):
5302     """Build input data structures.
5303
5304     """
5305     self._ComputeClusterData()
5306
5307     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
5308       self._AddNewInstance()
5309     else:
5310       self._AddRelocateInstance()
5311
5312     self.in_text = serializer.Dump(self.in_data)
5313
5314   def Run(self, name, validate=True, call_fn=None):
5315     """Run an instance allocator and return the results.
5316
5317     """
5318     if call_fn is None:
5319       call_fn = self.lu.rpc.call_iallocator_runner
5320     data = self.in_text
5321
5322     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
5323
5324     if not isinstance(result, (list, tuple)) or len(result) != 4:
5325       raise errors.OpExecError("Invalid result from master iallocator runner")
5326
5327     rcode, stdout, stderr, fail = result
5328
5329     if rcode == constants.IARUN_NOTFOUND:
5330       raise errors.OpExecError("Can't find allocator '%s'" % name)
5331     elif rcode == constants.IARUN_FAILURE:
5332       raise errors.OpExecError("Instance allocator call failed: %s,"
5333                                " output: %s" % (fail, stdout+stderr))
5334     self.out_text = stdout
5335     if validate:
5336       self._ValidateResult()
5337
5338   def _ValidateResult(self):
5339     """Process the allocator results.
5340
5341     This will process and if successful save the result in
5342     self.out_data and the other parameters.
5343
5344     """
5345     try:
5346       rdict = serializer.Load(self.out_text)
5347     except Exception, err:
5348       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
5349
5350     if not isinstance(rdict, dict):
5351       raise errors.OpExecError("Can't parse iallocator results: not a dict")
5352
5353     for key in "success", "info", "nodes":
5354       if key not in rdict:
5355         raise errors.OpExecError("Can't parse iallocator results:"
5356                                  " missing key '%s'" % key)
5357       setattr(self, key, rdict[key])
5358
5359     if not isinstance(rdict["nodes"], list):
5360       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
5361                                " is not a list")
5362     self.out_data = rdict
5363
5364
5365 class LUTestAllocator(NoHooksLU):
5366   """Run allocator tests.
5367
5368   This LU runs the allocator tests
5369
5370   """
5371   _OP_REQP = ["direction", "mode", "name"]
5372
5373   def CheckPrereq(self):
5374     """Check prerequisites.
5375
5376     This checks the opcode parameters depending on the director and mode test.
5377
5378     """
5379     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5380       for attr in ["name", "mem_size", "disks", "disk_template",
5381                    "os", "tags", "nics", "vcpus"]:
5382         if not hasattr(self.op, attr):
5383           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
5384                                      attr)
5385       iname = self.cfg.ExpandInstanceName(self.op.name)
5386       if iname is not None:
5387         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
5388                                    iname)
5389       if not isinstance(self.op.nics, list):
5390         raise errors.OpPrereqError("Invalid parameter 'nics'")
5391       for row in self.op.nics:
5392         if (not isinstance(row, dict) or
5393             "mac" not in row or
5394             "ip" not in row or
5395             "bridge" not in row):
5396           raise errors.OpPrereqError("Invalid contents of the"
5397                                      " 'nics' parameter")
5398       if not isinstance(self.op.disks, list):
5399         raise errors.OpPrereqError("Invalid parameter 'disks'")
5400       if len(self.op.disks) != 2:
5401         raise errors.OpPrereqError("Only two-disk configurations supported")
5402       for row in self.op.disks:
5403         if (not isinstance(row, dict) or
5404             "size" not in row or
5405             not isinstance(row["size"], int) or
5406             "mode" not in row or
5407             row["mode"] not in ['r', 'w']):
5408           raise errors.OpPrereqError("Invalid contents of the"
5409                                      " 'disks' parameter")
5410     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
5411       if not hasattr(self.op, "name"):
5412         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
5413       fname = self.cfg.ExpandInstanceName(self.op.name)
5414       if fname is None:
5415         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
5416                                    self.op.name)
5417       self.op.name = fname
5418       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
5419     else:
5420       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5421                                  self.op.mode)
5422
5423     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5424       if not hasattr(self.op, "allocator") or self.op.allocator is None:
5425         raise errors.OpPrereqError("Missing allocator name")
5426     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5427       raise errors.OpPrereqError("Wrong allocator test '%s'" %
5428                                  self.op.direction)
5429
5430   def Exec(self, feedback_fn):
5431     """Run the allocator test.
5432
5433     """
5434     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5435       ial = IAllocator(self,
5436                        mode=self.op.mode,
5437                        name=self.op.name,
5438                        mem_size=self.op.mem_size,
5439                        disks=self.op.disks,
5440                        disk_template=self.op.disk_template,
5441                        os=self.op.os,
5442                        tags=self.op.tags,
5443                        nics=self.op.nics,
5444                        vcpus=self.op.vcpus,
5445                        )
5446     else:
5447       ial = IAllocator(self,
5448                        mode=self.op.mode,
5449                        name=self.op.name,
5450                        relocate_from=list(self.relocate_from),
5451                        )
5452
5453     if self.op.direction == constants.IALLOCATOR_DIR_IN:
5454       result = ial.in_text
5455     else:
5456       ial.Run(self.op.allocator, validate=False)
5457       result = ial.out_text
5458     return result